Skip to content

Commit

Permalink
[feat] reimplement sync snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
sxiangag committed Dec 4, 2023
1 parent 8e7f676 commit 0701386
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 29 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.trilobita.engine.server.masterserver.execution.synchronize;
package com.trilobita.core.common;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.trilobita.commons.Mail;
Expand All @@ -8,27 +8,29 @@

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/**
* The snapshot of the graph
* @param <T> the type of the vertex value
*/
@Slf4j
@Data
public class Snapshot<T> {
public class Snapshot<T> implements Serializable {
private final int id;
private final int superstep;
private final Graph<T> graph;
private final Map<Integer, List<Mail>> mailTable;
private final List<Integer> aliveWorkerIds;
private final String snapshotDirectory = "data/snapshot/";

// Private constructor: the static createSnapshot(...) factory in this class is what calls it.
// Every argument is assigned to its field as-is; no copy of the list or the map is made,
// so the snapshot shares those collections with the caller.
private Snapshot(int id, int superstep, Graph<T> graph, Map<Integer, List<Mail>> mailTable) {
private Snapshot(int id, int superstep, Graph<T> graph, List<Integer> aliveWorkerIds, Map<Integer, List<Mail>> mailTable) {
this.id = id;
this.superstep = superstep;
this.graph = graph;
this.aliveWorkerIds = aliveWorkerIds;
this.mailTable = mailTable;
}

Expand Down Expand Up @@ -61,7 +63,7 @@ public void store() {
* @return the snapshot
* @param <T> the type of the vertex value
*/
public static <T> Snapshot<T> createSnapshot(int snapshotId, int superstep, Graph<T> graph, Map<Integer, List<Mail>> mailTable) {
return new Snapshot<>(snapshotId, superstep, graph, mailTable);
public static <T> Snapshot<T> createSnapshot(int snapshotId, int superstep, Graph<T> graph, List<Integer> aliveWorkerIds, Map<Integer, List<Mail>> mailTable) {
// Thin factory over the private constructor: all arguments are forwarded unchanged.
return new Snapshot<>(snapshotId, superstep, graph, aliveWorkerIds, mailTable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.trilobita.commons.Computable;
import com.trilobita.commons.Mail;
import com.trilobita.commons.Message;
import com.trilobita.core.graph.Graph;
import com.trilobita.core.common.Snapshot;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
Expand Down Expand Up @@ -94,17 +94,15 @@ public static <T> void produceFinishSignal(HashMap<Integer, Computable<T>> verte
}

/**
* Produce a sync message among masters to the topic.
* Produce a snapshot message to the topic.
*
* @param graph the graph
* @param aliveWorkerIds alive worker ids
* @param snapshot snapshot to be sent
*/
public static void produceSyncMessage(Graph<?> graph, List<Integer> aliveWorkerIds) {
HashMap<String, Object> syncMap = new HashMap<>();
syncMap.put("GRAPH", graph);
syncMap.put("ALIVE_WORKER_IDS", aliveWorkerIds);
public static void produceSyncMessage(Snapshot<?> snapshot) {
HashMap<String, Object> syncContent = new HashMap<>();
syncContent.put("SNAPSHOT", snapshot);
Message message = new Message();
message.setContent(syncMap);
message.setContent(syncContent);
Mail mail = new Mail();
mail.setMessage(message);
MessageProducer.createAndProduce(null, mail, "MASTER_SYNC");
Expand Down Expand Up @@ -144,5 +142,4 @@ public static void producePartitionGraphMessage(Object objectMap, Integer server
public static void produceWorkerServerMessage(Mail mail, Integer serverId) {
    // Each worker server has its own destination, named after its id.
    final String workerTopic = "SERVER_" + serverId + "_MESSAGE";
    MessageProducer.createAndProduce(null, mail, workerTopic);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public class MasterServer<T> extends AbstractServer<T> {
List<Integer> masterIds = new ArrayList<>(); // the alive master servers' ids
public boolean isPrimary;
MasterFunctionableRunner masterFunctionableRunner;
@Setter
Map<Integer, List<Mail>> mailTable = new HashMap<>(); // the mail table of the graph

public MasterServer(Partitioner<T> graphPartitioner, int nWorker, int id, int nReplica, int snapshotFrequency, boolean isPrimary) throws ExecutionException, InterruptedException {
super(id, graphPartitioner.getPartitionStrategy()); // the standard server id of master is 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,16 @@ public void partitionGraph(List<Integer> aliveWorkerIds) {
nConfirmWorker = 0;
Map<Integer, VertexGroup<T>> vertexGroups = this.masterServer.getGraphPartitioner().partition(this.masterServer.getGraph(), aliveWorkerIds);
vertexGroups.forEach((workerId, vertexGroup) -> {
List<Mail> mails = new ArrayList<>();
vertexGroup.getVertices().forEach(vertex -> {
if (masterServer.getMailTable().get(vertex.getId()) != null) {
mails.addAll(masterServer.getMailTable().get(vertex.getId()));
}
});
Map<String, Object> objectMap = new HashMap<>();
objectMap.put("PARTITION", vertexGroup);
objectMap.put("PARTITION_STRATEGY", this.masterServer.getGraphPartitioner().getPartitionStrategy());
objectMap.put("MAILS", mails);
MessageProducer.producePartitionGraphMessage(objectMap, workerId);
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.trilobita.engine.server.masterserver.execution.synchronize;

import com.trilobita.commons.Mail;
import com.trilobita.core.common.Snapshot;
import com.trilobita.core.graph.Graph;
import com.trilobita.core.messaging.MessageConsumer;
import com.trilobita.core.messaging.MessageProducer;
Expand All @@ -26,41 +27,47 @@ public Synchronizer(MasterServer<T> masterServer) {
@Override
public void handleMessage(UUID key, Mail value, int partition, long offset) {
// The message content is cast (unchecked) to a map of named payload objects.
Map<String, Object> objectMap = (Map<String, Object>) value.getMessage().getContent();
Graph<T> graph = (Graph<T>) objectMap.get("GRAPH");
List<Integer> aliveWorkerIds = (List<Integer>) objectMap.get("ALIVE_WORKER_IDS");
synchronize(graph, aliveWorkerIds);
// "SNAPSHOT" is the key under which MessageProducer.produceSyncMessage stores the snapshot.
// NOTE(review): get(...) yields null when the key is absent and synchronize(snapshot)
// dereferences its argument immediately — confirm every sender sets this key.
Snapshot<T> snapshot = (Snapshot<T>) objectMap.get("SNAPSHOT");
synchronize(snapshot);
}
});
}

/**
* Synchronize the graph and alive worker ids among the master and replicas
* @param graph the graph to be synchronized
* @param aliveWorkerIds the alive worker ids
* synchronize the graph with other masters
* @param snapshot the snapshot to synchronize
*/
public void synchronize(Graph<T> graph, List<Integer> aliveWorkerIds) {
public void synchronize(Snapshot<T> snapshot) {
// TODO: SYNC should synchronize everything, not just the graph
masterServer.setGraph(graph);
masterServer.setWorkerIds(aliveWorkerIds);
// store() is defined on Snapshot outside this view — presumably it persists the
// received snapshot locally; confirm against Snapshot.store().
snapshot.store();
// Adopt the state carried by the snapshot on this master: graph, alive worker ids, mail table.
masterServer.setGraph(snapshot.getGraph());
masterServer.setWorkerIds(snapshot.getAliveWorkerIds());
masterServer.setMailTable(snapshot.getMailTable());
}

/**
 * Take a snapshot of the given graph at the current superstep, store it via
 * {@code Snapshot.store()}, remember it in {@code snapshots}, and send it to the other masters.
 *
 * <p>The snapshot id and the superstep are both the execution manager's current superstep.
 * The execution manager's {@code snapshotMailTable} is emptied once the snapshot has been
 * stored, so the snapshot is given its own copy of that table.
 *
 * @param graph the graph to capture in the snapshot
 */
public void snapshotAndSync(Graph<T> graph) {
    log.info("[Snapshot] doing a snapshot");
    Snapshot<T> snapshot = Snapshot.createSnapshot(
            masterServer.getExecutionManager().getSuperstep(),
            masterServer.getExecutionManager().getSuperstep(),
            graph,
            this.masterServer.getWorkerIds(),
            // Snapshot keeps the map it is handed without copying. Passing the live table would
            // let the clear() below empty the snapshot before it is kept and sent to the replicas.
            // A shallow copy is enough: clear() only removes the map's entries.
            new java.util.HashMap<>(masterServer.getExecutionManager().snapshotMailTable)
    );
    snapshot.store();
    masterServer.getExecutionManager().snapshotMailTable.clear();
    this.snapshots.add(snapshot);
    this.sendSynchronizeMessage(snapshot);
}

/**
* sync the graph with other masters
*/
private void syncGraph() {
MessageProducer.produceSyncMessage(this.masterServer.getGraph(), this.masterServer.getWorkerIds());
private void sendSynchronizeMessage(Snapshot<T> snapshot) {
// Delegates to MessageProducer.produceSyncMessage, which wraps the snapshot in a Mail
// and produces it with "MASTER_SYNC" as the topic argument.
MessageProducer.produceSyncMessage(snapshot);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ public void handleMessage(UUID key, Mail mail, int partition, long offset) throw
Map<String, Object> res = (Map<String, Object>) mail.getMessage().getContent();
setVertexGroup((VertexGroup<T>) res.get("PARTITION"));
PartitionStrategy partitionStrategy = (PartitionStrategy) res.get("PARTITION_STRATEGY");
List<Mail> incomingMails = (List<Mail>) res.get("MAILS");
if (!incomingMails.isEmpty()) {
getInMailQueue().addAll(incomingMails);
}
setPartitionStrategy(partitionStrategy);
// assign the server's hashmap to each vertex
Metrics.Superstep.initialize();
Expand Down

0 comments on commit 0701386

Please sign in to comment.