From cdbefc7565e71a2a3df4e27812b1b438559b7879 Mon Sep 17 00:00:00 2001 From: Polyneices <62021363+cute-rui@users.noreply.github.com> Date: Tue, 1 Aug 2023 22:19:42 +0800 Subject: [PATCH 1/2] =?UTF-8?q?pre:=20=E8=BF=98=E6=B2=A1=E5=86=99=E5=AE=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/build.gradle | 2 + .../data/redis/RedisChannelInterface.java | 78 +++++++++ .../data/redis/RedisChannelMessageQueue.java | 20 +++ .../data/persist/CommentTable.java | 29 ++-- .../worldcomment/data/persist/Database.java | 3 +- .../data/synchronizer/Command.java | 31 ++++ .../data/synchronizer/RedisSynchronizer.java | 163 ++++++++++++++++++ .../synchronizer/SingletonSynchronizer.java | 24 +++ .../data/synchronizer/Synchronizer.java | 13 ++ 9 files changed, 349 insertions(+), 14 deletions(-) create mode 100644 common/src/main/java/cn/zbx1425/third_party/data/redis/RedisChannelInterface.java create mode 100644 common/src/main/java/cn/zbx1425/third_party/data/redis/RedisChannelMessageQueue.java create mode 100644 common/src/main/java/cn/zbx1425/worldcomment/data/synchronizer/Command.java create mode 100644 common/src/main/java/cn/zbx1425/worldcomment/data/synchronizer/RedisSynchronizer.java create mode 100644 common/src/main/java/cn/zbx1425/worldcomment/data/synchronizer/SingletonSynchronizer.java create mode 100644 common/src/main/java/cn/zbx1425/worldcomment/data/synchronizer/Synchronizer.java diff --git a/common/build.gradle b/common/build.gradle index 52b6f97..966bfb7 100644 --- a/common/build.gradle +++ b/common/build.gradle @@ -1,6 +1,8 @@ dependencies { modImplementation "net.fabricmc:fabric-loader:${rootProject.fabric_loader_version}" modApi "${rootProject.architectury_id}:architectury:${rootProject.architectury_version}" + + implementation 'io.lettuce:lettuce-core:6.2.3.RELEASE' } architectury { diff --git a/common/src/main/java/cn/zbx1425/third_party/data/redis/RedisChannelInterface.java b/common/src/main/java/cn/zbx1425/third_party/data/redis/RedisChannelInterface.java new file mode 100644 index 0000000..a0106bd --- /dev/null +++ b/common/src/main/java/cn/zbx1425/third_party/data/redis/RedisChannelInterface.java @@ -0,0 +1,78 @@ +package cn.zbx1425.third_party.data.redis; + +import io.lettuce.core.RedisClient; +import io.lettuce.core.api.StatefulRedisConnection; +import io.lettuce.core.pubsub.RedisPubSubAdapter; +import io.lettuce.core.pubsub.StatefulRedisPubSubConnection; +import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands; + +import java.util.Map; + +public class RedisChannelInterface { + public RedisChannelMessageQueue Queue = new RedisChannelMessageQueue(); + private final StatefulRedisPubSubConnection channel; + + private final StatefulRedisConnection instance; + + private volatile boolean StartedListening = false; + + public RedisChannelInterface(String URI) { + this.channel = RedisClient.create(URI).connectPubSub(); + this.instance = RedisClient.create(URI).connect(); + } + + public String get(String Key) { + return this.instance.sync().get(Key); + } + + public void set(String Key, String Value) { + this.instance.sync().set(Key, Value); + } + + public void hset(String Key, Map Value) { + this.instance.sync().hset(Key, Value); + } + + public Map hgetall(String Key) { + return this.instance.sync().hgetall(Key); + } + + public void publish(String Channel, String Data) { + this.channel.sync().publish(Channel, Data); + } + + public synchronized void recvChannel(String[] Channels) { + if (StartedListening) { + this.stop(); + } + + RedisPubSubAdapter adapter = new RedisPubSubAdapter<>() { + @Override + public void message(String channel, String message) { + Queue.append(channel, message); + } + }; + + RedisPubSubAsyncCommands command = this.channel.async(); + + command.subscribe(Channels); + + this.channel.addListener(adapter); + + while (StartedListening) { + Thread.onSpinWait(); + } + + this.channel.removeListener(adapter); + } + + public void stop() { + this.StartedListening = false; + } + + public void close() { + this.channel.close(); + this.instance.close(); + } + +} diff --git a/common/src/main/java/cn/zbx1425/third_party/data/redis/RedisChannelMessageQueue.java b/common/src/main/java/cn/zbx1425/third_party/data/redis/RedisChannelMessageQueue.java new file mode 100644 index 0000000..3be8088 --- /dev/null +++ b/common/src/main/java/cn/zbx1425/third_party/data/redis/RedisChannelMessageQueue.java @@ -0,0 +1,20 @@ +package cn.zbx1425.third_party.data.redis; + +import java.util.ArrayList; +import java.util.List; + +//Todo: not implemented, even not a queue +public class RedisChannelMessageQueue { + private List CachedMessage = new ArrayList(); + public RedisChannelMessageQueue() {} + + public synchronized void append(String Channel, String Data) { + CachedMessage.add(Channel+"#"+Data); + } + + public synchronized String next() { + String ret = CachedMessage.get(0); + CachedMessage.remove(0); + return ret; + } +} diff --git a/common/src/main/java/cn/zbx1425/worldcomment/data/persist/CommentTable.java b/common/src/main/java/cn/zbx1425/worldcomment/data/persist/CommentTable.java index 21a5b4a..003f71e 100644 --- a/common/src/main/java/cn/zbx1425/worldcomment/data/persist/CommentTable.java +++ b/common/src/main/java/cn/zbx1425/worldcomment/data/persist/CommentTable.java @@ -1,6 +1,7 @@ package cn.zbx1425.worldcomment.data.persist; import cn.zbx1425.worldcomment.data.CommentEntry; +import cn.zbx1425.worldcomment.data.synchronizer.*; import io.netty.buffer.Unpooled; import it.unimi.dsi.fastutil.longs.*; import it.unimi.dsi.fastutil.objects.ObjectIterator; @@ -25,15 +26,24 @@ public class CommentTable { Map> playerIndex = new HashMap<>(); Long2ObjectSortedMap timeIndex = new Long2ObjectAVLTreeMap<>(Comparator.reverseOrder()); - public CommentTable(Database db) { + private final Synchronizer synchronizer; + + public CommentTable(Database db, Synchronizer sync) { this.db = db; + this.synchronizer = sync; + + try { + this.synchronizer.sync(db.basePath.resolve("region")); + } catch (IOException e) { + //Todo: seems need to do something here + } + } public void load() throws IOException { regionIndex.clear(); playerIndex.clear(); timeIndex.clear(); - if (db.isHost) { try { Files.createDirectories(db.basePath.resolve("region")); } catch (FileAlreadyExistsException ignored) { @@ -53,9 +63,6 @@ public void load() throws IOException { } } } - } else { - - } } public void loadRegion(ResourceLocation dimension, long region, byte[] data, boolean fromFile) { @@ -130,10 +137,8 @@ public void insert(CommentEntry newEntry) throws IOException { } } - Path targetFile = getLevelRegionPath(newEntry.level, newEntry.region); - try (FileOutputStream oStream = new FileOutputStream(targetFile.toFile(), true)) { - newEntry.writeFileStream(oStream); - } + + this.synchronizer.update(newEntry, getLevelRegionPath(newEntry.level, newEntry.region)); } regionIndex.get(newEntry.level) .computeIfAbsent(newEntry.region.toLong(), ignored -> new ArrayList<>()) @@ -151,14 +156,12 @@ public void update(CommentEntry newEntry) throws IOException { if (regionData == null) return; for (CommentEntry existingEntry : regionData) { if (existingEntry.id == newEntry.id) { + //Todo: refactor this existingEntry.deleted = newEntry.deleted; existingEntry.like = newEntry.like; assert existingEntry.fileOffset > 0; if (db.isHost) { - Path targetFile = getLevelRegionPath(newEntry.level, newEntry.region); - try (RandomAccessFile oStream = new RandomAccessFile(targetFile.toFile(), "rw")) { - existingEntry.updateInFile(oStream); - } + this.synchronizer.update(existingEntry, getLevelRegionPath(newEntry.level, newEntry.region)); } } } diff --git a/common/src/main/java/cn/zbx1425/worldcomment/data/persist/Database.java b/common/src/main/java/cn/zbx1425/worldcomment/data/persist/Database.java index fac07d0..2efd084 100644 --- a/common/src/main/java/cn/zbx1425/worldcomment/data/persist/Database.java +++ b/common/src/main/java/cn/zbx1425/worldcomment/data/persist/Database.java @@ -1,6 +1,7 @@ package cn.zbx1425.worldcomment.data.persist; import cn.zbx1425.worldcomment.data.Snowflake; +import cn.zbx1425.worldcomment.data.synchronizer.SingletonSynchronizer; import net.minecraft.server.MinecraftServer; import net.minecraft.world.level.storage.LevelResource; @@ -21,7 +22,7 @@ public class Database { public Database(MinecraftServer server) { this.server = server; this.basePath = Path.of(server.getWorldPath(LevelResource.ROOT).toString(), "world-comment"); - comments = new CommentTable(this); + comments = new CommentTable(this, new SingletonSynchronizer()); } public void load() throws IOException { diff --git a/common/src/main/java/cn/zbx1425/worldcomment/data/synchronizer/Command.java b/common/src/main/java/cn/zbx1425/worldcomment/data/synchronizer/Command.java new file mode 100644 index 0000000..1c29a00 --- /dev/null +++ b/common/src/main/java/cn/zbx1425/worldcomment/data/synchronizer/Command.java @@ -0,0 +1,31 @@ +package cn.zbx1425.worldcomment.data.synchronizer; + +public class Command { + public static final String COMMAND_CHANNEL = "WORLD_COMMENT_COMMAND_CHANNEL"; + + + public static String DataKey(String ID) { + if (ID.isEmpty()) { + return "WORLD_COMMENT_DATA#ALL"; + } + + return "WORLD_COMMENT_DATA#"+ID; + } + + public static String Request(String ID) { + if (ID.isEmpty()) { + return "REQUEST#ALL"; + } + + return "REQUEST#"+ID; + } + + public static String Update(String ID) { + return "UPDATE#"+ID; + } + + public static String Updated(String ID) { + return "UPDATED#"+ID; + } + +} diff --git a/common/src/main/java/cn/zbx1425/worldcomment/data/synchronizer/RedisSynchronizer.java b/common/src/main/java/cn/zbx1425/worldcomment/data/synchronizer/RedisSynchronizer.java new file mode 100644 index 0000000..f5fbfd9 --- /dev/null +++ b/common/src/main/java/cn/zbx1425/worldcomment/data/synchronizer/RedisSynchronizer.java @@ -0,0 +1,163 @@ +package cn.zbx1425.worldcomment.data.synchronizer; + +import cn.zbx1425.third_party.data.redis.RedisChannelInterface; +import cn.zbx1425.worldcomment.data.CommentEntry; +import io.netty.buffer.Unpooled; +import net.minecraft.network.FriendlyByteBuf; +import net.minecraft.resources.ResourceLocation; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Stream; + +public class RedisSynchronizer implements Synchronizer { + + private final RedisChannelInterface redis; + + private final boolean host; + public RedisSynchronizer(String URI, boolean host) { + this.redis = new RedisChannelInterface(URI); + this.host = host; + } + + @Override + public void sync(Path path) throws IOException { + if (this.host) { + this.upload(path); + } else { + this.fetchAll(path); + } + } + + + @Override + public void update(CommentEntry entry, Path targetFile) throws IOException { + try (RandomAccessFile oStream = new RandomAccessFile(targetFile.toFile(), "rw")) { + entry.updateInFile(oStream); + } + + + if (this.host) { + this.redis.hset( + Command.DataKey(""), + new HashMap(){{ + put(String.valueOf(entry.id), entry.toString()); + }}); + + + this.redis.publish( + Command.COMMAND_CHANNEL, + Command.Updated(String.valueOf(entry.id))); + + } else { + this.redis.set(Command.DataKey( + String.valueOf(entry.id)), + entry.toString()); + + this.redis.publish( + Command.COMMAND_CHANNEL, + Command.Update(String.valueOf(entry.id))); + } + } + + private void upload(Path dirPath) throws IOException { + HashMap pendingUpload = new HashMap<>(); + + try { + Files.createDirectories(dirPath); + } catch (FileAlreadyExistsException ignored) { + + } + try (Stream levelFiles = Files.list(dirPath)) { + for (Path levelPath : levelFiles.toList()) { + ResourceLocation dimension = new ResourceLocation(levelPath.getFileName().toString().replace("+", ":")); + try (Stream files = Files.list(levelPath)) { + for (Path file : files.toList()) { + String[] fileNameParts = file.getFileName().toString().split("\\."); + if (fileNameParts.length != 4 || !fileNameParts[3].equals("bin")) continue; + byte[] data = Files.readAllBytes(file); + FriendlyByteBuf src = new FriendlyByteBuf(Unpooled.wrappedBuffer(data)); + while (src.readerIndex() < data.length) { + CommentEntry entry = new CommentEntry(dimension, src, false); + pendingUpload.put(String.valueOf(entry.id), entry.toString()); + } + } + } + } + } + + this.redis.hset( + Command.DataKey(""), + pendingUpload + ); + + } + + private Map fetchAll(Path dirPath) throws IOException { + Map exist = this.redis.hgetall(Command.DataKey("")); + + try { + Files.createDirectories(dirPath); + } catch (FileAlreadyExistsException ignored) { + + } + try (Stream levelFiles = Files.list(dirPath)) { + for (Path levelPath : levelFiles.toList()) { + ResourceLocation dimension = new ResourceLocation(levelPath.getFileName().toString().replace("+", ":")); + try (Stream files = Files.list(levelPath)) { + for (Path file : files.toList()) { + String[] fileNameParts = file.getFileName().toString().split("\\."); + if (fileNameParts.length != 4 || !fileNameParts[3].equals("bin")) continue; + byte[] data = Files.readAllBytes(file); + FriendlyByteBuf src = new FriendlyByteBuf(Unpooled.wrappedBuffer(data)); + while (src.readerIndex() < data.length) { + CommentEntry entry = new CommentEntry(dimension, src, false); + exist.remove(String.valueOf(entry.id)); + } + } + } + } + } + + exist.forEach((id, entry) -> { + CommentEntry commentEntry = new CommentEntry(); + this.update(commentEntry, getLevelRegionPath(commentEntry.level, commentEntry.region)); + }); + + + //cache cleaned or some other situation + if (false) { + sendFetchCommand(); + } + } + + private void sendFetchCommand() { + this.redis.publish( + Command.COMMAND_CHANNEL, + Command.Request("") + ); + } + + public class receiver extends Thread { + @Override + public void run() { + redis.recvChannel(new String[]{Command.COMMAND_CHANNEL}); + + //todo: set queue handler will be better? + while (true) { + String command = redis.Queue.next(); + + if (command.) { + continue; + } + } + + } + } + +} diff --git a/common/src/main/java/cn/zbx1425/worldcomment/data/synchronizer/SingletonSynchronizer.java b/common/src/main/java/cn/zbx1425/worldcomment/data/synchronizer/SingletonSynchronizer.java new file mode 100644 index 0000000..4c5dfa4 --- /dev/null +++ b/common/src/main/java/cn/zbx1425/worldcomment/data/synchronizer/SingletonSynchronizer.java @@ -0,0 +1,24 @@ +package cn.zbx1425.worldcomment.data.synchronizer; + +import cn.zbx1425.worldcomment.data.CommentEntry; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.file.Path; + +//Todo: doing nothing currently +public class SingletonSynchronizer implements Synchronizer { + + + @Override + public void sync(Path path) throws IOException { + //do nothing + } + + @Override + public void update(CommentEntry entry, Path targetFile) throws IOException { + try (RandomAccessFile oStream = new RandomAccessFile(targetFile.toFile(), "rw")) { + entry.updateInFile(oStream); + } + } +} diff --git a/common/src/main/java/cn/zbx1425/worldcomment/data/synchronizer/Synchronizer.java b/common/src/main/java/cn/zbx1425/worldcomment/data/synchronizer/Synchronizer.java new file mode 100644 index 0000000..69db911 --- /dev/null +++ b/common/src/main/java/cn/zbx1425/worldcomment/data/synchronizer/Synchronizer.java @@ -0,0 +1,13 @@ +package cn.zbx1425.worldcomment.data.synchronizer; + +import cn.zbx1425.worldcomment.data.CommentEntry; + +import java.io.IOException; +import java.nio.file.Path; + +public interface Synchronizer { + + void sync(Path path) throws IOException; + + void update(CommentEntry newEntry, Path targetFile) throws IOException; +} From 64a7b5f76ff42dcdf7fabbc091e280427c804556 Mon Sep 17 00:00:00 2001 From: Polyneices <62021363+cute-rui@users.noreply.github.com> Date: Wed, 16 Aug 2023 20:43:19 +0800 Subject: [PATCH 2/2] =?UTF-8?q?pre:=E4=BB=8D=E7=84=B6=E6=B2=A1=E5=86=99?= =?UTF-8?q?=E5=AE=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.gradle | 3 + common/build.gradle | 2 - .../java/cn/zbx1425/worldcomment/Main.java | 12 +- .../worldcomment/data/CommentEntry.java | 27 ++ .../worldcomment/data/ServerWorldData.java | 29 +- .../data/persist/FileSerializer.java | 11 + .../worldcomment/data/sync/Command.java | 21 + .../data/sync/RedisChannelInterface.java | 13 +- .../data/sync/RedisChannelMessageQueue.java | 6 + .../data/sync/RedisSynchronizer.java | 402 +++++++++++------- .../data/sync/SingletonSynchronizer.java | 23 +- .../worldcomment/data/sync/Synchronizer.java | 12 +- .../mixin/CreativeModeTabsAccessor.java | 16 + .../main/resources/worldcomment.mixins.json | 5 +- forge/build.gradle | 2 + 15 files changed, 399 insertions(+), 185 deletions(-) create mode 100644 common/src/main/java/cn/zbx1425/worldcomment/mixin/CreativeModeTabsAccessor.java diff --git a/build.gradle b/build.gradle index 1a0b161..abffbdb 100644 --- a/build.gradle +++ b/build.gradle @@ -78,6 +78,9 @@ subprojects { officialMojangMappings() parchment("org.parchmentmc.data:parchment-${minecraft_version}:${parchment_version}@zip") } + + implementation 'io.lettuce:lettuce-core:6.2.3.RELEASE' + shadowCommon 'io.lettuce:lettuce-core:6.2.3.RELEASE' } } diff --git a/common/build.gradle b/common/build.gradle index 966bfb7..52b6f97 100644 --- a/common/build.gradle +++ b/common/build.gradle @@ -1,8 +1,6 @@ dependencies { modImplementation "net.fabricmc:fabric-loader:${rootProject.fabric_loader_version}" modApi "${rootProject.architectury_id}:architectury:${rootProject.architectury_version}" - - implementation 'io.lettuce:lettuce-core:6.2.3.RELEASE' } architectury { diff --git a/common/src/main/java/cn/zbx1425/worldcomment/Main.java b/common/src/main/java/cn/zbx1425/worldcomment/Main.java index f805fd4..d55537f 100644 --- a/common/src/main/java/cn/zbx1425/worldcomment/Main.java +++ b/common/src/main/java/cn/zbx1425/worldcomment/Main.java @@ -1,7 +1,10 @@ package cn.zbx1425.worldcomment; import cn.zbx1425.worldcomment.data.ServerWorldData; +import cn.zbx1425.worldcomment.data.sync.RedisSynchronizer; +import cn.zbx1425.worldcomment.data.sync.Synchronizer; import cn.zbx1425.worldcomment.item.CommentToolItem; +import cn.zbx1425.worldcomment.mixin.CreativeModeTabsAccessor; import cn.zbx1425.worldcomment.network.PacketCollectionRequestC2S; import cn.zbx1425.worldcomment.network.PacketEntryActionC2S; import cn.zbx1425.worldcomment.network.PacketRegionRequestC2S; @@ -10,10 +13,12 @@ import cn.zbx1425.worldcomment.util.RegistryObject; import net.minecraft.world.item.CreativeModeTabs; import net.minecraft.world.item.Item; +import net.minecraft.world.level.storage.LevelResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.file.Path; public class Main { @@ -25,7 +30,7 @@ public class Main { public static final RegistryObject ITEM_COMMENT_TOOL = new RegistryObject<>(CommentToolItem::new); public static void init(RegistriesWrapper registries) { - registries.registerItem("comment_tool", ITEM_COMMENT_TOOL, CreativeModeTabs.TOOLS_AND_UTILITIES); + registries.registerItem("comment_tool", ITEM_COMMENT_TOOL, CreativeModeTabsAccessor.getTOOLS_AND_UTILITIES()); ServerPlatform.registerNetworkReceiver( PacketRegionRequestC2S.IDENTIFIER, PacketRegionRequestC2S::handle); @@ -38,7 +43,10 @@ public static void init(RegistriesWrapper registries) { ServerPlatform.registerServerStartingEvent(server -> { try { - DATABASE = new ServerWorldData(server); + //Todo: config inject here + + Synchronizer synchronizer = new RedisSynchronizer("redis://192.168.1.148:6379/0", Path.of(server.getWorldPath(LevelResource.ROOT).toString(), "world-comment"), true); + DATABASE = new ServerWorldData(server, synchronizer); DATABASE.load(); } catch (IOException e) { LOGGER.error("Failed to open data storage", e); diff --git a/common/src/main/java/cn/zbx1425/worldcomment/data/CommentEntry.java b/common/src/main/java/cn/zbx1425/worldcomment/data/CommentEntry.java index a59a12d..2c8f57f 100644 --- a/common/src/main/java/cn/zbx1425/worldcomment/data/CommentEntry.java +++ b/common/src/main/java/cn/zbx1425/worldcomment/data/CommentEntry.java @@ -1,6 +1,8 @@ package cn.zbx1425.worldcomment.data; import cn.zbx1425.worldcomment.data.network.ThumbImage; +import com.google.gson.Gson; +import com.google.gson.JsonObject; import io.netty.buffer.Unpooled; import net.minecraft.Util; import net.minecraft.core.BlockPos; @@ -111,4 +113,29 @@ public void updateInFile(RandomAccessFile oFile) throws IOException { oFile.writeInt(like); } + public String toJson() { + //Todo: wth is this + JsonObject json = new JsonObject(); + json.addProperty("id", id); + json.addProperty("timestamp", timestamp); + json.addProperty("level", level.toString()); + json.addProperty("region", region.toString()); + json.addProperty("location", location.toString()); + json.addProperty("initiator", initiator.toString()); + json.addProperty("initiatorName", initiatorName); + json.addProperty("messageType", messageType); + json.addProperty("message", message); + json.addProperty("image", image.url); + json.addProperty("thumb", image.thumbUrl); + json.addProperty("deleted", deleted); + json.addProperty("like", like); + return json.toString(); + } + + + public static CommentEntry fromJson(String json) { + Gson g = new Gson(); + + return g.fromJson(json, CommentEntry.class); + } } diff --git a/common/src/main/java/cn/zbx1425/worldcomment/data/ServerWorldData.java b/common/src/main/java/cn/zbx1425/worldcomment/data/ServerWorldData.java index f3beeb4..3e77dd8 100644 --- a/common/src/main/java/cn/zbx1425/worldcomment/data/ServerWorldData.java +++ b/common/src/main/java/cn/zbx1425/worldcomment/data/ServerWorldData.java @@ -21,14 +21,14 @@ public class ServerWorldData { public final CommentCache comments = new CommentCache(); public final FileSerializer fileSerializer; - public final RedisSynchronizer synchronizer; + public final Synchronizer synchronizer; - public ServerWorldData(MinecraftServer server) { + public ServerWorldData(MinecraftServer server, Synchronizer synchronizer) { this.server = server; this.basePath = Path.of(server.getWorldPath(LevelResource.ROOT).toString(), "world-comment"); fileSerializer = new FileSerializer(basePath); //Todo: sync config inject - this.synchronizer = new RedisSynchronizer("", true, basePath) + this.synchronizer = synchronizer; } public void load() throws IOException { @@ -36,29 +36,20 @@ public void load() throws IOException { fileSerializer.loadInto(comments); synchronizer.kvWriteAll(comments.timeIndex); } else { + //will cover all data synchronizer.kvReadAllInto(comments); } } - public void insert(CommentEntry newEntry, boolean fromPeer) throws IOException { + public void insert(CommentEntry newEntry) throws IOException { comments.insert(newEntry); - if (isHost) { - fileSerializer.insert(newEntry); - synchronizer.kvWriteEntry(newEntry); - } - if (!fromPeer) { - synchronizer.notifyInsert(newEntry); - } + fileSerializer.insert(newEntry); + synchronizer.notifyInsert(newEntry); } - public void update(CommentEntry newEntry, boolean fromPeer) throws IOException { + public void update(CommentEntry newEntry) throws IOException { CommentEntry trustedEntry = comments.update(newEntry); - if (isHost) { - fileSerializer.update(trustedEntry); - synchronizer.kvWriteEntry(trustedEntry); - } - if (!fromPeer) { - synchronizer.notifyUpdate(trustedEntry); - } + fileSerializer.update(trustedEntry); + synchronizer.notifyUpdate(newEntry); } } diff --git a/common/src/main/java/cn/zbx1425/worldcomment/data/persist/FileSerializer.java b/common/src/main/java/cn/zbx1425/worldcomment/data/persist/FileSerializer.java index 16d1d34..a76f18f 100644 --- a/common/src/main/java/cn/zbx1425/worldcomment/data/persist/FileSerializer.java +++ b/common/src/main/java/cn/zbx1425/worldcomment/data/persist/FileSerializer.java @@ -63,6 +63,17 @@ public void insert(CommentEntry newEntry) throws IOException { } } + public void cover(CommentEntry newEntry, boolean append) throws IOException { + try { + Files.createDirectory(getLevelPath(newEntry.level)); + } catch (FileAlreadyExistsException ignored) { } + + Path targetFile = getLevelRegionPath(newEntry.level, newEntry.region); + try (FileOutputStream oStream = new FileOutputStream(targetFile.toFile(), append)) { + newEntry.writeFileStream(oStream); + } + } + public void update(CommentEntry existingEntry) throws IOException { assert existingEntry.fileOffset > 0; Path targetFile = getLevelRegionPath(existingEntry.level, existingEntry.region); diff --git a/common/src/main/java/cn/zbx1425/worldcomment/data/sync/Command.java b/common/src/main/java/cn/zbx1425/worldcomment/data/sync/Command.java index e50412a..82f96f4 100644 --- a/common/src/main/java/cn/zbx1425/worldcomment/data/sync/Command.java +++ b/common/src/main/java/cn/zbx1425/worldcomment/data/sync/Command.java @@ -21,4 +21,25 @@ public static String Updated(String ID) { return "UPDATED#"+ID; } + public static String Delete(String ID) { + return "DELETE#"+ID; + } + + public static long IsUpdated(String Command) { + if (!Command.startsWith("UPDATED#")) { + return 0; + }; + + return Long.parseLong(Command.substring(8)); + } + + public static long IsUpdate(String Command) { + if (!Command.startsWith("UPDATE#")) { + return 0; + }; + + return Long.parseLong(Command.substring(7)); + } + + } diff --git a/common/src/main/java/cn/zbx1425/worldcomment/data/sync/RedisChannelInterface.java b/common/src/main/java/cn/zbx1425/worldcomment/data/sync/RedisChannelInterface.java index 83ed684..640eca9 100644 --- a/common/src/main/java/cn/zbx1425/worldcomment/data/sync/RedisChannelInterface.java +++ b/common/src/main/java/cn/zbx1425/worldcomment/data/sync/RedisChannelInterface.java @@ -8,6 +8,7 @@ import java.util.Map; +//Todo: Notice that this class is COMPLETELY SYNC currently public class RedisChannelInterface { public RedisChannelMessageQueue Queue = new RedisChannelMessageQueue(); private final StatefulRedisPubSubConnection channel; @@ -41,12 +42,20 @@ public void publish(String Channel, String Data) { this.channel.sync().publish(Channel, Data); } + public void del(String Key) { + this.instance.sync().del(Key); + } + + public void hdel(String Key, String Field) { + this.instance.sync().hdel(Key, Field); + } + public synchronized void recvChannel(String[] Channels) { if (StartedListening) { this.stop(); } - RedisPubSubAdapter adapter = new RedisPubSubAdapter<>() { + /*RedisPubSubAdapter adapter = new RedisPubSubAdapter<>() { @Override public void message(String channel, String message) { Queue.append(channel, message); @@ -63,7 +72,7 @@ public void message(String channel, String message) { Thread.onSpinWait(); } - this.channel.removeListener(adapter); + this.channel.removeListener(adapter);*/ } public void stop() { diff --git a/common/src/main/java/cn/zbx1425/worldcomment/data/sync/RedisChannelMessageQueue.java b/common/src/main/java/cn/zbx1425/worldcomment/data/sync/RedisChannelMessageQueue.java index 135be91..b9c4628 100644 --- a/common/src/main/java/cn/zbx1425/worldcomment/data/sync/RedisChannelMessageQueue.java +++ b/common/src/main/java/cn/zbx1425/worldcomment/data/sync/RedisChannelMessageQueue.java @@ -13,6 +13,12 @@ public synchronized void append(String Channel, String Data) { } public synchronized String next() { + if (CachedMessage.isEmpty()) { + return ""; + } + + + String ret = CachedMessage.get(0); CachedMessage.remove(0); return ret; diff --git a/common/src/main/java/cn/zbx1425/worldcomment/data/sync/RedisSynchronizer.java b/common/src/main/java/cn/zbx1425/worldcomment/data/sync/RedisSynchronizer.java index b3fc1e0..088209c 100644 --- a/common/src/main/java/cn/zbx1425/worldcomment/data/sync/RedisSynchronizer.java +++ b/common/src/main/java/cn/zbx1425/worldcomment/data/sync/RedisSynchronizer.java @@ -1,154 +1,248 @@ -package cn.zbx1425.worldcomment.data.sync; - -import cn.zbx1425.worldcomment.data.CommentEntry; -import cn.zbx1425.worldcomment.data.persist.FileSerializer; -import io.netty.buffer.Unpooled; -import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap; -import net.minecraft.network.FriendlyByteBuf; -import net.minecraft.resources.ResourceLocation; - -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.file.FileAlreadyExistsException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.HashMap; -import java.util.Map; -import java.util.stream.Stream; - -public class RedisSynchronizer implements Synchronizer { - - private final RedisChannelInterface redis; - - private final FileSerializer Serializer; - - private final boolean host; - public RedisSynchronizer(String URI, boolean host, Path persist) { - this.redis = new RedisChannelInterface(URI); - this.host = host; - this.Serializer = new FileSerializer(persist) - - receiver hook = new receiver(); - hook.start(); - } - - @Override - public void sync(Path path) throws IOException { - if (this.host) { - this.upload(path); - } else { - this.fetchAll(path); - } - } - - - @Override - public void update(CommentEntry entry, Path targetFile) throws IOException { - try (RandomAccessFile oStream = new RandomAccessFile(targetFile.toFile(), "rw")) { - entry.updateInFile(oStream); - } - - - if (this.host) { - this.redis.hset( - Command.DataKey( - Command.ALL_DATA_ID - ), - new HashMap(){{ - put(String.valueOf(entry.id), entry.toString()); - }}); - - - this.redis.publish( - Command.COMMAND_CHANNEL, - Command.Updated(String.valueOf(entry.id))); - - } else { - this.redis.set(Command.DataKey( - String.valueOf(entry.id)), - entry.toString()); - - this.redis.publish( - Command.COMMAND_CHANNEL, - Command.Update(String.valueOf(entry.id))); - } - } - - public void kvWriteAll(Long2ObjectSortedMap all) throws IOException { - - - this.redis.hset( - Command.DataKey( - Command.ALL_DATA_ID - ), - pendingUpload - ); - - } - - public Map fetchAll(Path dirPath) throws IOException { - Map exist = this.redis.hgetall(Command.DataKey( - Command.ALL_DATA_ID - )); - - try { - Files.createDirectories(dirPath); - } catch (FileAlreadyExistsException ignored) { - - } - try (Stream levelFiles = Files.list(dirPath)) { - for (Path levelPath : levelFiles.toList()) { - ResourceLocation dimension = new ResourceLocation(levelPath.getFileName().toString().replace("+", ":")); - try (Stream files = Files.list(levelPath)) { - for (Path file : files.toList()) { - String[] fileNameParts = file.getFileName().toString().split("\\."); - if (fileNameParts.length != 4 || !fileNameParts[3].equals("bin")) continue; - byte[] data = Files.readAllBytes(file); - FriendlyByteBuf src = new FriendlyByteBuf(Unpooled.wrappedBuffer(data)); - while (src.readerIndex() < data.length) { - CommentEntry entry = new CommentEntry(dimension, src, false); - exist.remove(String.valueOf(entry.id)); - } - } - } - } - } - - exist.forEach((id, entry) -> { - CommentEntry commentEntry = new CommentEntry(); - this.update(commentEntry, getLevelRegionPath(commentEntry.level, commentEntry.region)); - }); - - - //cache cleaned or some other situation - if (false) { - sendFetchCommand(); - } - } - - private void sendFetchCommand() { - this.redis.publish( - Command.COMMAND_CHANNEL, - Command.Request( - Command.ALL_DATA_ID - )); - } - - public class receiver extends Thread { - @Override - public void run() { - redis.recvChannel(new String[]{Command.COMMAND_CHANNEL}); - - //todo: set queue handler will be better? - while (true) { - String command = redis.Queue.next(); - - if (command.) { - continue; - } - } - - } - } - -} +package cn.zbx1425.worldcomment.data.sync; + +import cn.zbx1425.worldcomment.data.CommentCache; +import cn.zbx1425.worldcomment.data.CommentEntry; +import cn.zbx1425.worldcomment.data.persist.FileSerializer; +import com.mojang.datafixers.TypeRewriteRule; +import io.netty.buffer.Unpooled; +import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap; +import net.minecraft.network.FriendlyByteBuf; +import net.minecraft.resources.ResourceLocation; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +public class RedisSynchronizer implements Synchronizer { + + private final RedisChannelInterface redis; + + private final FileSerializer Serializer; + + private final boolean host; + + public RedisSynchronizer(String URI, Path persist, boolean host) { + this.redis = new RedisChannelInterface(URI); + this.Serializer = new FileSerializer(persist); + this.host = host; + + receiver hook = new receiver(); + hook.start(); + } + + @Override + public void kvWriteAll(Long2ObjectSortedMap all) { + HashMap data = new HashMap<>(); + + for (CommentEntry entry : all.values()) { + data.put(String.valueOf(entry.id), entry.toJson()); + } + + if (data.isEmpty()) { + return; + } + + this.redis.hset( + Command.DataKey( + Command.ALL_DATA_ID + ), + data + ); + + } + + + @Override + public void kvWriteEntry(CommentEntry trustedEntry) { + HashMap data = new HashMap<>(); + data.put(String.valueOf(trustedEntry.id), trustedEntry.toJson()); + + if (trustedEntry.deleted) { + this.redis.hdel( + Command.DataKey( + Command.ALL_DATA_ID + ), + String.valueOf(trustedEntry.id) + ); + return; + } + + + this.redis.hset( + Command.DataKey( + Command.ALL_DATA_ID + ), + data + ); + } + + public void notifyUpdated(CommentEntry trustedEntry) { + redis.publish( + Command.COMMAND_CHANNEL, + Command.Updated( + String.valueOf(trustedEntry.id) + ) + ); + } + + @Override + public void notifyUpdate(CommentEntry trustedEntry) { + if (!host) { + redis.set( + Command.DataKey( + String.valueOf(trustedEntry.id) + ), + trustedEntry.toJson()); + + redis.publish( + Command.COMMAND_CHANNEL, + Command.Update( + String.valueOf(trustedEntry.id) + ) + ); + + return; + } + + kvWriteEntry(trustedEntry); + notifyUpdated(trustedEntry); + + } + + @Override + public void notifyInsert(CommentEntry newEntry) { + if (!host) { + redis.set( + Command.DataKey( + String.valueOf(newEntry.id) + ), + newEntry.toJson()); + + + redis.publish( + Command.COMMAND_CHANNEL, + Command.Update( + String.valueOf(newEntry.id) + ) + ); + + return; + } + + kvWriteEntry(newEntry); + notifyUpdated(newEntry); + + } + + @Override + public void kvReadAllInto(CommentCache comments) throws IOException { + Map data = redis.hgetall( + Command.DataKey( + Command.ALL_DATA_ID + ) + ); + + for (String entry : data.values()) { + CommentEntry comment = CommentEntry.fromJson(entry); + Serializer.cover(comment, false); + } + } + + //Client action only + private void onUpdated(long id) throws IOException { + Map rawMap = redis.hgetall( + Command.DataKey( + Command.ALL_DATA_ID + ) + ); + + String raw = rawMap.get(String.valueOf(id)); + if (raw.isEmpty()) { + return; + } + + CommentEntry comment = CommentEntry.fromJson(raw); + + Serializer.cover(comment, true); + + } + + //Host action only + private void onUpdate(long id) throws IOException { + String raw = redis.get( + Command.DataKey( + String.valueOf(id) + ) + ); + + CommentEntry comment = CommentEntry.fromJson(raw); + + Serializer.cover(comment, true); + + kvWriteEntry(comment); + + redis.del( + Command.DataKey( + String.valueOf(id) + )); + + + + redis.publish( + Command.COMMAND_CHANNEL, + Command.Updated( + String.valueOf(id) + ) + ); + } + + public class receiver extends Thread { + @Override + public void run() { + redis.recvChannel(new String[]{Command.COMMAND_CHANNEL}); + + //todo: set queue handler will be better? + while (true) { + String command = redis.Queue.next(); + + if (command.isEmpty()) { + continue; + } + + if (!host) { + long id = Command.IsUpdated(command); + if (id == 0) { + continue; + } + + + try { + onUpdated(id); + } catch (IOException e) { + //Todo: do something + } + } else { + long id = Command.IsUpdate(command); + if (id == 0) { + continue; + } + + try { + onUpdate(id); + } catch (IOException e) { + //Todo: do something + } + } + + } + + } + } + +} diff --git a/common/src/main/java/cn/zbx1425/worldcomment/data/sync/SingletonSynchronizer.java b/common/src/main/java/cn/zbx1425/worldcomment/data/sync/SingletonSynchronizer.java index 60c9ece..777fbfe 100644 --- a/common/src/main/java/cn/zbx1425/worldcomment/data/sync/SingletonSynchronizer.java +++ b/common/src/main/java/cn/zbx1425/worldcomment/data/sync/SingletonSynchronizer.java @@ -1,7 +1,9 @@ package cn.zbx1425.worldcomment.data.sync; +import cn.zbx1425.worldcomment.data.CommentCache; import cn.zbx1425.worldcomment.data.CommentEntry; import cn.zbx1425.worldcomment.data.persist.FileSerializer; +import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap; import java.io.File; import java.io.IOException; @@ -18,12 +20,27 @@ public SingletonSynchronizer(Path persist) { } @Override - public void sync(Path path) throws IOException { - //do nothing + public void kvWriteEntry(CommentEntry trustedEntry) { + + } + + @Override + public void notifyUpdate(CommentEntry trustedEntry) { + + } + + @Override + public void notifyInsert(CommentEntry newEntry) { + + } + + @Override + public void kvReadAllInto(CommentCache comments) { + } @Override - public void update(CommentEntry entry, Path targetFile) throws IOException { + public void kvWriteAll(Long2ObjectSortedMap timeIndex) { } } diff --git a/common/src/main/java/cn/zbx1425/worldcomment/data/sync/Synchronizer.java b/common/src/main/java/cn/zbx1425/worldcomment/data/sync/Synchronizer.java index f812c05..fac26b7 100644 --- a/common/src/main/java/cn/zbx1425/worldcomment/data/sync/Synchronizer.java +++ b/common/src/main/java/cn/zbx1425/worldcomment/data/sync/Synchronizer.java @@ -1,13 +1,21 @@ package cn.zbx1425.worldcomment.data.sync; +import cn.zbx1425.worldcomment.data.CommentCache; import cn.zbx1425.worldcomment.data.CommentEntry; +import it.unimi.dsi.fastutil.longs.Long2ObjectSortedMap; import java.io.IOException; import java.nio.file.Path; public interface Synchronizer { - void sync(Path path) throws IOException; + void kvWriteEntry(CommentEntry trustedEntry); - void update(CommentEntry newEntry, Path targetFile) throws IOException; + void notifyUpdate(CommentEntry trustedEntry); + + void notifyInsert(CommentEntry newEntry); + + void kvReadAllInto(CommentCache comments) throws IOException; + + void kvWriteAll(Long2ObjectSortedMap timeIndex); } diff --git a/common/src/main/java/cn/zbx1425/worldcomment/mixin/CreativeModeTabsAccessor.java b/common/src/main/java/cn/zbx1425/worldcomment/mixin/CreativeModeTabsAccessor.java new file mode 100644 index 0000000..efea7e3 --- /dev/null +++ b/common/src/main/java/cn/zbx1425/worldcomment/mixin/CreativeModeTabsAccessor.java @@ -0,0 +1,16 @@ +package cn.zbx1425.worldcomment.mixin; + +import net.minecraft.resources.ResourceKey; +import net.minecraft.world.item.CreativeModeTab; +import net.minecraft.world.item.CreativeModeTabs; +import org.spongepowered.asm.mixin.Mixin; +import org.spongepowered.asm.mixin.gen.Accessor; + +@Mixin(CreativeModeTabs.class) +public interface CreativeModeTabsAccessor { + + @Accessor + static ResourceKey getTOOLS_AND_UTILITIES() { + throw new AssertionError(); + } +} diff --git a/common/src/main/resources/worldcomment.mixins.json b/common/src/main/resources/worldcomment.mixins.json index 15baca7..3a2dd74 100644 --- a/common/src/main/resources/worldcomment.mixins.json +++ b/common/src/main/resources/worldcomment.mixins.json @@ -11,5 +11,8 @@ ], "injectors": { "defaultRequire": 1 - } + }, + "mixins": [ + "CreativeModeTabsAccessor" + ] } diff --git a/forge/build.gradle b/forge/build.gradle index 9dcea39..2c3e948 100644 --- a/forge/build.gradle +++ b/forge/build.gradle @@ -25,6 +25,8 @@ dependencies { forge "net.minecraftforge:forge:${rootProject.forge_version}" modApi "${rootProject.architectury_id}:architectury-forge:${rootProject.architectury_version}" + forgeRuntimeLibrary 'io.lettuce:lettuce-core:6.2.3.RELEASE' + common(project(path: ":common", configuration: "namedElements")) { transitive false } shadowCommon(project(path: ":common", configuration: "transformProductionForge")) { transitive = false } }