You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Copy file name to clipboardExpand all lines: active/0028-durable-shared-subscriptions.md
+66-5
Original file line number
Diff line number
Diff line change
@@ -3,6 +3,10 @@
3
3
## Changelog
4
4
5
5
* 2024-05-10: @savonarola Initial draft
6
+
* 2024-06-28: @savonarola
7
+
* Add the Agent abstraction
8
+
* Describe thr two-side communication sequence between an Agent and the SGL
9
+
* Describe the stream reassignment algorithm
6
10
7
11
## Abstract
8
12
@@ -28,11 +32,11 @@ SGL keeps track of topics belonging to the group, their streams, and stream stat
28
32
29
33
The groups' consumers are persistent sessions. They connect to the SGL, and the SGL leases them streams to consume. Sessions consume these streams together with their proper streams but do not persist the progress. Instead, they report the progress to the SGL.
30
34
31
-
SGL is responsible for reassigning streams to the other group consumers in case a consumer disconnects and for reassigning streams to the new consumers.
35
+
SGL is responsible for reassigning streams to the other group of consumers in case a consumer disconnects and for reassigning streams to the new consumers.
All communication between the consumers(sessions) and the SGL, SGLM, is done asynchronously because leaders may need to be spawned (this requires election) or may be running on a remote node.
39
+
All communication between the consumers(sessions) and the SGL, SGLM is done asynchronously because leaders may need to be spawned (this requires election) or may be running on a remote node.
36
40
37
41
### Session Side
38
42
@@ -43,12 +47,18 @@ The SSubHandler is passive, i.e., it does not contain any running processes. It
43
47
A session is responsible for:
44
48
* Initializing the SSubHandler data on session bootstrap.
45
49
* Delivering Shared Sub-related messages (from timers, from other entities) to the SSubHandler.
46
-
* Forwarding subscribe/unsubscribe `$shares/group/...` requests to the SSubHandler.
50
+
* Forwarding subscribe/unsubscribe `$shared/group/...` requests to the SSubHandler.
47
51
* Querying stream states (`stream_state()`) from SSubHandler for replay and reporting replay results to the SSubHandler.
48
52
49
53
### Shared Subscription Session Handler
50
54
51
-
SSubHandler data is a collection of Group Subscription FSMs (GSFSM) identified by the group ID.
55
+
SSubHandler *lies in the domain of the session*. It knows the session's state, stores shared suscriptions and the related data in the session's state and uses **Agent** abstraction to communicate with the SGL. Agent provides the interface lying *outside* the session's domain.
56
+
57
+
### Agent
58
+
59
+
Agent is the entity that communicates with different SGLs. It speaks in the terms of streams and iterators, not knowing about sessions, subscriptions, etc.
60
+
61
+
Agent's data is a collection of Group Subscription FSMs (GSFSM) identified by the group ID.
52
62
53
63
```erlang
54
64
#{
@@ -63,6 +73,7 @@ Each GSFSM contains the following states:
63
73
*`connecting` - the initial state, the GSFSM is looking for a Group Subscription Leader (SGL).
64
74
*`replaying` - the GSFSM is connected to the SGL and provides stream states for replay.
65
75
*`updating` - the GSFSM is connected to the SGL and is updating the set of streams.
76
+
*`disconnected` - the GSFSM is disconnected from the SGL.
66
77
67
78
### Protocol between Session and SGL
68
79
@@ -128,10 +139,60 @@ In the `updating` state, the GSFSM accepts replay requests from the session side
128
139
129
140
*`{update_stream_states, VersionNew, StreamsNew, VersionOld, StreamsOld, ...}` - to the SGL to update the stream states. Both for active streams and for taken-over streams. If taken-over streams are fully acked, the according flag is sent for them.
130
141
131
-
### State transitions
142
+
#### `disconnected` state
143
+
144
+
GSFSM can pass to the `disconnected` state from any other state. It happens when the session disconnects. When entering the `disconnected` state, the GSFSM sends a `disconnect` message to the SGL with the latest stream states.
Agent may have many GroupSMs (one for each subscription), each GroupSM is connected to its own Leader. So a Leader may have many Agents connected (through GroupSMs) to it.
153
+
154
+
The Leader tracks the state of each agent connected to it and has its own state machine for each agent. The Leader's view of the GroupSM's state can be one of the following:
155
+
156
+
* waiting_replaing
157
+
* replaying
158
+
* waiting_updating
159
+
* updating
160
+
161
+
The target state of GroupSM and its representation in Leader is `replaying`. That is, when the GroupSM and the Leader agree on the leased streams, the Leader sends lease confirmations to the GroupSM, and the GroupSM sends iteration updates.
162
+
163
+
Other states are used to gracefully reassign streams to the GroupSM.
164
+
165
+
## Communication sequence
166
+
167
+
Below is the sequence diagram of the interaction. The full cycle is shown, from replaying-replaying to replaying-replaying states.
168
+
169
+

170
+
171
+
## Stream reassignment (rebalance)
172
+
173
+
We want the streams of a Leader be evenly distributed among the agents. The periodical rebalancing algorithm is the following.
174
+
175
+
* We discover new streams from DS, and mark _unassigned_ if there are any.
176
+
* We do any changes only to _stable_ agents (those which are not in the `replaying` state).
177
+
* We check that replaying agents have the _desired_ number of streams. The desired number is calculated as the total number of streams divided by the number of agents (+1, if not divisible evenly):
178
+
```erlang
179
+
DesiredStreamCount=case
180
+
TotalStreamCountremAgentCountof
181
+
0 -> TotalStreamCountdivAgentCount;
182
+
_ -> (TotalStreamCountdivAgentCount) +1
183
+
end.
184
+
```
185
+
* If the agent has more streams than desired, we _select streams for revocation_ and mark them as _revoked_. The streams still belong to the agent, but the agent goes to the 'updating' cycle, finishes stream replay, and returns them to the Leader. As soon as a stream is returned, it becomes _unassigned_.
186
+
* If the agent has fewer streams than desired, we _select streams for assignment_ and mark them as assigned to the agent. The agent goes to the `updating` cycle in which it confirms the reception of the streams and starts replaying them.
187
+
188
+
So, there is no "direct transfer" of streams between agents. When e.g., a new agent connects, a typical scenario is:
189
+
190
+
* The agent connects to the Leader.
191
+
* It may be assigned 0 streams just now as there are no free ones.
192
+
* On the next rebalance, we see that some agents became overpopulated (since the number of agents increased). So, we select streams for revocation. There are no free streams yet. We also still see that the recently connected agent is underpopulated.
193
+
* Gradually, the streams are returned to the Leader and become unassigned. This happens outside the iterations of the rebalance.
194
+
* On one of the next rebalances, we see again that the recently connected agent is underpopulated, but now there are some free streams. We assign them to the agent.
0 commit comments