Skip to content
This repository was archived by the owner on Sep 21, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions Ambrosia/Ambrosia.sln
Original file line number Diff line number Diff line change
Expand Up @@ -3,41 +3,45 @@ Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.27004.2006
MinimumVisualStudioVersion = 10.0.40219.1
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "adv-file-ops", "adv-file-ops\adv-file-ops.vcxproj", "{5852AC33-6B01-44F5-BAF3-2AAF796E8449}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{0BEADEF6-C937-465D-814B-726C3E2A22BA}"
ProjectSection(SolutionItems) = preProject
nuget.config = nuget.config
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ImmortalCoordinator", "..\ImmortalCoordinator\ImmortalCoordinator.csproj", "{5C94C516-377C-4113-8C5F-DF4A016D1B3A}"
ProjectSection(ProjectDependencies) = postProject
{5852AC33-6B01-44F5-BAF3-2AAF796E8449} = {5852AC33-6B01-44F5-BAF3-2AAF796E8449}
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Ambrosia", "Ambrosia\Ambrosia.csproj", "{F704AE0A-C37B-4D30-B9ED-0C76C62D66EC}"
ProjectSection(ProjectDependencies) = postProject
{5852AC33-6B01-44F5-BAF3-2AAF796E8449} = {5852AC33-6B01-44F5-BAF3-2AAF796E8449}
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "CRA.ClientLibrary", "..\CRA\src\CRA.ClientLibrary\CRA.ClientLibrary.csproj", "{D1198E24-4E02-4586-832A-14E6035009B0}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Debug|x64 = Debug|x64
Release|Any CPU = Release|Any CPU
Release|x64 = Release|x64
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{5852AC33-6B01-44F5-BAF3-2AAF796E8449}.Debug|x64.ActiveCfg = Release|x64
{5852AC33-6B01-44F5-BAF3-2AAF796E8449}.Debug|x64.Build.0 = Release|x64
{5852AC33-6B01-44F5-BAF3-2AAF796E8449}.Release|x64.ActiveCfg = Release|x64
{5852AC33-6B01-44F5-BAF3-2AAF796E8449}.Release|x64.Build.0 = Release|x64
{5C94C516-377C-4113-8C5F-DF4A016D1B3A}.Debug|Any CPU.ActiveCfg = Debug|x64
{5C94C516-377C-4113-8C5F-DF4A016D1B3A}.Debug|x64.ActiveCfg = Debug|x64
{5C94C516-377C-4113-8C5F-DF4A016D1B3A}.Debug|x64.Build.0 = Debug|x64
{5C94C516-377C-4113-8C5F-DF4A016D1B3A}.Release|Any CPU.ActiveCfg = Release|x64
{5C94C516-377C-4113-8C5F-DF4A016D1B3A}.Release|x64.ActiveCfg = Release|x64
{5C94C516-377C-4113-8C5F-DF4A016D1B3A}.Release|x64.Build.0 = Release|x64
{F704AE0A-C37B-4D30-B9ED-0C76C62D66EC}.Debug|Any CPU.ActiveCfg = Debug|x64
{F704AE0A-C37B-4D30-B9ED-0C76C62D66EC}.Debug|x64.ActiveCfg = Debug|x64
{F704AE0A-C37B-4D30-B9ED-0C76C62D66EC}.Debug|x64.Build.0 = Debug|x64
{F704AE0A-C37B-4D30-B9ED-0C76C62D66EC}.Release|Any CPU.ActiveCfg = Release|x64
{F704AE0A-C37B-4D30-B9ED-0C76C62D66EC}.Release|x64.ActiveCfg = Release|x64
{F704AE0A-C37B-4D30-B9ED-0C76C62D66EC}.Release|x64.Build.0 = Release|x64
{D1198E24-4E02-4586-832A-14E6035009B0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{D1198E24-4E02-4586-832A-14E6035009B0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D1198E24-4E02-4586-832A-14E6035009B0}.Debug|x64.ActiveCfg = Debug|Any CPU
{D1198E24-4E02-4586-832A-14E6035009B0}.Debug|x64.Build.0 = Debug|Any CPU
{D1198E24-4E02-4586-832A-14E6035009B0}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D1198E24-4E02-4586-832A-14E6035009B0}.Release|Any CPU.Build.0 = Release|Any CPU
{D1198E24-4E02-4586-832A-14E6035009B0}.Release|x64.ActiveCfg = Release|Any CPU
{D1198E24-4E02-4586-832A-14E6035009B0}.Release|x64.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
1,501 changes: 1,244 additions & 257 deletions Ambrosia/Ambrosia/Program.cs

