Skip to content

Commit

Permalink
[ISSUE #540] Add task lifecycle management; fix stopAll bug (#598)
Browse files Browse the repository at this point in the history
* 1.add start() in WorkerSinktask and decouple it from run() 2 add 5 states to tasks

* move commitTaskPosition to where it should be

* 1. put stop() in run(), put extra layer of isolation

* refactor state machine implementation

* add exception handling logic

* try to shutdown taskExecutor when worker exits

add bug logs

fix bugs

WIP

WIP

WIP

* fix checkstype

* fix illegal state

* add new restful handler

* fix WorkerSourceTask

WIP

* use timestamp to mark timeout

WIP

* use StateMachineService to polling the states

WIP

* working on output format

* set timeout to 10 seconds

printing states

printing states

add logs

* add timestamp inside pullMessageFromQueues

WIP

add more logs

WIP

add logs

add logs

try something

WIP

try

add more logs

WIP

* update max suspend time

* change the timeout value

* will cancel tasks in error/stopped Tasks

* use concurrentHashMap in Memory Based Keyvalue store

WIP

WIP

add logs

add logs

* add merge config

* allow interruption

* detect state change eralier

* fix stopAll by disable config sync overwrite deleted configs

* fix unused import

* fix(runtime) resolve all todos
  • Loading branch information
imaffe authored Jul 28, 2020
1 parent 0e891ff commit cd6ff77
Show file tree
Hide file tree
Showing 14 changed files with 697 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ private static ConnectController createConnectController(String[] args) {
// Create controller and initialize.
ConnectController controller = new ConnectController(connectConfig);
controller.initialize();

// Invoked when shutdown.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
private volatile boolean hasShutdown = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,22 @@ public class ConnectConfig {

private int rmqMaxRedeliveryTimes;

private int rmqMessageConsumeTimeout = 3000;
private int rmqMessageConsumeTimeout = 300;

private int rmqMaxConsumeThreadNums = 32;

private int rmqMinConsumeThreadNums = 1;

public int getBrokerSuspendMaxTimeMillis() {
return brokerSuspendMaxTimeMillis;
}

public void setBrokerSuspendMaxTimeMillis(int brokerSuspendMaxTimeMillis) {
this.brokerSuspendMaxTimeMillis = brokerSuspendMaxTimeMillis;
}

private int brokerSuspendMaxTimeMillis = 300;

/**
* Default topic to send/consume online or offline message.
*/
Expand Down
Loading

0 comments on commit cd6ff77

Please sign in to comment.