Skip to content

Commit

Permalink
Merge branch 'distributed-dev' of https://github.com/cute-rui/world-c…
Browse files Browse the repository at this point in the history
  • Loading branch information
zbx1425 committed Aug 2, 2023
2 parents 710ecef + 09ebdda commit d984175
Show file tree
Hide file tree
Showing 9 changed files with 349 additions and 14 deletions.
2 changes: 2 additions & 0 deletions common/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
dependencies {
modImplementation "net.fabricmc:fabric-loader:${rootProject.fabric_loader_version}"
modApi "${rootProject.architectury_id}:architectury:${rootProject.architectury_version}"

implementation 'io.lettuce:lettuce-core:6.2.3.RELEASE'
}

architectury {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package cn.zbx1425.third_party.data.redis;

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;

import java.util.Map;

public class RedisChannelInterface {
public RedisChannelMessageQueue Queue = new RedisChannelMessageQueue();
private final StatefulRedisPubSubConnection<String, String> channel;

private final StatefulRedisConnection<String, String> instance;

private volatile boolean StartedListening = false;

public RedisChannelInterface(String URI) {
this.channel = RedisClient.create(URI).connectPubSub();
this.instance = RedisClient.create(URI).connect();
}

public String get(String Key) {
return this.instance.sync().get(Key);
}

public void set(String Key, String Value) {
this.instance.sync().set(Key, Value);
}

public void hset(String Key, Map<String, String> Value) {
this.instance.sync().hset(Key, Value);
}

public Map<String, String> hgetall(String Key) {
return this.instance.sync().hgetall(Key);
}

public void publish(String Channel, String Data) {
this.channel.sync().publish(Channel, Data);
}

public synchronized void recvChannel(String[] Channels) {
if (StartedListening) {
this.stop();
}

RedisPubSubAdapter<String, String> adapter = new RedisPubSubAdapter<>() {
@Override
public void message(String channel, String message) {
Queue.append(channel, message);
}
};

RedisPubSubAsyncCommands<String, String> command = this.channel.async();

command.subscribe(Channels);

this.channel.addListener(adapter);

while (StartedListening) {
Thread.onSpinWait();
}

this.channel.removeListener(adapter);
}

public void stop() {
this.StartedListening = false;
}

public void close() {
this.channel.close();
this.instance.close();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package cn.zbx1425.third_party.data.redis;

import java.util.ArrayList;
import java.util.List;

//Todo: not implemented, even not a queue
public class RedisChannelMessageQueue {
private List<String> CachedMessage = new ArrayList<String>();
public RedisChannelMessageQueue() {}

public synchronized void append(String Channel, String Data) {
CachedMessage.add(Channel+"#"+Data);
}

public synchronized String next() {
String ret = CachedMessage.get(0);
CachedMessage.remove(0);
return ret;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cn.zbx1425.worldcomment.data.persist;

import cn.zbx1425.worldcomment.data.CommentEntry;
import cn.zbx1425.worldcomment.data.synchronizer.*;
import io.netty.buffer.Unpooled;
import it.unimi.dsi.fastutil.longs.*;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
Expand All @@ -25,15 +26,24 @@ public class CommentTable {
Map<UUID, List<CommentEntry>> playerIndex = new HashMap<>();
Long2ObjectSortedMap<CommentEntry> timeIndex = new Long2ObjectAVLTreeMap<>(Comparator.reverseOrder());

public CommentTable(Database db) {
private final Synchronizer synchronizer;

public CommentTable(Database db, Synchronizer sync) {
this.db = db;
this.synchronizer = sync;

try {
this.synchronizer.sync(db.basePath.resolve("region"));
} catch (IOException e) {
//Todo: seems need to do something here
}

}

public void load() throws IOException {
regionIndex.clear();
playerIndex.clear();
timeIndex.clear();
if (db.isHost) {
try {
Files.createDirectories(db.basePath.resolve("region"));
} catch (FileAlreadyExistsException ignored) {
Expand All @@ -53,9 +63,6 @@ public void load() throws IOException {
}
}
}
} else {

}
}

public void loadRegion(ResourceLocation dimension, long region, byte[] data, boolean fromFile) {
Expand Down Expand Up @@ -130,10 +137,8 @@ public void insert(CommentEntry newEntry) throws IOException {

}
}
Path targetFile = getLevelRegionPath(newEntry.level, newEntry.region);
try (FileOutputStream oStream = new FileOutputStream(targetFile.toFile(), true)) {
newEntry.writeFileStream(oStream);
}

this.synchronizer.update(newEntry, getLevelRegionPath(newEntry.level, newEntry.region));
}
regionIndex.get(newEntry.level)
.computeIfAbsent(newEntry.region.toLong(), ignored -> new ArrayList<>())
Expand All @@ -151,14 +156,12 @@ public void update(CommentEntry newEntry) throws IOException {
if (regionData == null) return;
for (CommentEntry existingEntry : regionData) {
if (existingEntry.id == newEntry.id) {
//Todo: refactor this
existingEntry.deleted = newEntry.deleted;
existingEntry.like = newEntry.like;
assert existingEntry.fileOffset > 0;
if (db.isHost) {
Path targetFile = getLevelRegionPath(newEntry.level, newEntry.region);
try (RandomAccessFile oStream = new RandomAccessFile(targetFile.toFile(), "rw")) {
existingEntry.updateInFile(oStream);
}
this.synchronizer.update(existingEntry, getLevelRegionPath(newEntry.level, newEntry.region));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cn.zbx1425.worldcomment.data.persist;

import cn.zbx1425.worldcomment.data.Snowflake;
import cn.zbx1425.worldcomment.data.synchronizer.SingletonSynchronizer;
import net.minecraft.server.MinecraftServer;
import net.minecraft.world.level.storage.LevelResource;

Expand All @@ -21,7 +22,7 @@ public class Database {
public Database(MinecraftServer server) {
this.server = server;
this.basePath = Path.of(server.getWorldPath(LevelResource.ROOT).toString(), "world-comment");
comments = new CommentTable(this);
comments = new CommentTable(this, new SingletonSynchronizer());
}

public void load() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package cn.zbx1425.worldcomment.data.synchronizer;

public class Command {
public static final String COMMAND_CHANNEL = "WORLD_COMMENT_COMMAND_CHANNEL";


public static String DataKey(String ID) {
if (ID.isEmpty()) {
return "WORLD_COMMENT_DATA#ALL";
}

return "WORLD_COMMENT_DATA#"+ID;
}

public static String Request(String ID) {
if (ID.isEmpty()) {
return "REQUEST#ALL";
}

return "REQUEST#"+ID;
}

public static String Update(String ID) {
return "UPDATE#"+ID;
}

public static String Updated(String ID) {
return "UPDATED#"+ID;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package cn.zbx1425.worldcomment.data.synchronizer;

import cn.zbx1425.third_party.data.redis.RedisChannelInterface;
import cn.zbx1425.worldcomment.data.CommentEntry;
import io.netty.buffer.Unpooled;
import net.minecraft.network.FriendlyByteBuf;
import net.minecraft.resources.ResourceLocation;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

public class RedisSynchronizer implements Synchronizer {

private final RedisChannelInterface redis;

private final boolean host;
public RedisSynchronizer(String URI, boolean host) {
this.redis = new RedisChannelInterface(URI);
this.host = host;
}

@Override
public void sync(Path path) throws IOException {
if (this.host) {
this.upload(path);
} else {
this.fetchAll(path);
}
}


@Override
public void update(CommentEntry entry, Path targetFile) throws IOException {
try (RandomAccessFile oStream = new RandomAccessFile(targetFile.toFile(), "rw")) {
entry.updateInFile(oStream);
}


if (this.host) {
this.redis.hset(
Command.DataKey(""),
new HashMap<String, String>(){{
put(String.valueOf(entry.id), entry.toString());
}});


this.redis.publish(
Command.COMMAND_CHANNEL,
Command.Updated(String.valueOf(entry.id)));

} else {
this.redis.set(Command.DataKey(
String.valueOf(entry.id)),
entry.toString());

this.redis.publish(
Command.COMMAND_CHANNEL,
Command.Update(String.valueOf(entry.id)));
}
}

private void upload(Path dirPath) throws IOException {
HashMap<String, String> pendingUpload = new HashMap<>();

try {
Files.createDirectories(dirPath);
} catch (FileAlreadyExistsException ignored) {

}
try (Stream<Path> levelFiles = Files.list(dirPath)) {
for (Path levelPath : levelFiles.toList()) {
ResourceLocation dimension = new ResourceLocation(levelPath.getFileName().toString().replace("+", ":"));
try (Stream<Path> files = Files.list(levelPath)) {
for (Path file : files.toList()) {
String[] fileNameParts = file.getFileName().toString().split("\\.");
if (fileNameParts.length != 4 || !fileNameParts[3].equals("bin")) continue;
byte[] data = Files.readAllBytes(file);
FriendlyByteBuf src = new FriendlyByteBuf(Unpooled.wrappedBuffer(data));
while (src.readerIndex() < data.length) {
CommentEntry entry = new CommentEntry(dimension, src, false);
pendingUpload.put(String.valueOf(entry.id), entry.toString());
}
}
}
}
}

this.redis.hset(
Command.DataKey(""),
pendingUpload
);

}

private Map<String, String> fetchAll(Path dirPath) throws IOException {
Map<String, String> exist = this.redis.hgetall(Command.DataKey(""));

try {
Files.createDirectories(dirPath);
} catch (FileAlreadyExistsException ignored) {

}
try (Stream<Path> levelFiles = Files.list(dirPath)) {
for (Path levelPath : levelFiles.toList()) {
ResourceLocation dimension = new ResourceLocation(levelPath.getFileName().toString().replace("+", ":"));
try (Stream<Path> files = Files.list(levelPath)) {
for (Path file : files.toList()) {
String[] fileNameParts = file.getFileName().toString().split("\\.");
if (fileNameParts.length != 4 || !fileNameParts[3].equals("bin")) continue;
byte[] data = Files.readAllBytes(file);
FriendlyByteBuf src = new FriendlyByteBuf(Unpooled.wrappedBuffer(data));
while (src.readerIndex() < data.length) {
CommentEntry entry = new CommentEntry(dimension, src, false);
exist.remove(String.valueOf(entry.id));
}
}
}
}
}

exist.forEach((id, entry) -> {
CommentEntry commentEntry = new CommentEntry();
this.update(commentEntry, getLevelRegionPath(commentEntry.level, commentEntry.region));
});


//cache cleaned or some other situation
if (false) {
sendFetchCommand();
}
}

private void sendFetchCommand() {
this.redis.publish(
Command.COMMAND_CHANNEL,
Command.Request("")
);
}

public class receiver extends Thread {
@Override
public void run() {
redis.recvChannel(new String[]{Command.COMMAND_CHANNEL});

//todo: set queue handler will be better?
while (true) {
String command = redis.Queue.next();

if (command.) {

Check failure on line 155 in common/src/main/java/cn/zbx1425/worldcomment/data/synchronizer/RedisSynchronizer.java

View workflow job for this annotation

GitHub Actions / build (1.20.1)

[Task :common:compileJava] <identifier> expected if (command.) { ^
continue;
}
}

}
}

}
Loading

0 comments on commit d984175

Please sign in to comment.