Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pre: 还没写完 #1

Merged
merged 1 commit into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions common/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
dependencies {
modImplementation "net.fabricmc:fabric-loader:${rootProject.fabric_loader_version}"
modApi "${rootProject.architectury_id}:architectury:${rootProject.architectury_version}"

implementation 'io.lettuce:lettuce-core:6.2.3.RELEASE'
}

architectury {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package cn.zbx1425.third_party.data.redis;

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.async.RedisPubSubAsyncCommands;

import java.util.Map;

public class RedisChannelInterface {
public RedisChannelMessageQueue Queue = new RedisChannelMessageQueue();
private final StatefulRedisPubSubConnection<String, String> channel;

private final StatefulRedisConnection<String, String> instance;

private volatile boolean StartedListening = false;

public RedisChannelInterface(String URI) {
this.channel = RedisClient.create(URI).connectPubSub();
this.instance = RedisClient.create(URI).connect();
}

public String get(String Key) {
return this.instance.sync().get(Key);
}

public void set(String Key, String Value) {
this.instance.sync().set(Key, Value);
}

public void hset(String Key, Map<String, String> Value) {
this.instance.sync().hset(Key, Value);
}

public Map<String, String> hgetall(String Key) {
return this.instance.sync().hgetall(Key);
}

public void publish(String Channel, String Data) {
this.channel.sync().publish(Channel, Data);
}

public synchronized void recvChannel(String[] Channels) {
if (StartedListening) {
this.stop();
}

RedisPubSubAdapter<String, String> adapter = new RedisPubSubAdapter<>() {
@Override
public void message(String channel, String message) {
Queue.append(channel, message);
}
};

RedisPubSubAsyncCommands<String, String> command = this.channel.async();

command.subscribe(Channels);

this.channel.addListener(adapter);

while (StartedListening) {
Thread.onSpinWait();
}

this.channel.removeListener(adapter);
}

public void stop() {
this.StartedListening = false;
}

public void close() {
this.channel.close();
this.instance.close();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package cn.zbx1425.third_party.data.redis;

import java.util.ArrayList;
import java.util.List;

//Todo: not implemented, even not a queue
public class RedisChannelMessageQueue {
private List<String> CachedMessage = new ArrayList<String>();
public RedisChannelMessageQueue() {}

public synchronized void append(String Channel, String Data) {
CachedMessage.add(Channel+"#"+Data);
}

public synchronized String next() {
String ret = CachedMessage.get(0);
CachedMessage.remove(0);
return ret;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cn.zbx1425.worldcomment.data.persist;

import cn.zbx1425.worldcomment.data.CommentEntry;
import cn.zbx1425.worldcomment.data.synchronizer.*;
import io.netty.buffer.Unpooled;
import it.unimi.dsi.fastutil.longs.*;
import it.unimi.dsi.fastutil.objects.ObjectIterator;
Expand All @@ -25,15 +26,24 @@ public class CommentTable {
Map<UUID, List<CommentEntry>> playerIndex = new HashMap<>();
Long2ObjectSortedMap<CommentEntry> timeIndex = new Long2ObjectAVLTreeMap<>(Comparator.reverseOrder());

public CommentTable(Database db) {
private final Synchronizer synchronizer;

public CommentTable(Database db, Synchronizer sync) {
this.db = db;
this.synchronizer = sync;

try {
this.synchronizer.sync(db.basePath.resolve("region"));
} catch (IOException e) {
//Todo: seems need to do something here
}

}

public void load() throws IOException {
regionIndex.clear();
playerIndex.clear();
timeIndex.clear();
if (db.isHost) {
try {
Files.createDirectories(db.basePath.resolve("region"));
} catch (FileAlreadyExistsException ignored) {
Expand All @@ -53,9 +63,6 @@ public void load() throws IOException {
}
}
}
} else {

}
}

public void loadRegion(ResourceLocation dimension, long region, byte[] data, boolean fromFile) {
Expand Down Expand Up @@ -130,10 +137,8 @@ public void insert(CommentEntry newEntry) throws IOException {

}
}
Path targetFile = getLevelRegionPath(newEntry.level, newEntry.region);
try (FileOutputStream oStream = new FileOutputStream(targetFile.toFile(), true)) {
newEntry.writeFileStream(oStream);
}

this.synchronizer.update(newEntry, getLevelRegionPath(newEntry.level, newEntry.region));
}
regionIndex.get(newEntry.level)
.computeIfAbsent(newEntry.region.toLong(), ignored -> new ArrayList<>())
Expand All @@ -151,14 +156,12 @@ public void update(CommentEntry newEntry) throws IOException {
if (regionData == null) return;
for (CommentEntry existingEntry : regionData) {
if (existingEntry.id == newEntry.id) {
//Todo: refactor this
existingEntry.deleted = newEntry.deleted;
existingEntry.like = newEntry.like;
assert existingEntry.fileOffset > 0;
if (db.isHost) {
Path targetFile = getLevelRegionPath(newEntry.level, newEntry.region);
try (RandomAccessFile oStream = new RandomAccessFile(targetFile.toFile(), "rw")) {
existingEntry.updateInFile(oStream);
}
this.synchronizer.update(existingEntry, getLevelRegionPath(newEntry.level, newEntry.region));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cn.zbx1425.worldcomment.data.persist;

import cn.zbx1425.worldcomment.data.Snowflake;
import cn.zbx1425.worldcomment.data.synchronizer.SingletonSynchronizer;
import net.minecraft.server.MinecraftServer;
import net.minecraft.world.level.storage.LevelResource;

Expand All @@ -21,7 +22,7 @@ public class Database {
public Database(MinecraftServer server) {
this.server = server;
this.basePath = Path.of(server.getWorldPath(LevelResource.ROOT).toString(), "world-comment");
comments = new CommentTable(this);
comments = new CommentTable(this, new SingletonSynchronizer());
}

public void load() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package cn.zbx1425.worldcomment.data.synchronizer;

public class Command {
public static final String COMMAND_CHANNEL = "WORLD_COMMENT_COMMAND_CHANNEL";


public static String DataKey(String ID) {
if (ID.isEmpty()) {
return "WORLD_COMMENT_DATA#ALL";
}

return "WORLD_COMMENT_DATA#"+ID;
}

public static String Request(String ID) {
if (ID.isEmpty()) {
return "REQUEST#ALL";
}

return "REQUEST#"+ID;
}

public static String Update(String ID) {
return "UPDATE#"+ID;
}

public static String Updated(String ID) {
return "UPDATED#"+ID;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package cn.zbx1425.worldcomment.data.synchronizer;

import cn.zbx1425.third_party.data.redis.RedisChannelInterface;
import cn.zbx1425.worldcomment.data.CommentEntry;
import io.netty.buffer.Unpooled;
import net.minecraft.network.FriendlyByteBuf;
import net.minecraft.resources.ResourceLocation;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

public class RedisSynchronizer implements Synchronizer {

private final RedisChannelInterface redis;

private final boolean host;
public RedisSynchronizer(String URI, boolean host) {
this.redis = new RedisChannelInterface(URI);
this.host = host;
}

@Override
public void sync(Path path) throws IOException {
if (this.host) {
this.upload(path);
} else {
this.fetchAll(path);
}
}


@Override
public void update(CommentEntry entry, Path targetFile) throws IOException {
try (RandomAccessFile oStream = new RandomAccessFile(targetFile.toFile(), "rw")) {
entry.updateInFile(oStream);
}


if (this.host) {
this.redis.hset(
Command.DataKey(""),
new HashMap<String, String>(){{
put(String.valueOf(entry.id), entry.toString());
}});


this.redis.publish(
Command.COMMAND_CHANNEL,
Command.Updated(String.valueOf(entry.id)));

} else {
this.redis.set(Command.DataKey(
String.valueOf(entry.id)),
entry.toString());

this.redis.publish(
Command.COMMAND_CHANNEL,
Command.Update(String.valueOf(entry.id)));
}
}

private void upload(Path dirPath) throws IOException {
HashMap<String, String> pendingUpload = new HashMap<>();

try {
Files.createDirectories(dirPath);
} catch (FileAlreadyExistsException ignored) {

}
try (Stream<Path> levelFiles = Files.list(dirPath)) {
for (Path levelPath : levelFiles.toList()) {
ResourceLocation dimension = new ResourceLocation(levelPath.getFileName().toString().replace("+", ":"));
try (Stream<Path> files = Files.list(levelPath)) {
for (Path file : files.toList()) {
String[] fileNameParts = file.getFileName().toString().split("\\.");
if (fileNameParts.length != 4 || !fileNameParts[3].equals("bin")) continue;
byte[] data = Files.readAllBytes(file);
FriendlyByteBuf src = new FriendlyByteBuf(Unpooled.wrappedBuffer(data));
while (src.readerIndex() < data.length) {
CommentEntry entry = new CommentEntry(dimension, src, false);
pendingUpload.put(String.valueOf(entry.id), entry.toString());
}
}
}
}
}

this.redis.hset(
Command.DataKey(""),
pendingUpload
);

}

private Map<String, String> fetchAll(Path dirPath) throws IOException {
Map<String, String> exist = this.redis.hgetall(Command.DataKey(""));

try {
Files.createDirectories(dirPath);
} catch (FileAlreadyExistsException ignored) {

}
try (Stream<Path> levelFiles = Files.list(dirPath)) {
for (Path levelPath : levelFiles.toList()) {
ResourceLocation dimension = new ResourceLocation(levelPath.getFileName().toString().replace("+", ":"));
try (Stream<Path> files = Files.list(levelPath)) {
for (Path file : files.toList()) {
String[] fileNameParts = file.getFileName().toString().split("\\.");
if (fileNameParts.length != 4 || !fileNameParts[3].equals("bin")) continue;
byte[] data = Files.readAllBytes(file);
FriendlyByteBuf src = new FriendlyByteBuf(Unpooled.wrappedBuffer(data));
while (src.readerIndex() < data.length) {
CommentEntry entry = new CommentEntry(dimension, src, false);
exist.remove(String.valueOf(entry.id));
}
}
}
}
}

exist.forEach((id, entry) -> {
CommentEntry commentEntry = new CommentEntry();
this.update(commentEntry, getLevelRegionPath(commentEntry.level, commentEntry.region));
});


//cache cleaned or some other situation
if (false) {
sendFetchCommand();
}
}

private void sendFetchCommand() {
this.redis.publish(
Command.COMMAND_CHANNEL,
Command.Request("")
);
}

public class receiver extends Thread {
@Override
public void run() {
redis.recvChannel(new String[]{Command.COMMAND_CHANNEL});

//todo: set queue handler will be better?
while (true) {
String command = redis.Queue.next();

if (command.) {

Check failure on line 155 in common/src/main/java/cn/zbx1425/worldcomment/data/synchronizer/RedisSynchronizer.java

View workflow job for this annotation

GitHub Actions / build (1.20.1)

[Task :common:compileJava] <identifier> expected if (command.) { ^
continue;
}
}

}
}

}
Loading
Loading