Skip to content

Commit

Permalink
fix: dosnapshot not sent in same superstep
Browse files Browse the repository at this point in the history
  • Loading branch information
wangyanbao666 committed Dec 8, 2023
1 parent e8480da commit 76cdb03
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,12 @@ public static void produceStartSignal(boolean doSnapshot) {
*
* @param vertexValues vertex values to be stored as checkpoints
*/
public static <T> void produceFinishSignal(HashMap<Integer, Computable<T>> vertexValues, List<Mail> snapshotMails, boolean complete) {
public static <T> void produceFinishSignal(HashMap<Integer, Computable<T>> vertexValues, List<Mail> snapshotMails, boolean complete, int superstep) {
Map<String, Object> map = new HashMap<>();
map.put("VERTEX_VALUES", vertexValues);
map.put("COMPLETE", complete);
map.put("SNAPSHOT_MAILS", snapshotMails);
map.put("SUPERSTEP", superstep);
Message message = new Message(map);
Mail mail = new Mail(-1, message, Mail.MailType.FINISH_SIGNAL);
MessageProducer.produce(null, mail, mail.getMailType().ordinal());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.trilobita.core.common.Mail;
import com.trilobita.core.graph.VertexGroup;
import com.trilobita.core.messaging.MessageConsumer;
import com.trilobita.core.messaging.MessageProducer;
import com.trilobita.engine.server.masterserver.partition.strategy.PartitionStrategy;
import com.trilobita.core.exception.TrilobitaException;
import lombok.Getter;
Expand Down Expand Up @@ -38,6 +39,7 @@ public AbstractServer(int serverId, PartitionStrategy partitionStrategy) {
// Invoked with a received Mail; key, partition and offset are not used here.
@Override
public void handleMessage(UUID key, Mail value, int partition, long offset) {
// Buffer the received mail on the enclosing server's inbound queue.
AbstractServer.this.inMailQueue.add(value);
// Acknowledge receipt by producing an empty Mail under "VALUE_RECEIVE_CONFIRM"
// (presumably a topic name; confirm against MessageProducer.produce).
MessageProducer.produce(null, new Mail(), "VALUE_RECEIVE_CONFIRM");
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ public class ExecutionManager<T> {
private final AtomicInteger nCompleteWorker = new AtomicInteger(0);
private final AtomicInteger nConfirmWorker = new AtomicInteger(0);
@Getter
private AtomicInteger superstep = new AtomicInteger(0);
private final AtomicInteger superstep = new AtomicInteger(0);

private Integer superstepSnapshot = 0;

public ExecutionManager(MasterServer<T> masterServer, int snapshotFrequency) {
this.masterServer = masterServer;
Expand Down Expand Up @@ -68,11 +70,17 @@ public void handleMessage(UUID key, Mail value, int partition, long offset) {
nConfirmWorker.getAndAdd(1);
if (nConfirmWorker.get() == masterServer.getWorkerIds().size()) {
// send start message to all workers
nFinishWorker.set(0);
nCompleteWorker.set(0);
nConfirmWorker.set(0);
masterServer.getHeartbeatManager().setIsHandlingFault(false);
MessageProducer.produce(null, new Mail(), "CONFIRM_START");
}
}
});

// check when a worker receives the message from other servers

// create the complete signal consumer
this.completeSignalConsumer = new MessageConsumer(Mail.MailType.FINISH_SIGNAL.ordinal(), masterServer.getServerId(), new MessageConsumer.MessageHandler() {
@Override
Expand All @@ -88,6 +96,11 @@ public void handleMessage(UUID key, Mail value, int partition, long offset) thro
HashMap<Integer, Computable<T>> vertexValues = (HashMap<Integer, Computable<T>>) content.get("VERTEX_VALUES");
List<Mail> snapshotMails = (List<Mail>) content.get("SNAPSHOT_MAILS");
boolean complete = (boolean) content.get("COMPLETE");
Integer workerSuperstep = (Integer) content.get("SUPERSTEP");
if (workerSuperstep > superstep.get()) {
return;
}
log.info("[Complete signal] {}", complete);
if (complete) {
nCompleteWorker.addAndGet(1);
}
Expand All @@ -108,18 +121,20 @@ public void handleMessage(UUID key, Mail value, int partition, long offset) thro
masterServer.getMasterFunctionableRunner().runFunctionableTasks();
Metrics.Superstep.computeMasterDuration();
Monitor.stopAndStartNewSuperstepMaster();
log.info("[Complete workers] {}", nCompleteWorker);
// check if the master needs to do a snapshot
if (isDoingSnapshot()) {
log.info("[Snapshot] Do Snapshot and Synchronization");
synchronizer.snapshotAndSync(masterServer.getGraph());
superstepSnapshot = superstep.get();
}
// check whether all workers have finished
if (nCompleteWorker.get() == masterServer.getWorkerIds().size()) {
log.info("[Complete] Task completes, the final graph is: {}", masterServer.getGraph());
masterServer.shutdown();
} else {
// start a new superstep
Thread.sleep(50);
Thread.sleep(500);
Metrics.Superstep.setMasterSuperStepStartTime();
superstep();
}
Expand Down Expand Up @@ -153,6 +168,7 @@ public void partitionGraph(List<Integer> aliveWorkerIds) {
nFinishWorker.set(0);
nCompleteWorker.set(0);
nConfirmWorker.set(0);
superstep.set(superstepSnapshot);
Map<Integer, VertexGroup<T>> vertexGroups = this.masterServer.getGraphPartitioner().partition(this.masterServer.getGraph(), aliveWorkerIds);
vertexGroups.forEach((workerId, vertexGroup) -> {
List<Mail> mails = new ArrayList<>();
Expand All @@ -165,6 +181,7 @@ public void partitionGraph(List<Integer> aliveWorkerIds) {
objectMap.put("PARTITION", vertexGroup);
objectMap.put("PARTITION_STRATEGY", this.masterServer.getGraphPartitioner().getPartitionStrategy());
objectMap.put("MAILS", mails);
objectMap.put("SUPERSTEP", superstep.get());
MessageProducer.producePartitionGraphMessage(objectMap, workerId);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.trilobita.engine.server.masterserver.MasterServer;
import com.trilobita.engine.server.masterserver.heartbeat.checker.HeartbeatChecker;
import com.trilobita.engine.server.util.HeartbeatSender;
import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -17,6 +18,7 @@
* It contains the fault handler for the workers and masters
*/
@Slf4j
@Data
public class HeartbeatManager {
MasterServer<?> masterServer;
List<Integer> masterIds;
Expand Down Expand Up @@ -59,7 +61,6 @@ public void handleFault(List<Integer> errors) {
log.info("[Fault] Alive worker ids: {}", masterServer.getWorkerIds());
masterServer.getExecutionManager().partitionGraph(masterServer.getWorkerIds());
log.info("[Fault] Finished fault handling");
isHandlingFault = false;
}
});
masterHeartBeatChecker = new HeartbeatChecker(masterServer.getServerId(), masterIds, false, new HeartbeatChecker.FaultHandler() {
Expand All @@ -73,7 +74,6 @@ public void handleFault(List<Integer> errors) {
masterServer.isPrimary = true;
masterServer.getExecutionManager().partitionGraph(masterServer.getWorkerIds());
masterServer.getMasterFunctionableRunner().becomePrimary();
isHandlingFault = false;
}
});
}
Expand All @@ -88,7 +88,7 @@ public void initializeHeartbeatConsumer() {
public void handleMessage(UUID key, Mail value, int partition, long offset) {
int senderId = (int) value.getMessage().getContent();
if (!masterServer.getWorkerIds().contains(senderId)) {
System.out.println(masterServer.getWorkerIds());
isHandlingFault = true;
masterServer.getWorkerIds().add(senderId);
workerHeartBeatChecker.getHeartbeats().put(senderId, true);
masterServer.getExecutionManager().partitionGraph(masterServer.getWorkerIds());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,31 +57,33 @@ public void handleMessage(UUID key, Mail value, int partition, long offset) thro
@Override
@SuppressWarnings("unchecked")
public void handleMessage(UUID key, Mail mail, int partition, long offset) throws InterruptedException, ExecutionException {
log.info("[Signal] Receive VertexGroup from master");
Monitor.start();
Metrics.setWorkerStartTime();
WorkerServer.this.executionManager.waitForFutures(); // in case of fault, repartition is needed
Map<String, Object> res = (Map<String, Object>) mail.getMessage().getContent();
setVertexGroup((VertexGroup<T>) res.get("PARTITION"));
PartitionStrategy partitionStrategy = (PartitionStrategy) res.get("PARTITION_STRATEGY");
List<Mail> incomingMails = (List<Mail>) res.get("MAILS");
if (!incomingMails.isEmpty()) {
getInMailQueue().addAll(incomingMails);
synchronized (WorkerServer.this) {
log.info("[Signal] Receive VertexGroup from master");
Monitor.start();
Metrics.setWorkerStartTime();
Map<String, Object> res = (Map<String, Object>) mail.getMessage().getContent();
setVertexGroup((VertexGroup<T>) res.get("PARTITION"));
PartitionStrategy partitionStrategy = (PartitionStrategy) res.get("PARTITION_STRATEGY");
List<Mail> incomingMails = (List<Mail>) res.get("MAILS");
superstep = (Integer) res.get("SUPERSTEP");
if (!incomingMails.isEmpty()) {
getInMailQueue().addAll(incomingMails);
}
setPartitionStrategy(partitionStrategy);
// assign the server's hashmap to each vertex
Metrics.Superstep.initialize();
List<Vertex<T>> vertices = vertexGroup.getVertices();
for (Vertex<T> vertex : vertices) {
Metrics.Superstep.incrementVertexNum(1);
Metrics.Superstep.incrementEdgeNum(vertex.getEdges().size());
vertex.setServerQueue(getOutMailQueue());
}
Message message = new Message();
message.setContent(WorkerServer.this.getServerId());
Mail mailToConfirmReceive = new Mail();
mailToConfirmReceive.setMessage(message);
MessageProducer.produce(null, mailToConfirmReceive,"CONFIRM_RECEIVE");
}
setPartitionStrategy(partitionStrategy);
// assign the server's hashmap to each vertex
Metrics.Superstep.initialize();
List<Vertex<T>> vertices = vertexGroup.getVertices();
for (Vertex<T> vertex : vertices) {
Metrics.Superstep.incrementVertexNum(1);
Metrics.Superstep.incrementEdgeNum(vertex.getEdges().size());
vertex.setServerQueue(getOutMailQueue());
}
Message message = new Message();
message.setContent(WorkerServer.this.getServerId());
Mail mailToConfirmReceive = new Mail();
mailToConfirmReceive.setMessage(message);
MessageProducer.produce(null, mailToConfirmReceive,"CONFIRM_RECEIVE");
}
});

Expand Down Expand Up @@ -124,8 +126,7 @@ public void start() throws InterruptedException, ExecutionException {
/**
* Execute the superstep
*/
private void superstep(boolean doSnapshot) throws InterruptedException {
superstep++;
private synchronized void superstep(boolean doSnapshot) throws InterruptedException {
log.info("[Superstep] Start Superstep {}", superstep);
Metrics.Superstep.setSuperstepStartTime();
this.executionManager.setDoSnapshot(doSnapshot);
Expand All @@ -141,6 +142,7 @@ private void superstep(boolean doSnapshot) throws InterruptedException {
sendCompleteSignal(doSnapshot, stop);
Metrics.Superstep.computeSuperstepDuration();
Monitor.stopAndStartNewSuperstep();
superstep++;
}

@Override
Expand Down Expand Up @@ -168,11 +170,12 @@ public void shutdown() throws InterruptedException {
*/
public void sendCompleteSignal(boolean doSnapshot, boolean complete) {
// Log the superstep that just ended and the flags about to be reported.
log.info("[Superstep] Superstep {} completed", superstep);
log.info("[Complete] {}, [DoSnapshot] {}", complete, doSnapshot);
log.info("#############################################################################################");
if (doSnapshot) {
// Snapshot case: report this worker's vertex values and snapshot mails together with
// the caller's completion flag; the four-argument form also carries the superstep counter.
MessageProducer.produceFinishSignal(this.vertexGroup.getVertexValues(), this.snapshotMails, complete);
MessageProducer.produceFinishSignal(this.vertexGroup.getVertexValues(), this.snapshotMails, complete, superstep);
} else {
// No-snapshot case: send an empty map and an empty list. The `complete` argument is not
// forwarded here; false is always reported.
// NOTE(review): confirm that ignoring `complete` on non-snapshot supersteps is intended.
MessageProducer.produceFinishSignal(new HashMap<>(), new ArrayList<>(), false);
MessageProducer.produceFinishSignal(new HashMap<>(), new ArrayList<>(), false, superstep);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public static Graph<Double> createVertices(){

public static Graph<Double> createVerticesFromFile(){
try {
return GraphLoader.loadShortestPathGraph("data/graph/Graph-901.csv");
return GraphLoader.loadShortestPathGraph("data/graph/Graph-451.csv");
} catch (IOException e) {
log.error(e.getMessage());
return ShortestPathMasterRunner.createVertices();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public void startSuperstep() {
@Override
public void compute() {
try {
Thread.sleep(10);
Thread.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Expand Down

0 comments on commit 76cdb03

Please sign in to comment.