From 0701386101c10ce0f45e666ed3f294e472027eb4 Mon Sep 17 00:00:00 2001 From: Siqi Date: Mon, 4 Dec 2023 13:14:47 +0800 Subject: [PATCH] [feat] reimplement sync snapshot --- .../com/trilobita/core/common}/Snapshot.java | 14 ++++---- .../core/messaging/MessageProducer.java | 17 ++++------ .../server/masterserver/MasterServer.java | 2 ++ .../execution/ExecutionManager.java | 7 ++++ .../execution/synchronize/Synchronizer.java | 33 +++++++++++-------- .../server/workerserver/WorkerServer.java | 4 +++ 6 files changed, 48 insertions(+), 29 deletions(-) rename {engine/src/main/java/com/trilobita/engine/server/masterserver/execution/synchronize => core/src/main/java/com/trilobita/core/common}/Snapshot.java (76%) diff --git a/engine/src/main/java/com/trilobita/engine/server/masterserver/execution/synchronize/Snapshot.java b/core/src/main/java/com/trilobita/core/common/Snapshot.java similarity index 76% rename from engine/src/main/java/com/trilobita/engine/server/masterserver/execution/synchronize/Snapshot.java rename to core/src/main/java/com/trilobita/core/common/Snapshot.java index ed93203f..6d926055 100644 --- a/engine/src/main/java/com/trilobita/engine/server/masterserver/execution/synchronize/Snapshot.java +++ b/core/src/main/java/com/trilobita/core/common/Snapshot.java @@ -1,4 +1,4 @@ -package com.trilobita.engine.server.masterserver.execution.synchronize; +package com.trilobita.core.common; import com.fasterxml.jackson.databind.ObjectMapper; import com.trilobita.commons.Mail; @@ -8,9 +8,9 @@ import java.io.File; import java.io.IOException; +import java.io.Serializable; import java.util.List; import java.util.Map; -import java.util.Queue; /** * The snapshot of the graph @@ -18,17 +18,19 @@ */ @Slf4j @Data -public class Snapshot { +public class Snapshot implements Serializable { private final int id; private final int superstep; private final Graph graph; private final Map> mailTable; + private final List aliveWorkerIds; private final String snapshotDirectory = "data/snapshot/"; - private Snapshot(int id, int superstep, Graph graph, Map> mailTable) { + private Snapshot(int id, int superstep, Graph graph, List aliveWorkerIds, Map> mailTable) { this.id = id; this.superstep = superstep; this.graph = graph; + this.aliveWorkerIds = aliveWorkerIds; this.mailTable = mailTable; } @@ -61,7 +63,7 @@ public void store() { * @return the snapshot * @param the type of the vertex value */ - public static Snapshot createSnapshot(int snapshotId, int superstep, Graph graph, Map> mailTable) { - return new Snapshot<>(snapshotId, superstep, graph, mailTable); + public static Snapshot createSnapshot(int snapshotId, int superstep, Graph graph, List aliveWorkerIds, Map> mailTable) { + return new Snapshot<>(snapshotId, superstep, graph, aliveWorkerIds, mailTable); } } diff --git a/core/src/main/java/com/trilobita/core/messaging/MessageProducer.java b/core/src/main/java/com/trilobita/core/messaging/MessageProducer.java index 6d6122d4..97159904 100644 --- a/core/src/main/java/com/trilobita/core/messaging/MessageProducer.java +++ b/core/src/main/java/com/trilobita/core/messaging/MessageProducer.java @@ -3,7 +3,7 @@ import com.trilobita.commons.Computable; import com.trilobita.commons.Mail; import com.trilobita.commons.Message; -import com.trilobita.core.graph.Graph; +import com.trilobita.core.common.Snapshot; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.KafkaProducer; @@ -94,17 +94,15 @@ public static void produceFinishSignal(HashMap> verte } /** - * Produce a sync message among masters to the topic. + * Produce a snapshot message to the topic. * - * @param graph the graph - * @param aliveWorkerIds alive worker ids + * @param snapshot snapshot to be sent */ - public static void produceSyncMessage(Graph graph, List aliveWorkerIds) { - HashMap syncMap = new HashMap<>(); - syncMap.put("GRAPH", graph); - syncMap.put("ALIVE_WORKER_IDS", aliveWorkerIds); + public static void produceSyncMessage(Snapshot snapshot) { + HashMap syncContent = new HashMap<>(); + syncContent.put("SNAPSHOT", snapshot); Message message = new Message(); - message.setContent(syncMap); + message.setContent(syncContent); Mail mail = new Mail(); mail.setMessage(message); MessageProducer.createAndProduce(null, mail, "MASTER_SYNC"); @@ -144,5 +142,4 @@ public static void producePartitionGraphMessage(Object objectMap, Integer server public static void produceWorkerServerMessage(Mail mail, Integer serverId) { MessageProducer.createAndProduce(null, mail, "SERVER_" + serverId + "_MESSAGE"); } - } diff --git a/engine/src/main/java/com/trilobita/engine/server/masterserver/MasterServer.java b/engine/src/main/java/com/trilobita/engine/server/masterserver/MasterServer.java index bfaffd98..b06f64c5 100644 --- a/engine/src/main/java/com/trilobita/engine/server/masterserver/MasterServer.java +++ b/engine/src/main/java/com/trilobita/engine/server/masterserver/MasterServer.java @@ -36,6 +36,8 @@ public class MasterServer extends AbstractServer { List masterIds = new ArrayList<>(); // the alive master servers' ids public boolean isPrimary; MasterFunctionableRunner masterFunctionableRunner; + @Setter + Map> mailTable = new HashMap<>(); // the mail table of the graph public MasterServer(Partitioner graphPartitioner, int nWorker, int id, int nReplica, int snapshotFrequency, boolean isPrimary) throws ExecutionException, InterruptedException { super(id, graphPartitioner.getPartitionStrategy()); // the standard server id of master is 0 diff --git a/engine/src/main/java/com/trilobita/engine/server/masterserver/execution/ExecutionManager.java b/engine/src/main/java/com/trilobita/engine/server/masterserver/execution/ExecutionManager.java index 2a8eaffa..b0929e6f 100644 --- a/engine/src/main/java/com/trilobita/engine/server/masterserver/execution/ExecutionManager.java +++ b/engine/src/main/java/com/trilobita/engine/server/masterserver/execution/ExecutionManager.java @@ -155,9 +155,16 @@ public void partitionGraph(List aliveWorkerIds) { nConfirmWorker = 0; Map> vertexGroups = this.masterServer.getGraphPartitioner().partition(this.masterServer.getGraph(), aliveWorkerIds); vertexGroups.forEach((workerId, vertexGroup) -> { + List mails = new ArrayList<>(); + vertexGroup.getVertices().forEach(vertex -> { + if (masterServer.getMailTable().get(vertex.getId()) != null) { + mails.addAll(masterServer.getMailTable().get(vertex.getId())); + } + }); Map objectMap = new HashMap<>(); objectMap.put("PARTITION", vertexGroup); objectMap.put("PARTITION_STRATEGY", this.masterServer.getGraphPartitioner().getPartitionStrategy()); + objectMap.put("MAILS", mails); MessageProducer.producePartitionGraphMessage(objectMap, workerId); }); } diff --git a/engine/src/main/java/com/trilobita/engine/server/masterserver/execution/synchronize/Synchronizer.java b/engine/src/main/java/com/trilobita/engine/server/masterserver/execution/synchronize/Synchronizer.java index 93f9a971..1e40b779 100644 --- a/engine/src/main/java/com/trilobita/engine/server/masterserver/execution/synchronize/Synchronizer.java +++ b/engine/src/main/java/com/trilobita/engine/server/masterserver/execution/synchronize/Synchronizer.java @@ -1,6 +1,7 @@ package com.trilobita.engine.server.masterserver.execution.synchronize; import com.trilobita.commons.Mail; +import com.trilobita.core.common.Snapshot; import com.trilobita.core.graph.Graph; import com.trilobita.core.messaging.MessageConsumer; import com.trilobita.core.messaging.MessageProducer; @@ -26,22 +27,22 @@ public Synchronizer(MasterServer masterServer) { @Override public void handleMessage(UUID key, Mail value, int partition, long offset) { Map objectMap = (Map) value.getMessage().getContent(); - Graph graph = (Graph) objectMap.get("GRAPH"); - List aliveWorkerIds = (List) objectMap.get("ALIVE_WORKER_IDS"); - synchronize(graph, aliveWorkerIds); + Snapshot snapshot = (Snapshot) objectMap.get("SNAPSHOT"); + synchronize(snapshot); } }); } /** - * Synchronize the graph and alive worker ids among the master and replicas - * @param graph the graph to be synchronized - * @param aliveWorkerIds the alive worker ids + * synchronize the graph with other masters + * @param snapshot the snapshot to synchronize */ - public void synchronize(Graph graph, List aliveWorkerIds) { + public void synchronize(Snapshot snapshot) { // TODO: SYNC should synchronize everything, not just the graph - masterServer.setGraph(graph); - masterServer.setWorkerIds(aliveWorkerIds); + snapshot.store(); + masterServer.setGraph(snapshot.getGraph()); + masterServer.setWorkerIds(snapshot.getAliveWorkerIds()); + masterServer.setMailTable(snapshot.getMailTable()); } /** @@ -49,18 +50,24 @@ public void synchronize(Graph graph, List aliveWorkerIds) { */ public void snapshotAndSync(Graph graph) { log.info("[Snapshot] doing a snapshot"); - Snapshot snapshot = Snapshot.createSnapshot(masterServer.getExecutionManager().getSuperstep(), masterServer.getExecutionManager().getSuperstep(), graph, masterServer.getExecutionManager().snapshotMailTable); + Snapshot snapshot = Snapshot.createSnapshot( + masterServer.getExecutionManager().getSuperstep(), + masterServer.getExecutionManager().getSuperstep(), + graph, + this.masterServer.getWorkerIds(), + masterServer.getExecutionManager().snapshotMailTable + ); snapshot.store(); masterServer.getExecutionManager().snapshotMailTable.clear(); this.snapshots.add(snapshot); - this.syncGraph(); + this.sendSynchronizeMessage(snapshot); } /** * sync the graph with other masters */ - private void syncGraph() { - MessageProducer.produceSyncMessage(this.masterServer.getGraph(), this.masterServer.getWorkerIds()); + private void sendSynchronizeMessage(Snapshot snapshot) { + MessageProducer.produceSyncMessage(snapshot); } /** diff --git a/engine/src/main/java/com/trilobita/engine/server/workerserver/WorkerServer.java b/engine/src/main/java/com/trilobita/engine/server/workerserver/WorkerServer.java index 13b81eb0..cd4cf61c 100644 --- a/engine/src/main/java/com/trilobita/engine/server/workerserver/WorkerServer.java +++ b/engine/src/main/java/com/trilobita/engine/server/workerserver/WorkerServer.java @@ -63,6 +63,10 @@ public void handleMessage(UUID key, Mail mail, int partition, long offset) throw Map res = (Map) mail.getMessage().getContent(); setVertexGroup((VertexGroup) res.get("PARTITION")); PartitionStrategy partitionStrategy = (PartitionStrategy) res.get("PARTITION_STRATEGY"); + List incomingMails = (List) res.get("MAILS"); + if (!incomingMails.isEmpty()) { + getInMailQueue().addAll(incomingMails); + } setPartitionStrategy(partitionStrategy); // assign the server's hashmap to each vertex Metrics.Superstep.initialize();