Large diffs are not rendered by default.

152 changes: 152 additions & 0 deletions Ambrosia/Ambrosia/Serializer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace Ambrosia
{
public class Serializer
{
private const int messageTypeSize = 1;

public static void SerializeAncestorMessage(Stream stream, byte messageType, long[] ancestors)
{
var numAncestors = ancestors.Length;
var messageSize = messageTypeSize + StreamCommunicator.LongSize(numAncestors);
foreach (var ancestor in ancestors)
{
messageSize += StreamCommunicator.LongSize(ancestor);
}

// Write message size
stream.WriteInt(messageSize);
// Write message type
stream.WriteByte(messageType);
// Write number of ancestors
stream.WriteInt(numAncestors);
// Write ancestors
foreach(var ancestor in ancestors)
{
stream.WriteLong(ancestor);
}
}

public static async Task<long[]> DeserializeAncestorMessageAsync(Stream stream, CancellationToken ct)
{
var inputFlexBuffer = new FlexReadBuffer();
await FlexReadBuffer.DeserializeAsync(stream, inputFlexBuffer, ct);
var sizeBytes = inputFlexBuffer.LengthLength;
// Get the seqNo of the replay/filter point
var offset = messageTypeSize + sizeBytes;
var numAncestors = StreamCommunicator.ReadBufferedInt(inputFlexBuffer.Buffer, offset);
offset += StreamCommunicator.IntSize(numAncestors);
var ancestors = new long[numAncestors];
for (int i = 0; i < numAncestors; i++)
{
ancestors[i] = StreamCommunicator.ReadBufferedLong(inputFlexBuffer.Buffer, offset);
offset += StreamCommunicator.LongSize(ancestors[i]);
}
return ancestors;
}

public static void SerializeReplayMessage(Stream stream,
byte messageType,
long lastProcessedID,
long lastProcessedReplayableID,
ConcurrentDictionary<long, ConcurrentDictionary<long, Tuple<long, long>>> ancestorsToIDs)
{
var dictCount = ancestorsToIDs.Count;
var messageSize = messageTypeSize + StreamCommunicator.LongSize(lastProcessedID) +
StreamCommunicator.LongSize(lastProcessedReplayableID) +
StreamCommunicator.LongSize(dictCount);

foreach (var peerID in ancestorsToIDs.Keys)
{
messageSize += StreamCommunicator.LongSize(peerID);
messageSize += StreamCommunicator.LongSize(ancestorsToIDs[peerID].Count);
foreach (var shardID in ancestorsToIDs[peerID].Keys)
{
messageSize += StreamCommunicator.LongSize(shardID);
messageSize += StreamCommunicator.LongSize(ancestorsToIDs[peerID][shardID].Item1 + 1);
messageSize += StreamCommunicator.LongSize(ancestorsToIDs[peerID][shardID].Item2 + 1);
}
}

// Write message size
stream.WriteInt(messageSize);
// Write message type
stream.WriteByte(messageType);
// Write the output filter seqNo for the other side
stream.WriteLong(lastProcessedID);
stream.WriteLong(lastProcessedReplayableID);

// For the sharded case, send replay values for other shards
stream.WriteInt(dictCount);
foreach (var peerID in ancestorsToIDs.Keys)
{
stream.WriteLong(peerID);
stream.WriteInt(ancestorsToIDs[peerID].Count);
foreach (var shardID in ancestorsToIDs[peerID].Keys)
{
stream.WriteLong(shardID);
stream.WriteLong(ancestorsToIDs[peerID][shardID].Item1);
stream.WriteLong(ancestorsToIDs[peerID][shardID].Item2);
}
}
}

public static async Task<Tuple<long, long, ConcurrentDictionary<long, ConcurrentDictionary<long, Tuple<long, long>>>>> DeserializeReplayMessageAsync(Stream stream, CancellationToken ct)
{
var inputFlexBuffer = new FlexReadBuffer();
await FlexReadBuffer.DeserializeAsync(stream, inputFlexBuffer, ct);
var sizeBytes = inputFlexBuffer.LengthLength;
// Get the seqNo of the replay/filter point
var offset = messageTypeSize + sizeBytes;
var lastProcessedID = StreamCommunicator.ReadBufferedLong(inputFlexBuffer.Buffer, offset);
offset += StreamCommunicator.LongSize(lastProcessedID);
var lastProcessedReplayableID = StreamCommunicator.ReadBufferedLong(inputFlexBuffer.Buffer, offset);
offset += StreamCommunicator.LongSize(lastProcessedReplayableID);
var numPeers = StreamCommunicator.ReadBufferedInt(inputFlexBuffer.Buffer, offset);
offset += StreamCommunicator.IntSize(numPeers);
var ancestorsToIDs = new ConcurrentDictionary<long, ConcurrentDictionary<long, Tuple<long, long>>>();
for (int i = 0; i < numPeers; i++)
{
long peerID = StreamCommunicator.ReadBufferedLong(inputFlexBuffer.Buffer, offset);
offset += StreamCommunicator.LongSize(peerID);
int numShards = StreamCommunicator.ReadBufferedInt(inputFlexBuffer.Buffer, offset);
offset += StreamCommunicator.IntSize(numShards);
ancestorsToIDs[peerID] = new ConcurrentDictionary<long, Tuple<long, long>>();
for (int j = 0; j < numShards; j++)
{
long shardID = StreamCommunicator.ReadBufferedLong(inputFlexBuffer.Buffer, offset);
offset += StreamCommunicator.LongSize(shardID);
long shardLastProcessedID = StreamCommunicator.ReadBufferedLong(inputFlexBuffer.Buffer, offset);
offset += StreamCommunicator.LongSize(shardLastProcessedID);
long shardLastProcessedReplayableID = StreamCommunicator.ReadBufferedLong(inputFlexBuffer.Buffer, offset);
offset += StreamCommunicator.LongSize(shardLastProcessedReplayableID);
ancestorsToIDs[peerID][shardID] = new Tuple<long, long>(shardLastProcessedID, shardLastProcessedReplayableID);
}
}
inputFlexBuffer.ResetBuffer();
return Tuple.Create(lastProcessedID, lastProcessedReplayableID, ancestorsToIDs);
}

public static void SerializeShardTrimMessage(Stream stream, byte messageType)
{
var messageSize = messageTypeSize;

// Write message size
stream.WriteInt(messageSize);

// Write message type
stream.WriteByte(messageType);
}

public static async Task DeserializeShardTrimMessageAsync(Stream stream, CancellationToken ct)
{
var inputFlexBuffer = new FlexReadBuffer();
await FlexReadBuffer.DeserializeAsync(stream, inputFlexBuffer, ct);
}
}
}
27 changes: 27 additions & 0 deletions Ambrosia/AmbrosiaTests/AmbrosiaTests.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp2.1</TargetFramework>

