Skip to content

Commit

Permalink
Fix redis sync errors
Browse files Browse the repository at this point in the history
  • Loading branch information
zbx1425 committed Aug 20, 2023
1 parent caac8e0 commit 56cc76d
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 43 deletions.
2 changes: 1 addition & 1 deletion common/src/main/java/cn/zbx1425/worldcomment/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@ public void save(Path configPath) throws IOException {
json.addProperty("uplinkUrl", uplinkUrl);
json.addProperty("uplinkAuthKey", uplinkAuthKey);

Files.writeString(configPath, new GsonBuilder().setPrettyPrinting().create().toJson(json.toString()));
Files.writeString(configPath, new GsonBuilder().setPrettyPrinting().create().toJson(json));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ public String toBinaryString() {
FriendlyByteBuf dest = new FriendlyByteBuf(Unpooled.buffer());
dest.writeResourceLocation(level);
writeBuffer(dest, false);
return Base64.encodeBase64String(dest.array());
byte[] destArray = new byte[dest.writerIndex()];
dest.getBytes(0, destArray);
return Base64.encodeBase64String(destArray);
}

public static CommentEntry fromBinaryString(String str) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package cn.zbx1425.worldcomment.data.sync;

import cn.zbx1425.worldcomment.data.CommentEntry;
import cn.zbx1425.worldcomment.data.ServerWorldData;
import io.lettuce.core.api.StatefulRedisConnection;

public class RedisMessage {

public static final String COMMAND_CHANNEL = "WORLD_COMMENT_COMMAND_CHANNEL";

private static final String INSTANCE_ID = Long.toHexString(ServerWorldData.SNOWFLAKE.nextId());

public String initiator;
public String action;
public String content;

public RedisMessage(String action, String content) {
this.initiator = INSTANCE_ID;
this.action = action;
this.content = content;
}

public RedisMessage(String redisCommand) {
int firstHash = redisCommand.indexOf(':');
int lastHash = redisCommand.lastIndexOf(':');
this.action = redisCommand.substring(0, firstHash);
this.initiator = redisCommand.substring(firstHash + 1, lastHash);
this.content = redisCommand.substring(lastHash + 1);
}

public static RedisMessage insert(CommentEntry entry) {
return new RedisMessage("INSERT", entry.toBinaryString());
}

public static RedisMessage update(CommentEntry entry) {
return new RedisMessage("UPDATE", entry.toBinaryString());
}

public void publishAsync(StatefulRedisConnection<String, String> connection) {
connection.async().publish(COMMAND_CHANNEL, String.format("%s:%s:%s", action, initiator, content));
}

public boolean isFromSelf() {
return initiator.equals(INSTANCE_ID);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import cn.zbx1425.worldcomment.data.ServerWorldData;
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap;
Expand All @@ -16,41 +17,47 @@

public class RedisSynchronizer implements Synchronizer {

private final StatefulRedisPubSubConnection<String, String> redisChannel;
private final StatefulRedisPubSubConnection<String, String> redisSub;
private final StatefulRedisConnection<String, String> redisConn;

public static final String HMAP_ALL_KEY = "WORLD_COMMENT_DATA_ALL";

private final ServerWorldData serverWorldData;

public RedisSynchronizer(String URI, ServerWorldData serverWorldData) {
redisConn = RedisClient.create(URI).connect();
redisChannel = RedisClient.create(URI).connectPubSub();
redisChannel.addListener(new Listener());
redisChannel.sync().subscribe(RedisCommand.COMMAND_CHANNEL);
redisSub = RedisClient.create(URI).connectPubSub();
redisSub.addListener(new Listener());
redisSub.sync().subscribe(RedisMessage.COMMAND_CHANNEL);

this.serverWorldData = serverWorldData;
}

@Override
public void kvWriteAll(Long2ObjectSortedMap<CommentEntry> all) {
RedisAsyncCommands<String, String> commands = redisConn.async();
commands.multi();
commands.del(HMAP_ALL_KEY);
HashMap<String, String> data = new HashMap<>();
for (CommentEntry entry : all.values()) {
data.put(Long.toHexString(entry.id), entry.toBinaryString());
}
redisConn.async().hset(RedisCommand.HMAP_ALL_ID, data);
commands.hset(HMAP_ALL_KEY, data);
commands.exec();
}

@Override
public void kvWriteEntry(CommentEntry newEntry) {
if (newEntry.deleted) {
redisConn.async().hdel(RedisCommand.HMAP_ALL_ID, Long.toHexString(newEntry.id));
redisConn.async().hdel(HMAP_ALL_KEY, Long.toHexString(newEntry.id));
} else {
redisConn.async().hset(RedisCommand.HMAP_ALL_ID, Long.toHexString(newEntry.id), newEntry.toBinaryString());
redisConn.async().hset(HMAP_ALL_KEY, Long.toHexString(newEntry.id), newEntry.toBinaryString());
}
}

@Override
public void notifyInsert(CommentEntry newEntry) {
redisChannel.async().publish(RedisCommand.COMMAND_CHANNEL, RedisCommand.Insert(newEntry.toBinaryString()));
RedisMessage.insert(newEntry).publishAsync(redisConn);
}

private void handleInsert(CommentEntry peerEntry) throws IOException {
Expand All @@ -59,7 +66,7 @@ private void handleInsert(CommentEntry peerEntry) throws IOException {

@Override
public void notifyUpdate(CommentEntry newEntry) {
redisChannel.async().publish(RedisCommand.COMMAND_CHANNEL, RedisCommand.Update(newEntry.toBinaryString()));
RedisMessage.update(newEntry).publishAsync(redisConn);
}

private void handleUpdate(CommentEntry peerEntry) throws IOException {
Expand All @@ -68,25 +75,27 @@ private void handleUpdate(CommentEntry peerEntry) throws IOException {

@Override
public void kvReadAllInto(CommentCache comments) throws IOException {
Map<String, String> data = redisChannel.sync().hgetall(RedisCommand.HMAP_ALL_ID);
Map<String, String> data = redisConn.sync().hgetall(HMAP_ALL_KEY);
for (String entry : data.values()) {
comments.insert(CommentEntry.fromBinaryString(entry));
}
}

@Override
public void close() {
redisChannel.close();
redisSub.close();
redisConn.close();
}

public class Listener implements RedisPubSubListener<String, String> {
@Override
public void message(String channel, String message) {
public void message(String channel, String rawMessage) {
RedisMessage message = new RedisMessage(rawMessage);
if (message.isFromSelf()) return;
try {
switch (RedisCommand.getAction(message)) {
case "INSERT" -> handleInsert(CommentEntry.fromBinaryString(RedisCommand.getContent(message)));
case "UPDATE" -> handleUpdate(CommentEntry.fromBinaryString(RedisCommand.getContent(message)));
switch (message.action) {
case "INSERT" -> handleInsert(CommentEntry.fromBinaryString(message.content));
case "UPDATE" -> handleUpdate(CommentEntry.fromBinaryString(message.content));
}
} catch (IOException ex) {
Main.LOGGER.error("Redis handler", ex);
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Done to increase the memory available to gradle.
org.gradle.jvmargs=-Xmx2G
org.gradle.jvmargs=-Xmx4G

# Mod Properties
mod_version=0.0.1
Expand Down

0 comments on commit 56cc76d

Please sign in to comment.