Replies: 1 comment 4 replies
-
As stated previously, a log entry is based on the DTO concept represented by the `IDataTransferObject` interface.
Let me provide a basic example of a log entry that holds a dictionary whose keys and values are strings:
public readonly struct MyLogEntry : IRaftLogEntry
{
/// <summary>
/// 16-bit marker written ahead of the payload so that a reader can
/// recognize this kind of log entry.
/// </summary>
public const short Prefix = 1;

// Key/value pairs carried by this entry.
private readonly Dictionary<string, string> payload;

/// <summary>
/// Wraps the given dictionary into a log entry.
/// </summary>
/// <param name="payload">The key/value pairs to be carried by the entry.</param>
/// <param name="term">The term stored in <see cref="Term"/>.</param>
public MyLogEntry(Dictionary<string, string> payload, long term)
{
    this.payload = payload;
    Timestamp = DateTimeOffset.Now; // captured once, at construction time
    Term = term;
}

/// <summary>Gets the term passed to the constructor.</summary>
public long Term { get; }

/// <summary>Gets the moment at which this entry was constructed.</summary>
public DateTimeOffset Timestamp { get; }
/// <summary>
/// Gets the size, in bytes, of the data produced by <c>WriteToAsync</c>.
/// </summary>
/// <remarks>
/// The implementation is optional and may return <c>null</c>.
/// </remarks>
long? IDataTransferObject.Length
{
    get
    {
        // compute length of the serialized data, in bytes:
        // 2 bytes for the 16-bit prefix + 4 bytes for the 32-bit entry count
        long result = sizeof(short) + sizeof(int);
        foreach (var (key, value) in payload)
        {
            // NOTE(review): assumes StringLengthEncoding.Plain emits a 32-bit length
            // before every string - confirm against the dotNext version in use.
            // 2L forces 64-bit arithmetic so that very large strings cannot overflow int.
            result += (2L * sizeof(int)) + Encoding.UTF8.GetByteCount(key) + Encoding.UTF8.GetByteCount(value);
        }

        return result;
    }
}
/// <summary>
/// Serializes this log entry: the 16-bit <see cref="Prefix"/>, the 32-bit number of
/// pairs, and then every key followed by its value as UTF-8 strings.
/// </summary>
/// <typeparam name="TWriter">The type of the binary writer.</typeparam>
/// <param name="writer">The writer that receives the serialized data.</param>
/// <param name="token">The token that can be used to cancel the operation.</param>
/// <returns>The task representing the asynchronous write.</returns>
public async ValueTask WriteToAsync<TWriter>(TWriter writer, CancellationToken token)
    where TWriter : notnull, IAsyncBinaryWriter
{
    // write prefix to recognize this type of the log entry
    await writer.WriteInt16Async(Prefix, true, token);
    // write the number of entries
    await writer.WriteInt32Async(payload.Count, true, token);
    // write the entries
    var context = new EncodingContext(Encoding.UTF8, true);
    foreach (var (key, value) in payload)
    {
        // key first, then value: the reader consumes them in the same order
        await writer.WriteAsync(key.AsMemory(), context, StringLengthEncoding.Plain, token);
        await writer.WriteAsync(value.AsMemory(), context, StringLengthEncoding.Plain, token);
    }
}
}
Now we have a custom log entry that wraps the dictionary and its serialization logic. Each custom log entry type must be distinguishable from other types. In this example I'm using a prefix represented by a 16-bit signed integer. The custom log entry can be instantiated and added to the log using one of the log's append methods. The deserialization logic, exposed as `MyLogEntry.DecodeAsync`, looks like this:
/// <summary>
/// Reads the dictionary payload of a log entry: the 32-bit number of pairs
/// followed by every key and its value as UTF-8 strings.
/// </summary>
/// <remarks>
/// The 16-bit entry prefix is not read here.
/// Declared static because it is invoked as <c>MyLogEntry.DecodeAsync(reader, token)</c>.
/// </remarks>
/// <typeparam name="TReader">The type of the binary reader.</typeparam>
/// <param name="reader">The reader positioned at the number of pairs.</param>
/// <param name="token">The token that can be used to cancel the operation.</param>
/// <returns>The decoded dictionary.</returns>
public static async ValueTask<Dictionary<string, string>> DecodeAsync<TReader>(TReader reader, CancellationToken token)
    where TReader : notnull, IAsyncBinaryReader
{
    var count = await reader.ReadInt32Async(true, token);
    var result = new Dictionary<string, string>(count);
    // deserialize entries
    var context = new DecodingContext(Encoding.UTF8, true);
    while (count-- > 0)
    {
        string key = await reader.ReadStringAsync(StringLengthEncoding.Plain, context, token);
        string value = await reader.ReadStringAsync(StringLengthEncoding.Plain, context, token);
        result.Add(key, value);
    }
    return result;
}
Log entry metadata such as `Term` and `Timestamp` is not written by `WriteToAsync`, so it is not part of the deserialization logic either. Note that the entry ID (the prefix) is not read by the deserialization logic either, because reading the entry prefix is the responsibility of the interpreter:
public class MyInterpreter : IDataTransferObject.IDecoder<int>
{
/// <summary>
/// Reads the 16-bit entry prefix and dispatches to the decoder of the matching
/// log entry type.
/// </summary>
/// <typeparam name="TReader">The type of the binary reader.</typeparam>
/// <param name="reader">The reader positioned at the entry prefix.</param>
/// <param name="token">The token that can be used to cancel the operation.</param>
/// <returns>The prefix of the entry that has been interpreted.</returns>
/// <exception cref="NotSupportedException">The prefix does not match any known log entry type.</exception>
public async ValueTask<int> DecodeAsync<TReader>(TReader reader, CancellationToken token)
    where TReader : notnull, IAsyncBinaryReader
{
    var prefix = await reader.ReadInt16Async(true, token);
    switch (prefix)
    {
        case MyLogEntry.Prefix:
            var dictionary = await MyLogEntry.DecodeAsync(reader, token);
            // interpretation logic here
            break; // C# does not allow falling through into the next switch section
        default:
            throw new NotSupportedException($"Unknown command {prefix}");
    }

    return prefix;
}
}
Now it's possible to use this interpreter inside a class derived from `PersistentState`:
public class MyPersistentState : PersistentState
{
private readonly MyInterpreter interpreter = new MyInterpreter();
/// <summary>
/// Hands the log entry over to the interpreter, without cancellation support.
/// </summary>
/// <typeparam name="TEntry">The type of the log entry.</typeparam>
/// <param name="entry">The log entry to interpret.</param>
/// <returns>The value produced by the interpreter.</returns>
private ValueTask<int> InterpretAsync<TEntry>(TEntry entry)
    where TEntry : struct, IRaftLogEntry
{
    return entry.GetObjectDataAsync(interpreter, CancellationToken.None);
}
/// <summary>
/// Applies the log entry: regular entries are passed to the interpreter,
/// snapshot entries are left for dedicated handling.
/// </summary>
/// <param name="entry">The log entry to apply.</param>
/// <returns>The task representing the asynchronous operation.</returns>
protected override async ValueTask ApplyAsync(LogEntry entry)
{
    if (!entry.IsSnapshot)
    {
        await InterpretAsync(entry);
        return;
    }

    // interpret snapshot
}
}
Beta Was this translation helpful? Give feedback.
Uh oh!
There was an error while loading. Please reload this page.
Uh oh!
There was an error while loading. Please reload this page.
-
I think I need some clarification regarding the SnapshotBuilder pattern.
Here is how I understand it right now:
- A snapshot is persisted by overriding `public override ValueTask WriteToAsync<TWriter>(TWriter writer, CancellationToken token)`. All logic needed to persist a snapshot should be placed in there.
- A `LogEntry` is passed to the `ApplyAsync(LogEntry entry)` method of the `SnapshotBuilder`. The `SnapshotBuilder` then restores the state using the passed-in entry.
- The `ApplyAsync(LogEntry entry)` method of the `PersistentState` is never called with a `LogEntry` of type `IsSnapshot`; those only get passed to the `SnapshotBuilder`.

Are these assumptions correct?
Beta Was this translation helpful? Give feedback.
All reactions