<IsPackable>false</IsPackable>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<PlatformTarget>x64</PlatformTarget>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">
<PlatformTarget>x64</PlatformTarget>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.9.0" />
<PackageReference Include="MSTest.TestAdapter" Version="1.3.2" />
<PackageReference Include="MSTest.TestFramework" Version="1.3.2" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Ambrosia\Ambrosia.csproj" />
</ItemGroup>

</Project>
100 changes: 100 additions & 0 deletions Ambrosia/AmbrosiaTests/SerializerTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Ambrosia;
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;

namespace AmbrosiaTests
{
[TestClass]
public class SerializerTests
{
[TestMethod]
public void TestAncestorMessage()
{
long[] expectedAncestors = new long[3] { 3, 17, 39 };
using (var stream = new MemoryStream())
{
Serializer.SerializeAncestorMessage(
stream,
AmbrosiaRuntime.ancestorByte,
expectedAncestors
);
stream.Flush();
stream.Position = 0;
var actualAncestors = Serializer.DeserializeAncestorMessageAsync(stream, new CancellationToken()).GetAwaiter().GetResult();
CollectionAssert.AreEqual(expectedAncestors, actualAncestors);
}
}

[TestMethod]
public void TestNonShardReplayMessage()
{
long expectedLastProcessedID = 14;
long expectedLastProcessedReplayableID = 9;
var ancestorsToIDs = new ConcurrentDictionary<long, ConcurrentDictionary<long, Tuple<long, long>>>();

using (var stream = new MemoryStream())
{
Serializer.SerializeReplayMessage(
stream,
AmbrosiaRuntime.replayFromByte,
expectedLastProcessedID,
expectedLastProcessedReplayableID,
ancestorsToIDs
);
stream.Flush();
stream.Position = 0;
var result = Serializer.DeserializeReplayMessageAsync(stream, new CancellationToken()).GetAwaiter().GetResult();
Assert.AreEqual(expectedLastProcessedID, result.Item1);
Assert.AreEqual(expectedLastProcessedReplayableID, result.Item2);
var actualShardToLastID = result.Item3;
Assert.AreEqual(actualShardToLastID.Count, 0);
}
}

[TestMethod]
public void TestShardReplayMessage()
{
long expectedLastProcessedID = 14;
long expectedLastProcessedReplayableID = 9;
var ancestorsToIDs = new ConcurrentDictionary<long, ConcurrentDictionary<long, Tuple<long, long>>>();
ancestorsToIDs[2] = new ConcurrentDictionary<long, Tuple<long, long>>();
ancestorsToIDs[2][1] = new Tuple<long, long>(11, 7);
ancestorsToIDs[2][5] = new Tuple<long, long>(9, 4);
ancestorsToIDs[4] = new ConcurrentDictionary<long, Tuple<long, long>>();
ancestorsToIDs[4][3] = new Tuple<long, long>(8, 6);
ancestorsToIDs[4][7] = new Tuple<long, long>(10, 5);

using (var stream = new MemoryStream())
{
Serializer.SerializeReplayMessage(
stream,
AmbrosiaRuntime.replayFromByte,
expectedLastProcessedID,
expectedLastProcessedReplayableID,
ancestorsToIDs
);
stream.Flush();
stream.Position = 0;
var result = Serializer.DeserializeReplayMessageAsync(stream, new CancellationToken()).GetAwaiter().GetResult();
Assert.AreEqual(expectedLastProcessedID, result.Item1);
Assert.AreEqual(expectedLastProcessedReplayableID, result.Item2);
var actualShardToLastID = result.Item3;
Assert.AreEqual(actualShardToLastID.Count, ancestorsToIDs.Count);

foreach (var peerID in ancestorsToIDs.Keys)
{
Assert.IsTrue(actualShardToLastID.ContainsKey(peerID));
Assert.AreEqual(actualShardToLastID[peerID].Count, ancestorsToIDs[peerID].Count);
foreach (var shardID in ancestorsToIDs[peerID].Keys)
{
Assert.IsTrue(actualShardToLastID[peerID].ContainsKey(shardID));
Assert.AreEqual(actualShardToLastID[peerID][shardID], ancestorsToIDs[peerID][shardID]);
}
}
}
}
}
}
1 change: 1 addition & 0 deletions AmbrosiaTest/AmbrosiaTest/AmbrosiaTest.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
<Compile Include="AMB_UnitTest.cs" />
<Compile Include="MTF_Test.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Shard_UnitTest.cs" />
<Compile Include="Utilities.cs" />
</ItemGroup>
<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The CRA instance appears to be down. Restart it and this vertex will be instantiated automatically
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
The CRA instance appears to be down. Restart it and this vertex will be instantiated automatically
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bytes per RPC Throughput (GB/sec)
*X* 1024 0.0046468462919506
Service Received 1024 MB so far
Bytes received: 1073741824
DONE
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bytes per RPC Throughput (GB/sec)
*X* 1024 0.0178702141848639
Service Received 1024 MB so far
Bytes received: 1073741824
DONE
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
*X* At checkpoint, received 0 messages
*X* Becoming a primary now
*X* Server in Entry Point
*X* At checkpoint, received 969264 messages
Received 1024 MB so far
Bytes received: 1073741824
DONE
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
*X* Server in Entry Point
Received 1024 MB so far
Bytes received: 1073741824
DONE
Loading