13
13
import com .zendesk .maxwell .schema .*;
14
14
import com .zendesk .maxwell .schema .columndef .ColumnDefCastException ;
15
15
import com .zendesk .maxwell .util .Logging ;
16
- import org .jgroups .JChannel ;
17
- import org .jgroups .protocols .raft .RaftLeaderException ;
18
- import org .jgroups .protocols .raft .Role ;
19
- import org .jgroups .protocols .raft .Settable ;
20
- import org .jgroups .protocols .raft .StateMachine ;
21
- import org .jgroups .raft .RaftHandle ;
22
16
import org .slf4j .Logger ;
23
17
import org .slf4j .LoggerFactory ;
24
18
25
- import java .io .DataInput ;
26
- import java .io .DataOutput ;
27
19
import java .net .URISyntaxException ;
28
20
import java .sql .Connection ;
29
21
import java .sql .SQLException ;
30
22
import java .util .ArrayList ;
31
23
import java .util .List ;
32
- import java .util .concurrent .CompletableFuture ;
33
- import java .util .concurrent .TimeUnit ;
34
- import java .util .concurrent .TimeoutException ;
35
- import java .util .concurrent .atomic .AtomicBoolean ;
36
24
37
25
public class Maxwell implements Runnable {
38
26
protected MaxwellConfig config ;
@@ -58,13 +46,22 @@ public void run() {
58
46
}
59
47
}
60
48
49
+ public void restart () {
50
+ try {
51
+ this .context = new MaxwellContext (config );
52
+ } catch ( Exception e ) {
53
+ throw new RuntimeException (e );
54
+ }
55
+
56
+ run ();
57
+ }
58
+
61
59
public void terminate () {
62
60
Thread terminationThread = this .context .terminate ();
63
61
if (terminationThread != null ) {
64
62
try {
65
63
terminationThread .join ();
66
64
} catch (InterruptedException e ) {
67
- // ignore
68
65
}
69
66
}
70
67
}
@@ -182,91 +179,6 @@ private void logBanner(AbstractProducer producer, Position initialPosition) {
182
179
protected void onReplicatorStart () {}
183
180
protected void onReplicatorEnd () {}
184
181
185
- private AtomicBoolean isLeader = new AtomicBoolean (false );
186
-
187
- private void startHA () throws Exception {
188
- JChannel ch =new JChannel (this .config .jgroupsConf );
189
- StateMachine s = new StateMachine () {
190
- @ Override
191
- public byte [] apply (byte [] bytes , int i , int i1 ) throws Exception {
192
- return new byte [0 ];
193
- }
194
-
195
- @ Override
196
- public void readContentFrom (DataInput dataInput ) throws Exception {
197
-
198
- }
199
-
200
- @ Override
201
- public void writeContentTo (DataOutput dataOutput ) throws Exception {
202
-
203
- }
204
- };
205
-
206
- RaftHandle handle =new RaftHandle (ch , s );
207
- handle .raftId (this .config .raftMemberID );
208
-
209
- handle .addRoleListener (role -> {
210
- if (role == Role .Leader ) {
211
- LOGGER .info ("won HA election, starting maxwell" );
212
- try {
213
- isLeader .set (true );
214
- this .start ();
215
- } catch ( Exception e ) {
216
- } finally {
217
- isLeader .set (false );
218
- }
219
-
220
- } else
221
- LOGGER .info ("lost HA election, current leader: " + handle .leader ());
222
- // stop singleton services
223
- });
224
-
225
- ch .connect (this .config .clientID );
226
- LOGGER .info ("enter HA group, current leader: " + handle .leader ());
227
-
228
- new Thread (() -> {
229
- int exceptionCount = 0 ;
230
- while ( true ) {
231
- byte [] b = new byte [] { (byte ) 0x1 };
232
- try {
233
- handle .set (b , 0 , 1 , 5000 , TimeUnit .MILLISECONDS );
234
- LOGGER .debug ("RAFT-heartbeat successful" );
235
- exceptionCount = 0 ;
236
- if ( handle .isLeader () && !this .isLeader .get () ) {
237
- LOGGER .info ("RAFT-consensus available, restarting maxwell..." );
238
- try {
239
- isLeader .set (true );
240
- this .start ();
241
- } catch ( Exception e ) {
242
- } finally {
243
- isLeader .set (false );
244
- }
245
- }
246
- } catch ( RaftLeaderException e ) {
247
- LOGGER .warn ("RAFT leader unavailable: " + e .getMessage ());
248
- exceptionCount ++;
249
- } catch ( TimeoutException e ) {
250
- exceptionCount ++;
251
- LOGGER .warn ("RAFT-heartbeat timed out. Exception Count:" + exceptionCount );
252
- } catch ( Exception e ) {
253
- LOGGER .error ("unexpected exception in RAFT-heartbeat" , e );
254
- }
255
-
256
- if ( exceptionCount > 1 && isLeader .get () ) {
257
- LOGGER .warn ("RAFT consensus unavailable after " + exceptionCount + " tries, stopping maxwell" );
258
- this .context .shutdown (new AtomicBoolean (false ));
259
- }
260
-
261
- try {
262
- Thread .sleep (1000 );
263
- } catch (InterruptedException e ) { }
264
- }
265
- }).start ();
266
-
267
-
268
- Thread .sleep (Long .MAX_VALUE );
269
- }
270
182
271
183
private void start () throws Exception {
272
184
try {
@@ -368,7 +280,7 @@ public void run() {
368
280
});
369
281
370
282
if ( config .haMode ) {
371
- maxwell .startHA ();
283
+ new MaxwellHA ( maxwell , config . jgroupsConf , config . raftMemberID , config . clientID ) .startHA ();
372
284
} else {
373
285
maxwell .start ();
374
286
}
0 commit comments