diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectStartup.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectStartup.java index f7ca4872d..db15cce71 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectStartup.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/ConnectStartup.java @@ -101,7 +101,6 @@ private static ConnectController createConnectController(String[] args) { // Create controller and initialize. ConnectController controller = new ConnectController(connectConfig); controller.initialize(); - // Invoked when shutdown. Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { private volatile boolean hasShutdown = false; diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectConfig.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectConfig.java index 2162526d4..ffeba0b1c 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectConfig.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/config/ConnectConfig.java @@ -41,12 +41,22 @@ public class ConnectConfig { private int rmqMaxRedeliveryTimes; - private int rmqMessageConsumeTimeout = 3000; + private int rmqMessageConsumeTimeout = 300; private int rmqMaxConsumeThreadNums = 32; private int rmqMinConsumeThreadNums = 1; + public int getBrokerSuspendMaxTimeMillis() { + return brokerSuspendMaxTimeMillis; + } + + public void setBrokerSuspendMaxTimeMillis(int brokerSuspendMaxTimeMillis) { + this.brokerSuspendMaxTimeMillis = brokerSuspendMaxTimeMillis; + } + + private int brokerSuspendMaxTimeMillis = 300; + /** * Default topic to send/consume online or offline message. */ diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java index fcc65cb81..c57ad138e 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java @@ -23,15 +23,20 @@ import io.openmessaging.connector.api.data.Converter; import io.openmessaging.connector.api.sink.SinkTask; import io.openmessaging.connector.api.source.SourceTask; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.exception.MQClientException; @@ -48,6 +53,7 @@ import org.apache.rocketmq.connect.runtime.utils.ConnectUtil; import org.apache.rocketmq.connect.runtime.utils.Plugin; import org.apache.rocketmq.connect.runtime.utils.PluginClassLoader; +import org.apache.rocketmq.connect.runtime.utils.ServiceThread; import org.apache.rocketmq.remoting.protocol.LanguageCode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,7 +72,28 @@ public class Worker { /** * Current running tasks. */ - private Set workingTasks = new ConcurrentSet<>(); + private Map pendingTasks = new ConcurrentHashMap<>(); + + private Set runningTasks = new ConcurrentSet<>(); + + private Set errorTasks = new ConcurrentSet<>(); + + private Set cleanedErrorTasks = new ConcurrentSet<>(); + + private Map stoppingTasks = new ConcurrentHashMap<>(); + + private Set stoppedTasks = new ConcurrentSet<>(); + + private Set cleanedStoppedTasks = new ConcurrentSet<>(); + + + Map> latestTaskConfigs = new HashMap<>(); + /** + * Current running tasks to its Future map. + */ + private Map taskToFutureMap = new ConcurrentHashMap<>(); + + /** * Thread pool for connectors and tasks. @@ -94,9 +121,14 @@ public class Worker { private final DefaultMQProducer producer; + private static final int MAX_START_TIMEOUT_MILLS = 5000; + + private static final long MAX_STOP_TIMEOUT_MILLS = 20000; // for MQProducer private volatile boolean producerStarted = false; + private StateMachineService stateMachineService = new StateMachineService(); + public Worker(ConnectConfig connectConfig, PositionManagementService positionManagementService, PositionManagementService offsetManagementService, Plugin plugin) { @@ -104,7 +136,10 @@ public Worker(ConnectConfig connectConfig, this.taskExecutor = Executors.newCachedThreadPool(); this.positionManagementService = positionManagementService; this.offsetManagementService = offsetManagementService; - this.taskPositionCommitService = new TaskPositionCommitService(this); + this.taskPositionCommitService = new TaskPositionCommitService( + this, + positionManagementService, + offsetManagementService); this.plugin = plugin; this.producer = new DefaultMQProducer(); @@ -118,6 +153,7 @@ public Worker(ConnectConfig connectConfig, public void start() { taskPositionCommitService.start(); + stateMachineService.start(); } /** @@ -194,10 +230,15 @@ public synchronized void startConnectors(Map connectorC * @param taskConfigs * @throws Exception */ - public synchronized void startTasks(Map> taskConfigs) throws Exception { + public void startTasks(Map> taskConfigs) { + synchronized (latestTaskConfigs) { + this.latestTaskConfigs = taskConfigs; + } + } + - Set stoppedTasks = new HashSet<>(); - for (Runnable runnable : workingTasks) { + private boolean isConfigInSet(ConnectKeyValue keyValue, Set set) { + for (Runnable runnable : set) { WorkerSourceTask workerSourceTask = null; WorkerSinkTask workerSinkTask = null; if (runnable instanceof WorkerSourceTask) { @@ -205,54 +246,116 @@ public synchronized void startTasks(Map> taskConfi } else { workerSinkTask = (WorkerSinkTask) runnable; } - - String connectorName = null != workerSourceTask ? workerSourceTask.getConnectorName() : workerSinkTask.getConnectorName(); ConnectKeyValue taskConfig = null != workerSourceTask ? workerSourceTask.getTaskConfig() : workerSinkTask.getTaskConfig(); - List keyValues = taskConfigs.get(connectorName); - boolean needStop = true; - if (null != keyValues && keyValues.size() > 0) { - for (ConnectKeyValue keyValue : keyValues) { - if (keyValue.equals(taskConfig)) { - needStop = false; - break; - } - } + if (keyValue.equals(taskConfig)) { + return true; } - if (needStop) { - if (null != workerSourceTask) { - workerSourceTask.stop(); - log.info("Source task stop, connector name {}, config {}", workerSourceTask.getConnectorName(), workerSourceTask.getTaskConfig()); - stoppedTasks.add(workerSourceTask); - } else { - workerSinkTask.stop(); - log.info("Sink task stop, connector name {}, config {}", workerSinkTask.getConnectorName(), workerSinkTask.getTaskConfig()); - stoppedTasks.add(workerSinkTask); - } + } + return false; + } + + + + /** + * Commit the position of all working tasks to PositionManagementService. + */ + + + + + + private void checkRmqProducerState() { + if (!this.producerStarted) { + try { + this.producer.start(); + this.producerStarted = true; + } catch (MQClientException e) { + log.error("Start producer failed!", e); } } - workingTasks.removeAll(stoppedTasks); + } - if (null == taskConfigs || 0 == taskConfigs.size()) { - return; + /** + * We can choose to persist in-memory task status + * so we can view history tasks + */ + public void stop() { + taskExecutor.shutdownNow(); + stateMachineService.shutdown(); + // shutdown producers + if (this.producerStarted && this.producer != null) { + this.producer.shutdown(); + this.producerStarted = false; } + } + + public Set getWorkingConnectors() { + return workingConnectors; + } + + public void setWorkingConnectors( + Set workingConnectors) { + this.workingConnectors = workingConnectors; + } + + + /** + * Beaware that we are not creating a defensive copy of these tasks + * So developers should only use these references for read-only purposes. + * These variables should be immutable + * @return + */ + public Set getWorkingTasks() { + return runningTasks; + } + + public Set getErrorTasks() { + return errorTasks; + } + + public Set getPendingTasks() { + return pendingTasks.keySet(); + } + + public Set getStoppedTasks() { + return stoppedTasks; + } + + public Set getStoppingTasks() { + return stoppingTasks.keySet(); + } + + public Set getCleanedErrorTasks() { + return cleanedErrorTasks; + } + + public Set getCleanedStoppedTasks() { + return cleanedStoppedTasks; + } + + public void setWorkingTasks(Set workingTasks) { + this.runningTasks = workingTasks; + } + + + public void maintainConnectorState() { + + } + + public void maintainTaskState() throws Exception { + + Map> taskConfigs = new HashMap<>(); + synchronized (latestTaskConfigs) { + taskConfigs.putAll(latestTaskConfigs); + } + // get new Tasks Map> newTasks = new HashMap<>(); for (String connectorName : taskConfigs.keySet()) { for (ConnectKeyValue keyValue : taskConfigs.get(connectorName)) { boolean isNewTask = true; - for (Runnable runnable : workingTasks) { - WorkerSourceTask workerSourceTask = null; - WorkerSinkTask workerSinkTask = null; - if (runnable instanceof WorkerSourceTask) { - workerSourceTask = (WorkerSourceTask) runnable; - } else { - workerSinkTask = (WorkerSinkTask) runnable; - } - ConnectKeyValue taskConfig = null != workerSourceTask ? workerSourceTask.getTaskConfig() : workerSinkTask.getTaskConfig(); - if (keyValue.equals(taskConfig)) { - isNewTask = false; - break; - } + if (isConfigInSet(keyValue, runningTasks) || isConfigInSet(keyValue, pendingTasks.keySet()) || isConfigInSet(keyValue, errorTasks)) { + isNewTask = false; } if (isNewTask) { if (!newTasks.containsKey(connectorName)) { @@ -264,6 +367,7 @@ public synchronized void startTasks(Map> taskConfi } } + // STEP 1: try to create new tasks for (String connectorName : newTasks.keySet()) { for (ConnectKeyValue keyValue : newTasks.get(connectorName)) { String taskClass = keyValue.getString(RuntimeConfigDefine.TASK_CLASS); @@ -290,81 +394,217 @@ public synchronized void startTasks(Map> taskConfi if (task instanceof SourceTask) { checkRmqProducerState(); WorkerSourceTask workerSourceTask = new WorkerSourceTask(connectorName, - (SourceTask) task, keyValue, - new PositionStorageReaderImpl(positionManagementService), recordConverter, producer); + (SourceTask) task, keyValue, + new PositionStorageReaderImpl(positionManagementService), recordConverter, producer); Plugin.compareAndSwapLoaders(currentThreadLoader); - this.taskExecutor.submit(workerSourceTask); - this.workingTasks.add(workerSourceTask); + + Future future = taskExecutor.submit(workerSourceTask); + taskToFutureMap.put(workerSourceTask, future); + this.pendingTasks.put(workerSourceTask, System.currentTimeMillis()); } else if (task instanceof SinkTask) { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(); consumer.setNamesrvAddr(connectConfig.getNamesrvAddr()); consumer.setInstanceName(ConnectUtil.createInstance(connectConfig.getNamesrvAddr())); consumer.setConsumerGroup(ConnectUtil.createGroupName(connectConfig.getRmqConsumerGroup())); consumer.setMaxReconsumeTimes(connectConfig.getRmqMaxRedeliveryTimes()); + consumer.setBrokerSuspendMaxTimeMillis(connectConfig.getBrokerSuspendMaxTimeMillis()); consumer.setConsumerPullTimeoutMillis((long) connectConfig.getRmqMessageConsumeTimeout()); consumer.start(); WorkerSinkTask workerSinkTask = new WorkerSinkTask(connectorName, - (SinkTask) task, keyValue, - new PositionStorageReaderImpl(offsetManagementService), - recordConverter, consumer); + (SinkTask) task, keyValue, + new PositionStorageReaderImpl(offsetManagementService), + recordConverter, consumer); Plugin.compareAndSwapLoaders(currentThreadLoader); - this.taskExecutor.submit(workerSinkTask); - this.workingTasks.add(workerSinkTask); + Future future = taskExecutor.submit(workerSinkTask); + taskToFutureMap.put(workerSinkTask, future); + this.pendingTasks.put(workerSinkTask, System.currentTimeMillis()); } } } - } - /** - * Commit the position of all working tasks to PositionManagementService. - */ - public void commitTaskPosition() { - Map positionData = new HashMap<>(); - Map offsetData = new HashMap<>(); - for (Runnable task : workingTasks) { - if (task instanceof WorkerSourceTask) { - positionData.putAll(((WorkerSourceTask) task).getPositionData()); - positionManagementService.putPosition(positionData); - } else if (task instanceof WorkerSinkTask) { - offsetData.putAll(((WorkerSinkTask) task).getOffsetData()); - offsetManagementService.putPosition(offsetData); + + // STEP 2: check all pending state + for (Map.Entry entry : pendingTasks.entrySet()) { + Runnable runnable = entry.getKey(); + Long startTimestamp = entry.getValue(); + Long currentTimeMillis = System.currentTimeMillis(); + WorkerTaskState state = ((WorkerTask) runnable).getState(); + + if (WorkerTaskState.ERROR == state) { + errorTasks.add(runnable); + pendingTasks.remove(runnable); + } else if (WorkerTaskState.RUNNING == state) { + runningTasks.add(runnable); + pendingTasks.remove(runnable); + } else if (WorkerTaskState.NEW == state) { + log.info("[RACE CONDITION] we checked the pending tasks before state turns to PENDING"); + } else if (WorkerTaskState.PENDING == state) { + if (currentTimeMillis - startTimestamp > MAX_START_TIMEOUT_MILLS) { + ((WorkerTask) runnable).timeout(); + pendingTasks.remove(runnable); + errorTasks.add(runnable); + } + } else { + log.error("[BUG] Illegal State in when checking pending tasks, {} is in {} state", + ((WorkerTask) runnable).getConnectorName(), state.toString()); } } - } - private void checkRmqProducerState() { - if (!this.producerStarted) { + // STEP 3: check running tasks and put to error status + for (Runnable runnable : runningTasks) { + WorkerTask workerTask = (WorkerTask) runnable; + String connectorName = workerTask.getConnectorName(); + ConnectKeyValue taskConfig = workerTask.getTaskConfig(); + List keyValues = taskConfigs.get(connectorName); + WorkerTaskState state = ((WorkerTask) runnable).getState(); + + + if (WorkerTaskState.ERROR == state) { + errorTasks.add(runnable); + runningTasks.remove(runnable); + } else if (WorkerTaskState.RUNNING == state) { + boolean needStop = true; + if (null != keyValues && keyValues.size() > 0) { + for (ConnectKeyValue keyValue : keyValues) { + if (keyValue.equals(taskConfig)) { + needStop = false; + break; + } + } + } + + + if (needStop) { + workerTask.stop(); + + log.info("Task stopping, connector name {}, config {}", workerTask.getConnectorName(), workerTask.getTaskConfig()); + runningTasks.remove(runnable); + stoppingTasks.put(runnable, System.currentTimeMillis()); + } + } else { + log.error("[BUG] Illegal State in when checking running tasks, {} is in {} state", + ((WorkerTask) runnable).getConnectorName(), state.toString()); + } + + + } + + // STEP 4 check stopping tasks + for (Map.Entry entry : stoppingTasks.entrySet()) { + Runnable runnable = entry.getKey(); + Long stopTimestamp = entry.getValue(); + Long currentTimeMillis = System.currentTimeMillis(); + Future future = taskToFutureMap.get(runnable); + WorkerTaskState state = ((WorkerTask) runnable).getState(); + // exited normally + + if (WorkerTaskState.STOPPED == state) { + // concurrent modification Exception ? Will it pop that in the + + if (null == future || !future.isDone()) { + log.error("[BUG] future is null or Stopped task should have its Future.isDone() true, but false"); + } + stoppingTasks.remove(runnable); + stoppedTasks.add(runnable); + } else if (WorkerTaskState.ERROR == state) { + stoppingTasks.remove(runnable); + errorTasks.add(runnable); + } else if (WorkerTaskState.STOPPING == state) { + if (currentTimeMillis - stopTimestamp > MAX_STOP_TIMEOUT_MILLS) { + ((WorkerTask) runnable).timeout(); + stoppingTasks.remove(runnable); + errorTasks.add(runnable); + } + } else { + + log.error("[BUG] Illegal State in when checking stopping tasks, {} is in {} state", + ((WorkerTask) runnable).getConnectorName(), state.toString()); + } + } + + // STEP 5 check errorTasks and stopped tasks + for (Runnable runnable: errorTasks) { + WorkerTask workerTask = (WorkerTask) runnable; + Future future = taskToFutureMap.get(runnable); + try { - this.producer.start(); - this.producerStarted = true; - } catch (MQClientException e) { - log.error("Start producer failed!", e); + if (null != future) { + future.get(1000, TimeUnit.MILLISECONDS); + } else { + log.error("[BUG] errorTasks reference not found in taskFutureMap"); + } + } catch (ExecutionException e) { + Throwable t = e.getCause(); + } catch (CancellationException | TimeoutException | InterruptedException e) { + + } finally { + future.cancel(true); + workerTask.cleanup(); + taskToFutureMap.remove(runnable); + errorTasks.remove(runnable); + cleanedErrorTasks.add(runnable); + } } - } - public void stop() { - if (this.producerStarted && this.producer != null) { - this.producer.shutdown(); - this.producerStarted = false; + + // STEP 5 check errorTasks and stopped tasks + for (Runnable runnable: stoppedTasks) { + WorkerTask workerTask = (WorkerTask) runnable; + workerTask.cleanup(); + Future future = taskToFutureMap.get(runnable); + try { + if (null != future) { + future.get(1000, TimeUnit.MILLISECONDS); + } else { + log.error("[BUG] stopped Tasks reference not found in taskFutureMap"); + } + } catch (ExecutionException e) { + Throwable t = e.getCause(); + log.info("[BUG] Stopped Tasks should not throw any exception"); + t.printStackTrace(); + } catch (CancellationException e) { + log.info("[BUG] Stopped Tasks throws PrintStackTrace"); + e.printStackTrace(); + } catch (TimeoutException e) { + log.info("[BUG] Stopped Tasks should not throw any exception"); + e.printStackTrace(); + } catch (InterruptedException e) { + log.info("[BUG] Stopped Tasks should not throw any exception"); + e.printStackTrace(); + } + finally { + future.cancel(true); + taskToFutureMap.remove(runnable); + stoppedTasks.remove(runnable); + cleanedStoppedTasks.add(runnable); + } } } - public Set getWorkingConnectors() { - return workingConnectors; - } - public void setWorkingConnectors( - Set workingConnectors) { - this.workingConnectors = workingConnectors; - } + public class StateMachineService extends ServiceThread { + @Override + public void run() { + log.info(this.getServiceName() + " service started"); - public Set getWorkingTasks() { - return workingTasks; - } + while (!this.isStopped()) { + this.waitForRunning(1000); + try { + Worker.this.maintainConnectorState(); + Worker.this.maintainTaskState(); + } catch (Exception e) { + log.error("RebalanceImpl#StateMachineService start connector or task failed", e); + } + } - public void setWorkingTasks(Set workingTasks) { - this.workingTasks = workingTasks; + log.info(this.getServiceName() + " service end"); + } + + @Override + public String getServiceName() { + return StateMachineService.class.getSimpleName(); + } } } diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java index c08b49fbc..b8b3f70d2 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSinkTask.java @@ -39,7 +39,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer; import org.apache.rocketmq.client.consumer.PullResult; @@ -60,7 +60,7 @@ /** * A wrapper of {@link SinkTask} for runtime. */ -public class WorkerSinkTask implements Runnable { +public class WorkerSinkTask implements WorkerTask { private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME); @@ -84,10 +84,17 @@ public class WorkerSinkTask implements Runnable { */ private ConnectKeyValue taskConfig; + /** - * A switch for the sink task. + * Atomic state variable */ - private AtomicBoolean isStopping; + private AtomicReference state; + + /** + * Stop retry limit + */ + + /** * A RocketMQ consumer to pull message from MQ. @@ -132,12 +139,12 @@ public WorkerSinkTask(String connectorName, this.connectorName = connectorName; this.sinkTask = sinkTask; this.taskConfig = taskConfig; - this.isStopping = new AtomicBoolean(false); this.consumer = consumer; this.offsetStorageReader = offsetStorageReader; this.recordConverter = recordConverter; this.messageQueuesOffsetMap = new ConcurrentHashMap<>(256); this.messageQueuesStateMap = new ConcurrentHashMap<>(256); + this.state = new AtomicReference<>(WorkerTaskState.NEW); } /** @@ -146,6 +153,7 @@ public WorkerSinkTask(String connectorName, @Override public void run() { try { + state.compareAndSet(WorkerTaskState.NEW, WorkerTaskState.PENDING); sinkTask.initialize(new SinkTaskContext() { @Override public void resetOffset(QueueMetaData queueMetaData, Long offset) { @@ -232,8 +240,10 @@ public KeyValue configs() { return taskConfig; } }); + String topicNamesStr = taskConfig.getString(QUEUENAMES_CONFIG); + if (!StringUtils.isEmpty(topicNamesStr)) { String[] topicNames = topicNamesStr.split(COMMA); for (String topicName : topicNames) { @@ -247,6 +257,8 @@ public KeyValue configs() { log.debug("{} Initializing and starting task for topicNames {}", this, topicNames); } else { log.error("Lack of sink comsume topicNames config"); + state.set(WorkerTaskState.ERROR); + return; } for (Map.Entry entry : messageQueuesOffsetMap.entrySet()) { @@ -256,20 +268,39 @@ public KeyValue configs() { messageQueuesOffsetMap.put(messageQueue, convertToOffset(byteBuffer)); } } + + sinkTask.start(taskConfig); + // we assume executed here means we are safe log.info("Sink task start, config:{}", JSON.toJSONString(taskConfig)); - while (!isStopping.get()) { + state.compareAndSet(WorkerTaskState.PENDING, WorkerTaskState.RUNNING); + + while (WorkerTaskState.RUNNING == state.get()) { + // this method can block up to 3 minutes long pullMessageFromQueues(); } + + sinkTask.stop(); + state.compareAndSet(WorkerTaskState.STOPPING, WorkerTaskState.STOPPED); log.info("Sink task stop, config:{}", JSON.toJSONString(taskConfig)); + } catch (Exception e) { log.error("Run task failed.", e); + state.set(WorkerTaskState.ERROR); } } private void pullMessageFromQueues() throws MQClientException, RemotingException, MQBrokerException, InterruptedException { + long startTimeStamp = System.currentTimeMillis(); + log.info("START pullMessageFromQueues, time started : {}", startTimeStamp); for (Map.Entry entry : messageQueuesOffsetMap.entrySet()) { + log.info("START pullBlockIfNotFound, time started : {}", System.currentTimeMillis()); + + if (WorkerTaskState.RUNNING != state.get()) break; final PullResult pullResult = consumer.pullBlockIfNotFound(entry.getKey(), "*", entry.getValue(), MAX_MESSAGE_NUM); + long currentTime = System.currentTimeMillis(); + + log.info("INSIDE pullMessageFromQueues, time elapsed : {}", currentTime - startTimeStamp); if (pullResult.getPullStatus().equals(PullStatus.FOUND)) { final List messages = pullResult.getMsgFoundList(); removePauseQueueMessage(entry.getKey(), messages); @@ -314,10 +345,20 @@ private void removePauseQueueMessage(MessageQueue messageQueue, List } } + + @Override public void stop() { - isStopping.set(true); - consumer.shutdown(); - sinkTask.stop(); + state.compareAndSet(WorkerTaskState.RUNNING, WorkerTaskState.STOPPING); + } + + @Override + public void cleanup() { + if (state.compareAndSet(WorkerTaskState.STOPPED, WorkerTaskState.TERMINATED) || + state.compareAndSet(WorkerTaskState.ERROR, WorkerTaskState.TERMINATED)) + consumer.shutdown(); + else { + log.error("[BUG] cleaning a task but it's not in STOPPED or ERROR state"); + } } /** @@ -384,23 +425,49 @@ private SinkDataEntry convertToSinkDataEntry(MessageExt message) { return sinkDataEntry; } + + @Override public String getConnectorName() { return connectorName; } + @Override + public WorkerTaskState getState() { + return state.get(); + } + + @Override public ConnectKeyValue getTaskConfig() { return taskConfig; } + + /** + * Further we cant try to log what caused the error + */ + @Override + public void timeout() { + this.state.set(WorkerTaskState.ERROR); + } @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append("connectorName:" + connectorName) - .append("\nConfigs:" + JSON.toJSONString(taskConfig)); + .append("\nConfigs:" + JSON.toJSONString(taskConfig)) + .append("\nState:" + state.get().toString()); return sb.toString(); } + @Override + public Object getJsonObject() { + HashMap obj = new HashMap(); + obj.put("connectorName", connectorName); + obj.put("configs", JSON.toJSONString(taskConfig)); + obj.put("state", state.get().toString()); + return obj; + } + private enum QueueState { PAUSE } diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java index b1e102f18..a7a535594 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerSourceTask.java @@ -33,7 +33,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.exception.MQClientException; @@ -52,7 +52,7 @@ /** * A wrapper of {@link SourceTask} for runtime. */ -public class WorkerSourceTask implements Runnable { +public class WorkerSourceTask implements WorkerTask { private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME); @@ -72,9 +72,9 @@ public class WorkerSourceTask implements Runnable { private ConnectKeyValue taskConfig; /** - * A switch for the source task. + * Atomic state variable */ - private AtomicBoolean isStopping; + private AtomicReference state; /** * Used to read the position of source data source. @@ -106,9 +106,9 @@ public WorkerSourceTask(String connectorName, this.sourceTask = sourceTask; this.taskConfig = taskConfig; this.positionStorageReader = positionStorageReader; - this.isStopping = new AtomicBoolean(false); this.producer = producer; this.recordConverter = recordConverter; + this.state = new AtomicReference<>(WorkerTaskState.NEW); } /** @@ -117,6 +117,7 @@ public WorkerSourceTask(String connectorName, @Override public void run() { try { + state.compareAndSet(WorkerTaskState.NEW, WorkerTaskState.PENDING); sourceTask.initialize(new SourceTaskContext() { @Override public PositionStorageReader positionStorageReader() { @@ -129,32 +130,46 @@ public KeyValue configs() { } }); sourceTask.start(taskConfig); - } catch (Exception e) { - log.error("Run task failed.", e); - this.stop(); - } - - log.info("Source task start, config:{}", JSON.toJSONString(taskConfig)); - while (!isStopping.get()) { - try { - Collection toSendEntries = sourceTask.poll(); - if (null != toSendEntries && toSendEntries.size() > 0) { - sendRecord(toSendEntries); + state.compareAndSet(WorkerTaskState.PENDING, WorkerTaskState.RUNNING); + log.info("Source task start, config:{}", JSON.toJSONString(taskConfig)); + while (WorkerTaskState.RUNNING == state.get()) { + try { + Collection toSendEntries = sourceTask.poll(); + if (null != toSendEntries && toSendEntries.size() > 0) { + sendRecord(toSendEntries); + } + } catch (Exception e) { + log.warn("Source task runtime exception", e); + state.set(WorkerTaskState.ERROR); } - } catch (Exception e) { - log.warn("Source task runtime exception", e); } + sourceTask.stop(); + state.compareAndSet(WorkerTaskState.STOPPING, WorkerTaskState.STOPPED); + log.info("Source task stop, config:{}", JSON.toJSONString(taskConfig)); + } catch (Exception e) { + log.error("Run task failed.", e); + state.set(WorkerTaskState.ERROR); } - log.info("Source task stop, config:{}", JSON.toJSONString(taskConfig)); } public Map getPositionData() { return positionData; } + + + @Override public void stop() { - isStopping.set(true); - sourceTask.stop(); + state.compareAndSet(WorkerTaskState.RUNNING, WorkerTaskState.STOPPING); + } + + @Override + public void cleanup() { + if (state.compareAndSet(WorkerTaskState.STOPPED, WorkerTaskState.TERMINATED) || + state.compareAndSet(WorkerTaskState.ERROR, WorkerTaskState.TERMINATED)) { + } else { + log.error("[BUG] cleaning a task but it's not in STOPPED or ERROR state"); + } } /** @@ -250,20 +265,42 @@ private void sendRecord(Collection sourceDataEntries) { } } + @Override + public WorkerTaskState getState() { + return this.state.get(); + } + + @Override public String getConnectorName() { return connectorName; } + @Override public ConnectKeyValue getTaskConfig() { return taskConfig; } + @Override + public void timeout() { + this.state.set(WorkerTaskState.ERROR); + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append("connectorName:" + connectorName) - .append("\nConfigs:" + JSON.toJSONString(taskConfig)); + .append("\nConfigs:" + JSON.toJSONString(taskConfig)) + .append("\nState:" + state.get().toString()); return sb.toString(); } + + @Override + public Object getJsonObject() { + HashMap obj = new HashMap(); + obj.put("connectorName", connectorName); + obj.put("configs", JSON.toJSONString(taskConfig)); + obj.put("state", state.get().toString()); + return obj; + } } diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTask.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTask.java new file mode 100644 index 000000000..52bd26d41 --- /dev/null +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTask.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.connect.runtime.connectorwrapper; +import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue; + +/** + * Should we use callable here ? + */ +public interface WorkerTask extends Runnable { + + public WorkerTaskState getState(); + + public void stop(); + + public void cleanup(); + + public String getConnectorName(); + + public ConnectKeyValue getTaskConfig(); + + public Object getJsonObject(); + + public void timeout(); +} diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTaskState.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTaskState.java new file mode 100644 index 000000000..315805841 --- /dev/null +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/WorkerTaskState.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.rocketmq.connect.runtime.connectorwrapper; + +public enum WorkerTaskState { + NEW, + PENDING, + RUNNING, + ERROR, + OUT_LOOP, + STOPPING, + STOPPED, + TERMINATED, +}; diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java index 67f1cf47d..3b25738cb 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/rest/RestHandler.java @@ -20,6 +20,8 @@ import com.alibaba.fastjson.JSON; import io.javalin.Context; import io.javalin.Javalin; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -27,6 +29,7 @@ import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue; import org.apache.rocketmq.connect.runtime.common.LoggerName; import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerConnector; +import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerTask; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,21 +42,38 @@ public class RestHandler { private final ConnectController connectController; + private static final String CONNECTOR_CONFIGS = "connectorConfigs"; + + private static final String TASK_CONFIGS = "taskConfigs"; + public RestHandler(ConnectController connectController) { this.connectController = connectController; Javalin app = Javalin.start(connectController.getConnectConfig().getHttpPort()); app.get("/connectors/stopAll", this::handleStopAllConnector); + app.get("/connectors/pauseAll", this::handlePauseAllConnector); + app.get("/connectors/resumeAll", this::handleResumeAllConnector); + app.get("/connectors/enableAll", this::handleEnableAllConnector); + app.get("/connectors/disableAll", this::handleDisableAllConnector); app.get("/connectors/:connectorName", this::handleCreateConnector); app.get("/connectors/:connectorName/config", this::handleQueryConnectorConfig); app.get("/connectors/:connectorName/status", this::handleQueryConnectorStatus); app.get("/connectors/:connectorName/stop", this::handleStopConnector); + app.get("/connectors/:connectorName/pause", this::handlePauseConnector); + app.get("/connectors/:connectorName/resume", this::handleResumeConnector); + app.get("/connectors/:connectorName/enable", this::handleEnableConnector); + app.get("/connectors/:connectorName/disable", this::handleDisableConnector); app.get("/getClusterInfo", this::getClusterInfo); app.get("/getConfigInfo", this::getConfigInfo); - app.get("/getAllocatedInfo", this::getAllocatedInfo); + app.get("/getAllocatedConnectors", this::getAllocatedConnectors); + app.get("/getAllocatedTasks", this::getAllocatedTasks); app.get("/plugin/reload", this::reloadPlugins); } - private void getAllocatedInfo(Context context) { + /** + * We need to refactor this method to use json output format + * @param context + */ + private void getAllocatedConnectors(Context context) { Set workerConnectors = connectController.getWorker().getWorkingConnectors(); Set workerTasks = connectController.getWorker().getWorkingTasks(); @@ -69,11 +89,39 @@ private void getAllocatedInfo(Context context) { context.result(sb.toString()); } + + + private void getAllocatedTasks(Context context) { + StringBuilder sb = new StringBuilder(); + + Set allErrorTasks = new HashSet<>(); + allErrorTasks.addAll(connectController.getWorker().getErrorTasks()); + allErrorTasks.addAll(connectController.getWorker().getCleanedErrorTasks()); + + Set allStoppedTasks = new HashSet<>(); + allStoppedTasks.addAll(connectController.getWorker().getStoppedTasks()); + allStoppedTasks.addAll(connectController.getWorker().getCleanedStoppedTasks()); + + Map formatter = new HashMap<>(); + formatter.put("pendingTasks", convertWorkerTaskToString(connectController.getWorker().getPendingTasks())); + formatter.put("runningTasks", convertWorkerTaskToString(connectController.getWorker().getWorkingTasks())); + formatter.put("stoppingTasks", convertWorkerTaskToString(connectController.getWorker().getStoppingTasks())); + formatter.put("stoppedTasks", convertWorkerTaskToString(allStoppedTasks)); + formatter.put("errorTasks", convertWorkerTaskToString(allErrorTasks)); + + context.result(JSON.toJSONString(formatter)); + } + private void getConfigInfo(Context context) { Map connectorConfigs = connectController.getConfigManagementService().getConnectorConfigs(); Map> taskConfigs = connectController.getConfigManagementService().getTaskConfigs(); - context.result("ConnectorConfigs:" + JSON.toJSONString(connectorConfigs) + "\nTaskConfigs:" + JSON.toJSONString(taskConfigs)); + + Map formatter = new HashMap<>(); + formatter.put(CONNECTOR_CONFIGS, connectorConfigs); + formatter.put(TASK_CONFIGS, taskConfigs); + + context.result(JSON.toJSONString(formatter)); } private void getClusterInfo(Context context) { @@ -157,6 +205,46 @@ private void handleStopAllConnector(Context context) { } } + private void handlePauseAllConnector(Context context) { + + } + + private void handleResumeAllConnector(Context context) { + + } + + private void handleEnableAllConnector(Context context) { + + } + + private void handleDisableAllConnector(Context context) { + + } + + private void handlePauseConnector(Context context) { + + } + + private void handleResumeConnector(Context context) { + + } + + private void handleEnableConnector(Context context) { + + } + + private void handleDisableConnector(Context context) { + + } + + private Set convertWorkerTaskToString(Set tasks) { + Set result = new HashSet<>(); + for (Runnable task : tasks) { + result.add(((WorkerTask) task).getJsonObject()); + } + return result; + } + private void reloadPlugins(Context context) { connectController.getConfigManagementService().getPlugin().initPlugin(); context.result("success"); diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java index af8c95ff5..32ac1920e 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementService.java @@ -40,12 +40,19 @@ public interface ConfigManagementService { void stop(); /** - * Get all connector configs from the cluster. + * Get all connector configs from the cluster filter out DELETE set to 1 * * @return */ Map getConnectorConfigs(); + /** + * Get all connector configs from the cluster including DELETED bit set to 1 + * + * @return + */ + Map getConnectorConfigsIncludeDeleted(); + /** * Put the configs of the specified connector in the cluster. * diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java index a74269d22..227d37198 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java @@ -122,6 +122,18 @@ public Map getConnectorConfigs() { return result; } + @Override + public Map getConnectorConfigsIncludeDeleted() { + + Map result = new HashMap<>(); + Map connectorConfigs = connectorKeyValueStore.getKVMap(); + for (String connectorName : connectorConfigs.keySet()) { + ConnectKeyValue config = connectorConfigs.get(connectorName); + result.put(connectorName, config); + } + return result; + } + @Override public String putConnectorConfig(String connectorName, ConnectKeyValue configs) throws Exception { @@ -184,6 +196,7 @@ public void recomputeTaskConfigs(String connectorName, Connector connector, Long public void removeConnectorConfig(String connectorName) { ConnectKeyValue config = new ConnectKeyValue(); + config.put(RuntimeConfigDefine.UPDATE_TIMESATMP, System.currentTimeMillis()); config.put(RuntimeConfigDefine.CONFIG_DELETED, 1); List taskConfigList = new ArrayList<>(); @@ -191,6 +204,7 @@ public void removeConnectorConfig(String connectorName) { connectorKeyValueStore.put(connectorName, config); putTaskConfigs(connectorName, taskConfigList); + log.info("[ISSUE #2027] After removal The configs are:\n" + getConnectorConfigs().toString()); sendSynchronizeConfig(); triggerListener(); } @@ -288,13 +302,11 @@ public void onCompletion(Throwable error, String key, ConnAndTaskConfigs result) * @return */ private boolean mergeConfig(ConnAndTaskConfigs newConnAndTaskConfig) { - boolean changed = false; for (String connectorName : newConnAndTaskConfig.getConnectorConfigs().keySet()) { ConnectKeyValue newConfig = newConnAndTaskConfig.getConnectorConfigs().get(connectorName); - ConnectKeyValue oldConfig = getConnectorConfigs().get(connectorName); + ConnectKeyValue oldConfig = getConnectorConfigsIncludeDeleted().get(connectorName); if (null == oldConfig) { - changed = true; connectorKeyValueStore.put(connectorName, newConfig); taskKeyValueStore.put(connectorName, newConnAndTaskConfig.getTaskConfigs().get(connectorName)); diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java index 2079f85ec..cb86aefb6 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java @@ -77,11 +77,10 @@ public void checkClusterStoreTopic() { * Distribute connectors and tasks according to the {@link RebalanceImpl#allocateConnAndTaskStrategy}. */ public void doRebalance() { - List curAliveWorkers = clusterManagementService.getAllAliveWorkers(); Map curConnectorConfigs = configManagementService.getConnectorConfigs(); Map> curTaskConfigs = configManagementService.getTaskConfigs(); - + log.info("[ISSUE #2027] The connectorConfigs are:" + curConnectorConfigs.toString() + " with timestamp :" + System.currentTimeMillis()); ConnAndTaskConfigs allocateResult = allocateConnAndTaskStrategy.allocate(curAliveWorkers, clusterManagementService.getCurrentWorker(), curConnectorConfigs, curTaskConfigs); log.info("Allocated connector:{}", allocateResult.getConnectorConfigs()); log.info("Allocated task:{}", allocateResult.getTaskConfigs()); diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceService.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceService.java index 68a529309..cce32f2ed 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceService.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceService.java @@ -59,6 +59,7 @@ public void run() { this.rebalanceImpl.checkClusterStoreTopic(); while (!this.isStopped()) { + this.waitForRunning(waitInterval); this.rebalanceImpl.doRebalance(); } diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/TaskPositionCommitService.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/TaskPositionCommitService.java index 2717619ca..d4e01fbb2 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/TaskPositionCommitService.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/TaskPositionCommitService.java @@ -17,8 +17,13 @@ package org.apache.rocketmq.connect.runtime.service; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; import org.apache.rocketmq.connect.runtime.common.LoggerName; import org.apache.rocketmq.connect.runtime.connectorwrapper.Worker; +import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSinkTask; +import org.apache.rocketmq.connect.runtime.connectorwrapper.WorkerSourceTask; import org.apache.rocketmq.connect.runtime.utils.ServiceThread; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,8 +37,19 @@ public class TaskPositionCommitService extends ServiceThread { private Worker worker; - public TaskPositionCommitService(Worker worker) { + + private final PositionManagementService positionManagementService; + + + private final PositionManagementService offsetManagementService; + + + public TaskPositionCommitService(Worker worker, + PositionManagementService positionManagementService, + PositionManagementService offsetManagementService) { this.worker = worker; + this.positionManagementService = positionManagementService; + this.offsetManagementService = offsetManagementService; } @Override @@ -42,7 +58,7 @@ public void run() { while (!this.isStopped()) { this.waitForRunning(10000); - this.worker.commitTaskPosition(); + commitTaskPosition(); } log.info(this.getServiceName() + " service end"); @@ -52,4 +68,19 @@ public void run() { public String getServiceName() { return TaskPositionCommitService.class.getSimpleName(); } + + + public void commitTaskPosition() { + Map positionData = new HashMap<>(); + Map offsetData = new HashMap<>(); + for (Runnable task : worker.getWorkingTasks()) { + if (task instanceof WorkerSourceTask) { + positionData.putAll(((WorkerSourceTask) task).getPositionData()); + positionManagementService.putPosition(positionData); + } else if (task instanceof WorkerSinkTask) { + offsetData.putAll(((WorkerSinkTask) task).getOffsetData()); + offsetManagementService.putPosition(offsetData); + } + } + } } diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/MemoryBasedKeyValueStore.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/MemoryBasedKeyValueStore.java index f37d483ab..ea8a78bdd 100644 --- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/MemoryBasedKeyValueStore.java +++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/store/MemoryBasedKeyValueStore.java @@ -17,15 +17,16 @@ package org.apache.rocketmq.connect.runtime.store; -import java.util.HashMap; + import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; public class MemoryBasedKeyValueStore implements KeyValueStore { protected Map data; public MemoryBasedKeyValueStore() { - this.data = new HashMap<>(); + this.data = new ConcurrentHashMap<>(); } @Override