Skip to content

Commit

Permalink
[Update]:Change time location/del log
Browse files Browse the repository at this point in the history
  • Loading branch information
liangjunyi010 committed Dec 6, 2023
1 parent 5d1668a commit 4a1217c
Show file tree
Hide file tree
Showing 27 changed files with 964 additions and 261 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,5 @@ build/

### data ###
/data/snapshot
/data/graph
/data/performance
6 changes: 3 additions & 3 deletions core/src/main/java/com/trilobita/core/common/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,16 @@ public void store() {
File directory = new File(snapshotDirectory);
if (!directory.exists()) {
if (directory.mkdirs()) {
log.info("[Snapshot] Snapshot directory data/snapshot created successfully");
//log.info("[Snapshot] Snapshot directory data/snapshot created successfully");
} else {
log.info("[Snapshot] Failed to create snapshot directory");
//log.info("[Snapshot] Failed to create snapshot directory");
}
}
ObjectMapper objectMapper = new ObjectMapper();

try {
objectMapper.writeValue(new File(directory, "snapshot_superstep_" + this.superstep + ".json"), this);
log.info("[Snapshot] Snapshot stored");
//log.info("[Snapshot] Snapshot stored");
} catch (IOException e) {
e.printStackTrace();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public Set<String> getConsumerGroupsForTopic(String topic) throws ExecutionExcep
if (memberDescription.assignment().topicPartitions().stream()
.anyMatch(tp -> tp.topic().equals(topic))) {

log.info("Group " + group + " is subscribed to topic " + topic);
//log.info("Group " + group + " is subscribed to topic " + topic);
ret.add(group);
}
}
Expand Down Expand Up @@ -109,7 +109,7 @@ public Map<String, Set<String>> getConsumersForTopic(String topic) throws Execut
for (MemberDescription memberDescription : description.members()) {
if (memberDescription.assignment().topicPartitions().stream()
.anyMatch(tp -> tp.topic().equals(topic))) {
log.debug("Consumer " + memberDescription.consumerId() + " in group " + group + " is subscribed to topic " + topic);
//log.debug("Consumer " + memberDescription.consumerId() + " in group " + group + " is subscribed to topic " + topic);
consumersInGroup.add(memberDescription.consumerId());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,14 @@ public void start() throws ExecutionException, InterruptedException {
if (!existing.contains(topic)) {
try {
messageAdmin.createIfNotExist(topic);
log.info("existing topic: {} do not contain {}! Creating, and then subscribe...", existing, topic);
//log.info("existing topic: {} do not contain {}! Creating, and then subscribe...", existing, topic);
} catch (ExecutionException | InterruptedException e) {
log.error("[MessageConsumer]", e);
//log.error("[MessageConsumer]", e);
}

}
if (runFlag) {
log.info("already listening to topic: {}!", topic);
//log.info("already listening to topic: {}!", topic);
return;
}
runFlag = true;
Expand All @@ -105,17 +105,17 @@ public void start() throws ExecutionException, InterruptedException {
int partition = consumerRecord.partition();
long offset = consumerRecord.offset();
if (DEBUG_LOG){
log.info("Consumer Record: Topic: {}, key: {}, value: {}, partition: {}, offset: {}",
topic, consumerRecord.key(), value, partition, offset
);
//log.info("Consumer Record: Topic: {}, key: {}, value: {}, partition: {}, offset: {}",
//topic, consumerRecord.key(), value, partition, offset
//);
}
// if (value != null){
messageHandler.handleMessage(UUID.fromString(consumerRecord.key()), value, partition, offset);
// }
}
}
} catch (ExecutionException | JsonProcessingException | InterruptedException e) {
log.error("[MessageConsumer]", e);
//log.error("[MessageConsumer]", e);
}
});
consumerThread.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,15 @@ public static void createTopic(String topic) {
try {
MessageAdmin.getInstance().createIfNotExist(topic);
} catch (ExecutionException | InterruptedException exception) {
log.error("produce create topic: {}", exception.getMessage());
//log.error("produce create topic: {}", exception.getMessage());
}
}

public static void createAndProduce(UUID key, Mail value, String topic) {
try {
MessageAdmin.getInstance().createIfNotExist(topic);
} catch (ExecutionException | InterruptedException exception) {
log.error("produce create topic: {}", exception.getMessage());
//log.error("produce create topic: {}", exception.getMessage());
}
doProduce(key, value, topic);
}
Expand All @@ -76,10 +76,10 @@ public static void doProduce(UUID key, Mail value, String topic) {
// MessageAdmin.getInstance().props)) {
producer.send(new ProducerRecord<>(topic, finalKey.toString(), value), (event, ex) -> {
if (ex != null) {
log.error("[Message] error producing message: ", ex);
//log.error("[Message] error producing message: ", ex);
} else {
if (LOG_FLAG) {
log.info("[Message] produced event to topic {}: key = {} value = {}", topic, finalKey, value);
//log.info("[Message] produced event to topic {}: key = {} value = {}", topic, finalKey, value);
}
}
});
Expand Down
Loading

0 comments on commit 4a1217c

Please sign in to comment.