diff --git a/common/build.gradle b/common/build.gradle index 52b6f97..966bfb7 100644 --- a/common/build.gradle +++ b/common/build.gradle @@ -1,6 +1,8 @@ dependencies { modImplementation "net.fabricmc:fabric-loader:${rootProject.fabric_loader_version}" modApi "${rootProject.architectury_id}:architectury:${rootProject.architectury_version}" + + implementation 'io.lettuce:lettuce-core:6.2.3.RELEASE' } architectury { diff --git a/common/src/main/java/cn/zbx1425/third_party/data/redis/RedisChannelInterface.java b/common/src/main/java/cn/zbx1425/third_party/data/redis/RedisChannelInterface.java new file mode 100644 index 0000000..a0106bd --- /dev/null +++ b/common/src/main/java/cn/zbx1425/third_party/data/redis/RedisChannelInterface.java @@ -0,0 +1,78 @@ +package cn.zbx1425.third_party.data.redis; + +import io.lettuce.core.RedisClient; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.pubsub.RedisPubSubAdapter; +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; +import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands; + +import java.util.Map; + +public class RedisChannelInterface { + public RedisChannelMessageQueue Queue = new RedisChannelMessageQueue(); + private final StatefulRedisPubSubConnection channel; + + private final StatefulRedisConnection instance; + + private volatile boolean StartedListening = false; + + public RedisChannelInterface(String URI) { + this.channel = RedisClient.create(URI).connectPubSub(); + this.instance = RedisClient.create(URI).connect(); + } + + public String get(String Key) { + return this.instance.sync().get(Key); + } + + public void set(String Key, String Value) { + this.instance.sync().set(Key, Value); + } + + public void hset(String Key, Map Value) { + this.instance.sync().hset(Key, Value); + } + + public Map hgetall(String Key) { + return this.instance.sync().hgetall(Key); + } + + public void publish(String Channel, String Data) { + this.channel.sync().publish(Channel, Data); + } + + public synchronized void recvChannel(String[] Channels) { + if (StartedListening) { + this.stop(); + } + + RedisPubSubAdapter adapter = new RedisPubSubAdapter<>() { + @Override + public void message(String channel, String message) { + Queue.append(channel, message); + } + }; + + RedisPubSubAsyncCommands command = this.channel.async(); + + command.subscribe(Channels); + + this.channel.addListener(adapter); + + while (StartedListening) { + Thread.onSpinWait(); + } + + this.channel.removeListener(adapter); + } + + public void stop() { + this.StartedListening = false; + } + + public void close() { + this.channel.close(); + this.instance.close(); + } + +} diff --git a/common/src/main/java/cn/zbx1425/third_party/data/redis/RedisChannelMessageQueue.java b/common/src/main/java/cn/zbx1425/third_party/data/redis/RedisChannelMessageQueue.java new file mode 100644 index 0000000..3be8088 --- /dev/null +++ b/common/src/main/java/cn/zbx1425/third_party/data/redis/RedisChannelMessageQueue.java @@ -0,0 +1,20 @@ +package cn.zbx1425.third_party.data.redis; + +import java.util.ArrayList; +import java.util.List; + +//Todo: not implemented, even not a queue +public class RedisChannelMessageQueue { + private List CachedMessage = new ArrayList(); + public RedisChannelMessageQueue() {} + + public synchronized void append(String Channel, String Data) { + CachedMessage.add(Channel+"#"+Data); + } + + public synchronized String next() { + String ret = CachedMessage.get(0); + CachedMessage.remove(0); + return ret; + } +} diff --git a/common/src/main/java/cn/zbx1425/worldcomment/data/persist/CommentTable.java b/common/src/main/java/cn/zbx1425/worldcomment/data/persist/CommentTable.java index 21a5b4a..003f71e 100644 --- a/common/src/main/java/cn/zbx1425/worldcomment/data/persist/CommentTable.java +++ b/common/src/main/java/cn/zbx1425/worldcomment/data/persist/CommentTable.java @@ -1,6 +1,7 @@ package cn.zbx1425.worldcomment.data.persist; import cn.zbx1425.worldcomment.data.CommentEntry; +import cn.zbx1425.worldcomment.data.synchronizer.*; import io.netty.buffer.Unpooled; import it.unimi.dsi.fastutil.longs.*; import it.unimi.dsi.fastutil.objects.ObjectIterator; @@ -25,15 +26,24 @@ public class CommentTable { Map> playerIndex = new HashMap<>(); Long2ObjectSortedMap timeIndex = new Long2ObjectAVLTreeMap<>(Comparator.reverseOrder()); - public CommentTable(Database db) { + private final Synchronizer synchronizer; + + public CommentTable(Database db, Synchronizer sync) { this.db = db; + this.synchronizer = sync; + + try { + this.synchronizer.sync(db.basePath.resolve("region")); + } catch (IOException e) { + //Todo: seems need to do something here + } + } public void load() throws IOException { regionIndex.clear(); playerIndex.clear(); timeIndex.clear(); - if (db.isHost) { try { Files.createDirectories(db.basePath.resolve("region")); } catch (FileAlreadyExistsException ignored) { @@ -53,9 +63,6 @@ public void load() throws IOException { } } } - } else { - - } } public void loadRegion(ResourceLocation dimension, long region, byte[] data, boolean fromFile) { @@ -130,10 +137,8 @@ public void insert(CommentEntry newEntry) throws IOException { } } - Path targetFile = getLevelRegionPath(newEntry.level, newEntry.region); - try (FileOutputStream oStream = new FileOutputStream(targetFile.toFile(), true)) { - newEntry.writeFileStream(oStream); - } + + this.synchronizer.update(newEntry, getLevelRegionPath(newEntry.level, newEntry.region)); } regionIndex.get(newEntry.level) .computeIfAbsent(newEntry.region.toLong(), ignored -> new ArrayList<>()) @@ -151,14 +156,12 @@ public void update(CommentEntry newEntry) throws IOException { if (regionData == null) return; for (CommentEntry existingEntry : regionData) { if (existingEntry.id == newEntry.id) { + //Todo: refactor this existingEntry.deleted = newEntry.deleted; existingEntry.like = newEntry.like; assert existingEntry.fileOffset > 0; if (db.isHost) { - Path targetFile = getLevelRegionPath(newEntry.level, newEntry.region); - try (RandomAccessFile oStream = new RandomAccessFile(targetFile.toFile(), "rw")) { - existingEntry.updateInFile(oStream); - } + this.synchronizer.update(existingEntry, getLevelRegionPath(newEntry.level, newEntry.region)); } } } diff --git a/common/src/main/java/cn/zbx1425/worldcomment/data/persist/Database.java b/common/src/main/java/cn/zbx1425/worldcomment/data/persist/Database.java index fac07d0..2efd084 100644 --- a/common/src/main/java/cn/zbx1425/worldcomment/data/persist/Database.java +++ b/common/src/main/java/cn/zbx1425/worldcomment/data/persist/Database.java @@ -1,6 +1,7 @@ package cn.zbx1425.worldcomment.data.persist; import cn.zbx1425.worldcomment.data.Snowflake; +import cn.zbx1425.worldcomment.data.synchronizer.SingletonSynchronizer; import net.minecraft.server.MinecraftServer; import net.minecraft.world.level.storage.LevelResource; @@ -21,7 +22,7 @@ public class Database { public Database(MinecraftServer server) { this.server = server; this.basePath = Path.of(server.getWorldPath(LevelResource.ROOT).toString(), "world-comment"); - comments = new CommentTable(this); + comments = new CommentTable(this, new SingletonSynchronizer()); } public void load() throws IOException { diff --git a/common/src/main/java/cn/zbx1425/worldcomment/data/synchronizer/Command.java b/common/src/main/java/cn/zbx1425/worldcomment/data/synchronizer/Command.java new file mode 100644 index 0000000..1c29a00 --- /dev/null +++ b/common/src/main/java/cn/zbx1425/worldcomment/data/synchronizer/Command.java @@ -0,0 +1,31 @@ +package cn.zbx1425.worldcomment.data.synchronizer; + +public class Command { + public static final String COMMAND_CHANNEL = "WORLD_COMMENT_COMMAND_CHANNEL"; + + + public static String DataKey(String ID) { + if (ID.isEmpty()) { + return "WORLD_COMMENT_DATA#ALL"; + } + + return "WORLD_COMMENT_DATA#"+ID; + } + + public static String Request(String ID) { + if (ID.isEmpty()) { + return "REQUEST#ALL"; + } + + return "REQUEST#"+ID; + } + + public static String Update(String ID) { + return "UPDATE#"+ID; + } + + public static String Updated(String ID) { + return "UPDATED#"+ID; + } + +} diff --git a/common/src/main/java/cn/zbx1425/worldcomment/data/synchronizer/RedisSynchronizer.java b/common/src/main/java/cn/zbx1425/worldcomment/data/synchronizer/RedisSynchronizer.java new file mode 100644 index 0000000..f5fbfd9 --- /dev/null +++ b/common/src/main/java/cn/zbx1425/worldcomment/data/synchronizer/RedisSynchronizer.java @@ -0,0 +1,163 @@ +package cn.zbx1425.worldcomment.data.synchronizer; + +import cn.zbx1425.third_party.data.redis.RedisChannelInterface; +import cn.zbx1425.worldcomment.data.CommentEntry; +import io.netty.buffer.Unpooled; +import net.minecraft.network.FriendlyByteBuf; +import net.minecraft.resources.ResourceLocation; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Stream; + +public class RedisSynchronizer implements Synchronizer { + + private final RedisChannelInterface redis; + + private final boolean host; + public RedisSynchronizer(String URI, boolean host) { + this.redis = new RedisChannelInterface(URI); + this.host = host; + } + + @Override + public void sync(Path path) throws IOException { + if (this.host) { + this.upload(path); + } else { + this.fetchAll(path); + } + } + + + @Override + public void update(CommentEntry entry, Path targetFile) throws IOException { + try (RandomAccessFile oStream = new RandomAccessFile(targetFile.toFile(), "rw")) { + entry.updateInFile(oStream); + } + + + if (this.host) { + this.redis.hset( + Command.DataKey(""), + new HashMap(){{ + put(String.valueOf(entry.id), entry.toString()); + }}); + + + this.redis.publish( + Command.COMMAND_CHANNEL, + Command.Updated(String.valueOf(entry.id))); + + } else { + this.redis.set(Command.DataKey( + String.valueOf(entry.id)), + entry.toString()); + + this.redis.publish( + Command.COMMAND_CHANNEL, + Command.Update(String.valueOf(entry.id))); + } + } + + private void upload(Path dirPath) throws IOException { + HashMap pendingUpload = new HashMap<>(); + + try { + Files.createDirectories(dirPath); + } catch (FileAlreadyExistsException ignored) { + + } + try (Stream levelFiles = Files.list(dirPath)) { + for (Path levelPath : levelFiles.toList()) { + ResourceLocation dimension = new ResourceLocation(levelPath.getFileName().toString().replace("+", ":")); + try (Stream files = Files.list(levelPath)) { + for (Path file : files.toList()) { + String[] fileNameParts = file.getFileName().toString().split("\\."); + if (fileNameParts.length != 4 || !fileNameParts[3].equals("bin")) continue; + byte[] data = Files.readAllBytes(file); + FriendlyByteBuf src = new FriendlyByteBuf(Unpooled.wrappedBuffer(data)); + while (src.readerIndex() < data.length) { + CommentEntry entry = new CommentEntry(dimension, src, false); + pendingUpload.put(String.valueOf(entry.id), entry.toString()); + } + } + } + } + } + + this.redis.hset( + Command.DataKey(""), + pendingUpload + ); + + } + + private Map fetchAll(Path dirPath) throws IOException { + Map exist = this.redis.hgetall(Command.DataKey("")); + + try { + Files.createDirectories(dirPath); + } catch (FileAlreadyExistsException ignored) { + + } + try (Stream levelFiles = Files.list(dirPath)) { + for (Path levelPath : levelFiles.toList()) { + ResourceLocation dimension = new ResourceLocation(levelPath.getFileName().toString().replace("+", ":")); + try (Stream files = Files.list(levelPath)) { + for (Path file : files.toList()) { + String[] fileNameParts = file.getFileName().toString().split("\\."); + if (fileNameParts.length != 4 || !fileNameParts[3].equals("bin")) continue; + byte[] data = Files.readAllBytes(file); + FriendlyByteBuf src = new FriendlyByteBuf(Unpooled.wrappedBuffer(data)); + while (src.readerIndex() < data.length) { + CommentEntry entry = new CommentEntry(dimension, src, false); + exist.remove(String.valueOf(entry.id)); + } + } + } + } + } + + exist.forEach((id, entry) -> { + CommentEntry commentEntry = new CommentEntry(); + this.update(commentEntry, getLevelRegionPath(commentEntry.level, commentEntry.region)); + }); + + + //cache cleaned or some other situation + if (false) { + sendFetchCommand(); + } + } + + private void sendFetchCommand() { + this.redis.publish( + Command.COMMAND_CHANNEL, + Command.Request("") + ); + } + + public class receiver extends Thread { + @Override + public void run() { + redis.recvChannel(new String[]{Command.COMMAND_CHANNEL}); + + //todo: set queue handler will be better? + while (true) { + String command = redis.Queue.next(); + + if (command.) { + continue; + } + } + + } + } + +} diff --git a/common/src/main/java/cn/zbx1425/worldcomment/data/synchronizer/SingletonSynchronizer.java b/common/src/main/java/cn/zbx1425/worldcomment/data/synchronizer/SingletonSynchronizer.java new file mode 100644 index 0000000..4c5dfa4 --- /dev/null +++ b/common/src/main/java/cn/zbx1425/worldcomment/data/synchronizer/SingletonSynchronizer.java @@ -0,0 +1,24 @@ +package cn.zbx1425.worldcomment.data.synchronizer; + +import cn.zbx1425.worldcomment.data.CommentEntry; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.file.Path; + +//Todo: doing nothing currently +public class SingletonSynchronizer implements Synchronizer { + + + @Override + public void sync(Path path) throws IOException { + //do nothing + } + + @Override + public void update(CommentEntry entry, Path targetFile) throws IOException { + try (RandomAccessFile oStream = new RandomAccessFile(targetFile.toFile(), "rw")) { + entry.updateInFile(oStream); + } + } +} diff --git a/common/src/main/java/cn/zbx1425/worldcomment/data/synchronizer/Synchronizer.java b/common/src/main/java/cn/zbx1425/worldcomment/data/synchronizer/Synchronizer.java new file mode 100644 index 0000000..69db911 --- /dev/null +++ b/common/src/main/java/cn/zbx1425/worldcomment/data/synchronizer/Synchronizer.java @@ -0,0 +1,13 @@ +package cn.zbx1425.worldcomment.data.synchronizer; + +import cn.zbx1425.worldcomment.data.CommentEntry; + +import java.io.IOException; +import java.nio.file.Path; + +public interface Synchronizer { + + void sync(Path path) throws IOException; + + void update(CommentEntry newEntry, Path targetFile) throws IOException; +}