diff --git a/Ambrosia/Ambrosia.sln b/Ambrosia/Ambrosia.sln index f8e79b57..da4f809f 100644 --- a/Ambrosia/Ambrosia.sln +++ b/Ambrosia/Ambrosia.sln @@ -3,41 +3,45 @@ Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio 15 VisualStudioVersion = 15.0.27004.2006 MinimumVisualStudioVersion = 10.0.40219.1 -Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "adv-file-ops", "adv-file-ops\adv-file-ops.vcxproj", "{5852AC33-6B01-44F5-BAF3-2AAF796E8449}" -EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{0BEADEF6-C937-465D-814B-726C3E2A22BA}" ProjectSection(SolutionItems) = preProject nuget.config = nuget.config EndProjectSection EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ImmortalCoordinator", "..\ImmortalCoordinator\ImmortalCoordinator.csproj", "{5C94C516-377C-4113-8C5F-DF4A016D1B3A}" - ProjectSection(ProjectDependencies) = postProject - {5852AC33-6B01-44F5-BAF3-2AAF796E8449} = {5852AC33-6B01-44F5-BAF3-2AAF796E8449} - EndProjectSection EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Ambrosia", "Ambrosia\Ambrosia.csproj", "{F704AE0A-C37B-4D30-B9ED-0C76C62D66EC}" - ProjectSection(ProjectDependencies) = postProject - {5852AC33-6B01-44F5-BAF3-2AAF796E8449} = {5852AC33-6B01-44F5-BAF3-2AAF796E8449} - EndProjectSection +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "CRA.ClientLibrary", "..\CRA\src\CRA.ClientLibrary\CRA.ClientLibrary.csproj", "{D1198E24-4E02-4586-832A-14E6035009B0}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU Debug|x64 = Debug|x64 + Release|Any CPU = Release|Any CPU Release|x64 = Release|x64 EndGlobalSection GlobalSection(ProjectConfigurationPlatforms) = postSolution - {5852AC33-6B01-44F5-BAF3-2AAF796E8449}.Debug|x64.ActiveCfg = Release|x64 - {5852AC33-6B01-44F5-BAF3-2AAF796E8449}.Debug|x64.Build.0 = Release|x64 - {5852AC33-6B01-44F5-BAF3-2AAF796E8449}.Release|x64.ActiveCfg = Release|x64 - {5852AC33-6B01-44F5-BAF3-2AAF796E8449}.Release|x64.Build.0 = Release|x64 + {5C94C516-377C-4113-8C5F-DF4A016D1B3A}.Debug|Any CPU.ActiveCfg = Debug|x64 {5C94C516-377C-4113-8C5F-DF4A016D1B3A}.Debug|x64.ActiveCfg = Debug|x64 {5C94C516-377C-4113-8C5F-DF4A016D1B3A}.Debug|x64.Build.0 = Debug|x64 + {5C94C516-377C-4113-8C5F-DF4A016D1B3A}.Release|Any CPU.ActiveCfg = Release|x64 {5C94C516-377C-4113-8C5F-DF4A016D1B3A}.Release|x64.ActiveCfg = Release|x64 {5C94C516-377C-4113-8C5F-DF4A016D1B3A}.Release|x64.Build.0 = Release|x64 + {F704AE0A-C37B-4D30-B9ED-0C76C62D66EC}.Debug|Any CPU.ActiveCfg = Debug|x64 {F704AE0A-C37B-4D30-B9ED-0C76C62D66EC}.Debug|x64.ActiveCfg = Debug|x64 {F704AE0A-C37B-4D30-B9ED-0C76C62D66EC}.Debug|x64.Build.0 = Debug|x64 + {F704AE0A-C37B-4D30-B9ED-0C76C62D66EC}.Release|Any CPU.ActiveCfg = Release|x64 {F704AE0A-C37B-4D30-B9ED-0C76C62D66EC}.Release|x64.ActiveCfg = Release|x64 {F704AE0A-C37B-4D30-B9ED-0C76C62D66EC}.Release|x64.Build.0 = Release|x64 + {D1198E24-4E02-4586-832A-14E6035009B0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {D1198E24-4E02-4586-832A-14E6035009B0}.Debug|Any CPU.Build.0 = Debug|Any CPU + {D1198E24-4E02-4586-832A-14E6035009B0}.Debug|x64.ActiveCfg = Debug|Any CPU + {D1198E24-4E02-4586-832A-14E6035009B0}.Debug|x64.Build.0 = Debug|Any CPU + {D1198E24-4E02-4586-832A-14E6035009B0}.Release|Any CPU.ActiveCfg = Release|Any CPU + {D1198E24-4E02-4586-832A-14E6035009B0}.Release|Any CPU.Build.0 = Release|Any CPU + {D1198E24-4E02-4586-832A-14E6035009B0}.Release|x64.ActiveCfg = Release|Any CPU + {D1198E24-4E02-4586-832A-14E6035009B0}.Release|x64.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/Ambrosia/Ambrosia/Program.cs b/Ambrosia/Ambrosia/Program.cs index 15d47ba9..0fd54471 100644 --- a/Ambrosia/Ambrosia/Program.cs +++ b/Ambrosia/Ambrosia/Program.cs @@ -138,10 +138,24 @@ internal static void AmbrosiaSerialize(this ConcurrentDictionary AmbrosiaDeserialize(this ConcurrentDictionary dict, LogReader readFromStream) + internal static ConcurrentDictionary AmbrosiaDeserialize(this ConcurrentDictionary dict, LogReader readFromStream, string serviceName) { var _retVal = new ConcurrentDictionary(); var dictCount = readFromStream.ReadIntFixed(); @@ -153,6 +167,22 @@ internal static ConcurrentDictionary AmbrosiaDese newRecord.LastProcessedID = seqNo; seqNo = readFromStream.ReadLongFixed(); newRecord.LastProcessedReplayableID = seqNo; + newRecord.ShardID = readFromStream.ReadLongFixed(); + Console.WriteLine("Deserialize {0} -> Shard ID {1}", myString, newRecord.ShardID); + var ancestorCount = readFromStream.ReadIntFixed(); + for (int j = 0; j < ancestorCount; j++) + { + var peerID = readFromStream.ReadLongFixed(); + var peerCount = readFromStream.ReadLongFixed(); + newRecord.AncestorsToIDs[peerID] = new ConcurrentDictionary>(); + for (int k = 0; k < peerCount; k++) + { + var shardID = readFromStream.ReadLongFixed(); + var lastProcessedID = readFromStream.ReadLongFixed(); + var lastProcessedReplayableID = readFromStream.ReadLongFixed(); + newRecord.AncestorsToIDs[peerID][shardID] = new Tuple(lastProcessedID, lastProcessedReplayableID); + } + } _retVal.TryAdd(myString, newRecord); } return _retVal; @@ -178,10 +208,11 @@ internal static void AmbrosiaSerialize(this ConcurrentDictionary AmbrosiaDeserialize(this ConcurrentDictionary dict, LogReader readFromStream, AmbrosiaRuntime thisAmbrosia) + internal static ConcurrentDictionary AmbrosiaDeserialize(this ConcurrentDictionary dict, LogReader readFromStream, AmbrosiaRuntime thisAmbrosia, string serviceName) { var _retVal = new ConcurrentDictionary(); var dictCount = readFromStream.ReadIntFixed(); @@ -193,10 +224,35 @@ internal static ConcurrentDictionary AmbrosiaDes newRecord.TrimTo = readFromStream.ReadLongFixed(); newRecord.ReplayableTrimTo = readFromStream.ReadLongFixed(); newRecord.BufferedOutput = EventBuffer.Deserialize(readFromStream, thisAmbrosia, newRecord); + newRecord.OutputTracker.AmbrosiaDeserialize(readFromStream); _retVal.TryAdd(myString, newRecord); } return _retVal; } + + internal static void AmbrosiaSerialize(this ConcurrentQueue queue, LogWriter writeToStream) + { + writeToStream.WriteIntFixed(queue.Count); + foreach (var tracker in queue) + { + writeToStream.WriteLongFixed(tracker.GlobalSeqID); + writeToStream.WriteLongFixed(tracker.OutputSeqID); + } + } + + internal static ConcurrentQueue AmbrosiaDeserialize(this ConcurrentQueue queue, LogReader readFromStream) + { + var _retVal = new ConcurrentQueue(); + var queueCount = readFromStream.ReadIntFixed(); + for (int i = 0; i < queueCount; i++) + { + var globalSeqID = readFromStream.ReadLongFixed(); + var outputSeqID = readFromStream.ReadLongFixed(); + var tracker = new OutputRecordTracker(globalSeqID, outputSeqID); + _retVal.Enqueue(tracker); + } + return _retVal; + } } // Note about this class: contention becomes significant when MaxBufferPages > ~50. This could be reduced by having page level locking. @@ -631,43 +687,70 @@ internal async Task SendAsync(Stream outputStream, return placeToStart; } + internal IEnumerator GetEnumerator() + { + return _bufferQ.GetEnumerator(); + } + + internal void Append(EventBuffer other) + { + AcquireTrimLock(2); + var bufferEnumerator = other.GetEnumerator(); + while (bufferEnumerator.MoveNext()) + { + var buffer = bufferEnumerator.Current; + var diff = buffer.HighestSeqNo - buffer.LowestSeqNo; + // Adjust sequence numbers + var writablePage = GetWritablePage(buffer.PageBytes.Length, buffer.LowestSeqNo); + writablePage.LowestSeqNo = buffer.LowestSeqNo; + writablePage.HighestSeqNo = buffer.HighestSeqNo; + writablePage.UnsentReplayableMessages += buffer.UnsentReplayableMessages; + writablePage.TotalReplayableMessages += buffer.TotalReplayableMessages; + // Copy the bytes into the page + writablePage.curLength += buffer.PageBytes.Length; + Buffer.BlockCopy(buffer.PageBytes, 0, writablePage.PageBytes, 0, buffer.PageBytes.Length); + ReleaseAppendLock(); + } + ReleaseTrimLock(); + } + internal async Task ReplayFromAsync(Stream outputStream, long firstSeqNo, bool reconnecting) { -/* if (reconnecting) - { - var bufferE = _bufferQ.GetEnumerator(); - while (bufferE.MoveNext()) - { - var curBuffer = bufferE.Current; - Debug.Assert(curBuffer.LowestSeqNo <= firstSeqNo); - int skipEvents = 0; - if (curBuffer.HighestSeqNo >= firstSeqNo) - { - // We need to send some or all of this buffer - skipEvents = (int)(Math.Max(0, firstSeqNo - curBuffer.LowestSeqNo)); - } - else - { - skipEvents = 0; - } - int bufferPos = 0; - AcquireAppendLock(2); - curBuffer.UnsentReplayableMessages = curBuffer.TotalReplayableMessages; - for (int i = 0; i < skipEvents; i++) - { - int eventSize = curBuffer.PageBytes.ReadBufferedInt(bufferPos); - var methodID = curBuffer.PageBytes.ReadBufferedInt(bufferPos + StreamCommunicator.IntSize(eventSize) + 2); - if (curBuffer.PageBytes[bufferPos + StreamCommunicator.IntSize(eventSize) + 2 + StreamCommunicator.IntSize(methodID)] != (byte)RpcTypes.RpcType.Impulse) + /* if (reconnecting) { - curBuffer.UnsentReplayableMessages--; - } - bufferPos += eventSize + StreamCommunicator.IntSize(eventSize); - } - ReleaseAppendLock(); - } - }*/ + var bufferE = _bufferQ.GetEnumerator(); + while (bufferE.MoveNext()) + { + var curBuffer = bufferE.Current; + Debug.Assert(curBuffer.LowestSeqNo <= firstSeqNo); + int skipEvents = 0; + if (curBuffer.HighestSeqNo >= firstSeqNo) + { + // We need to send some or all of this buffer + skipEvents = (int)(Math.Max(0, firstSeqNo - curBuffer.LowestSeqNo)); + } + else + { + skipEvents = 0; + } + int bufferPos = 0; + AcquireAppendLock(2); + curBuffer.UnsentReplayableMessages = curBuffer.TotalReplayableMessages; + for (int i = 0; i < skipEvents; i++) + { + int eventSize = curBuffer.PageBytes.ReadBufferedInt(bufferPos); + var methodID = curBuffer.PageBytes.ReadBufferedInt(bufferPos + StreamCommunicator.IntSize(eventSize) + 2); + if (curBuffer.PageBytes[bufferPos + StreamCommunicator.IntSize(eventSize) + 2 + StreamCommunicator.IntSize(methodID)] != (byte)RpcTypes.RpcType.Impulse) + { + curBuffer.UnsentReplayableMessages--; + } + bufferPos += eventSize + StreamCommunicator.IntSize(eventSize); + } + ReleaseAppendLock(); + } + }*/ var bufferEnumerator = _bufferQ.GetEnumerator(); // Scan through pages from head to tail looking for events to output while (bufferEnumerator.MoveNext()) @@ -681,7 +764,7 @@ internal async Task ReplayFromAsync(Stream outputStream, int bufferPos = 0; if (true) // BUGBUG We are temporarily disabling this optimization which avoids unnecessary locking as reconnecting is not a sufficient criteria: We found a case where input is arriving during reconnection where counting was getting disabled incorrectly. Further investigation is required. -// if (reconnecting) // BUGBUG We are temporarily disabling this optimization which avoids unnecessary locking as reconnecting is not a sufficient criteria: We found a case where input is arriving during reconnection where counting was getting disabled incorrectly. Further investigation is required. + // if (reconnecting) // BUGBUG We are temporarily disabling this optimization which avoids unnecessary locking as reconnecting is not a sufficient criteria: We found a case where input is arriving during reconnection where counting was getting disabled incorrectly. Further investigation is required. { // We need to reset how many replayable messages have been sent. We want to minimize the use of // this codepath because of the expensive locking, which can compete with new RPCs getting appended @@ -974,17 +1057,33 @@ internal void RebaseSeqNosInBuffer(long commitSeqNo, [DataContract] internal class InputConnectionRecord { + public ConcurrentDictionary>> AncestorsToIDs { get; set; } public NetworkStream DataConnectionStream { get; set; } public NetworkStream ControlConnectionStream { get; set; } [DataMember] public long LastProcessedID { get; set; } [DataMember] public long LastProcessedReplayableID { get; set; } + public long ShardID { get; set; } public InputConnectionRecord() { DataConnectionStream = null; LastProcessedID = 0; LastProcessedReplayableID = 0; + AncestorsToIDs = new ConcurrentDictionary>>(); + } + } + + internal class OutputRecordTracker + { + public long GlobalSeqID { get; set; } + public long OutputSeqID { get; set; } + public bool Received { get; set; } + public OutputRecordTracker(long globalSeqID, long outputSeqID) + { + GlobalSeqID = globalSeqID; + OutputSeqID = outputSeqID; + Received = false; } } @@ -1020,12 +1119,14 @@ internal class OutputConnectionRecord internal volatile bool ResettingConnection; internal object _trimLock = new object(); internal object _remoteTrimLock = new object(); + public ConcurrentQueue OutputTracker { get; set; } public OutputConnectionRecord(AmbrosiaRuntime inAmbrosia) { ReplayFrom = 0; DataWorkQ = new AsyncQueue(); ControlWorkQ = new AsyncQueue(); + OutputTracker = new ConcurrentQueue(); _sendsEnqueued = 0; TrimTo = -1; ReplayableTrimTo = -1; @@ -1040,6 +1141,40 @@ public OutputConnectionRecord(AmbrosiaRuntime inAmbrosia) WillResetConnection = inAmbrosia._createService; ConnectingAfterRestart = inAmbrosia._restartWithRecovery; } + + public void UpdateTracker() + { + try + { + var tracker = OutputTracker.First(); + var firstSeqID = tracker.OutputSeqID; + while (tracker.OutputSeqID < RemoteTrim) + { + tracker.Received = true; + OutputTracker.TryDequeue(out tracker); + tracker = OutputTracker.First(); + } + } + catch (InvalidOperationException) + { + // Queue is empty + } + } + + public void Trim(long lastProcessedID, long lastProcessedReplayableID) + { + // Must lock to atomically update due to race between ToControlStreamAsync, CheckpointAsync, and other functions + lock (_remoteTrimLock) + { + RemoteTrim = Math.Max(lastProcessedID, RemoteTrim); + RemoteTrimReplayable = Math.Max(lastProcessedReplayableID, RemoteTrimReplayable); + UpdateTracker(); + } + if (ControlWorkQ.IsEmpty) + { + ControlWorkQ.Enqueue(-2); + } + } } public class AmbrosiaRuntimeParams @@ -1057,6 +1192,9 @@ public class AmbrosiaRuntimeParams public string storageConnectionString; public long currentVersion; public long upgradeToVersion; + public long shardID; + public long[] oldShards; + public long[] newShards; } public static class AmbrosiaRuntimeParms @@ -1064,6 +1202,7 @@ public static class AmbrosiaRuntimeParms public static bool _looseAttach = false; } + public class AmbrosiaRuntime : VertexBase { #if _WINDOWS @@ -1169,7 +1308,7 @@ private string RetrieveServiceInfo(string key) // Used to hold the bytes which will go in the log. Note that two streams are passed in. The // log stream must write to durable storage and be flushable, while the second stream initiates // actual action taken after the message has been made durable. - private class Committer + internal class Committer { byte[] _buf; volatile byte[] _bufbak; @@ -1273,16 +1412,7 @@ private void SendInputWatermarks(ConcurrentDictionary uncommit outputs[kv.Key] = outputConnectionRecord; Console.WriteLine("Adding output:{0}", kv.Key); } - // Must lock to atomically update due to race with ToControlStreamAsync - lock (outputConnectionRecord._remoteTrimLock) - { - outputConnectionRecord.RemoteTrim = Math.Max(kv.Value.First, outputConnectionRecord.RemoteTrim); - outputConnectionRecord.RemoteTrimReplayable = Math.Max(kv.Value.Second, outputConnectionRecord.RemoteTrimReplayable); - } - if (outputConnectionRecord.ControlWorkQ.IsEmpty) - { - outputConnectionRecord.ControlWorkQ.Enqueue(-2); - } + outputConnectionRecord.Trim(kv.Value.First, kv.Value.Second); } } } @@ -1382,6 +1512,7 @@ private async Task Commit(byte[] buf, } catch (Exception e) { + Console.WriteLine(e.StackTrace); _myAmbrosia.OnError(5, e.Message); } _bufbak = buf; @@ -1934,6 +2065,50 @@ internal async Task AddInitialRowAsync(FlexReadBuffer serviceInitializationMessa } } + + /** + * This contains information associated with a given machine + **/ + internal class MachineState + { + public MachineState(long shardID) + { + Recovered = false; + ShardID = shardID; + } + public LogWriter CheckpointWriter { get; set; } + public Committer Committer { get; set; } + public ConcurrentDictionary Inputs { get; set; } + public long LastCommittedCheckpoint { get; set; } + public long LastLogFile { get; set; } + public AARole MyRole { get; set; } + public ConcurrentDictionary Outputs { get; set; } + public bool Recovered { get; set; } + public long ShardID { get; set; } + } + + internal void LoadAmbrosiaState(MachineState state) + { + state.CheckpointWriter = _checkpointWriter; + state.Committer = _committer; + state.Inputs = _inputs; + state.LastCommittedCheckpoint = _lastCommittedCheckpoint; + state.LastLogFile = _lastLogFile; + state.MyRole = _myRole; + state.Outputs = _outputs; + } + + internal void UpdateAmbrosiaState(MachineState state) + { + _checkpointWriter = state.CheckpointWriter; + _committer = state.Committer; + _inputs = state.Inputs; + _lastCommittedCheckpoint = state.LastCommittedCheckpoint; + _lastLogFile = state.LastLogFile; + _myRole = state.MyRole; + _outputs = state.Outputs; + } + public class AmbrosiaOutput : IAsyncVertexOutputEndpoint { AmbrosiaRuntime myRuntime; @@ -2004,8 +2179,15 @@ public async Task FromStreamAsync(Stream stream, string otherProcess, string oth } } + internal readonly object _globalOutputTrackerLock = new object(); + ConcurrentQueue _globalOutputTracker = new ConcurrentQueue(); + internal long _globalID = 0; ConcurrentDictionary _inputs; ConcurrentDictionary _outputs; + ConcurrentDictionary _serviceNames = new ConcurrentDictionary(); + // Input / output state of parent shards. This is needed for recovery. + ConcurrentDictionary _parentStates = new ConcurrentDictionary(); + ConcurrentDictionary _ancestors; internal int _localServiceReceiveFromPort; // specifiable on the command line internal int _localServiceSendToPort; // specifiable on the command line internal string _serviceName; // specifiable on the command line @@ -2019,6 +2201,9 @@ public async Task FromStreamAsync(Stream stream, string otherProcess, string oth bool _sharded; internal bool _createService; long _shardID; + Func _shardLocator; + long[] _oldShards; + long[] _newShards; bool _runningRepro; long _currentVersion; long _upgradeToVersion; @@ -2043,6 +2228,8 @@ public async Task FromStreamAsync(Stream stream, string otherProcess, string oth public const byte CountReplayableRPCBatchByte = 13; public const byte trimToByte = 14; public const byte becomingPrimaryByte = 15; + public const byte ancestorByte = 16; + public const byte shardTrimByte = 17; CRAClientLibrary _coral; @@ -2068,7 +2255,7 @@ public async Task FromStreamAsync(Stream stream, string otherProcess, string oth // true when this service is in an active/active configuration. False if set to single node bool _activeActive; - enum AARole { Primary, Secondary, Checkpointer }; + internal enum AARole { Primary, Secondary, Checkpointer, ReshardSecondary }; AARole _myRole; // Log size at which we start a new log file. This triggers a checkpoint, <= 0 if manual only checkpointing is done long _newLogTriggerSize; @@ -2079,8 +2266,9 @@ enum AARole { Primary, Secondary, Checkpointer }; // A handle to a file used for an upgrading secondary to bring down the primary and prevent primary promotion amongst secondaries. // As long as the write lock is held, no promotion can happen FileStream _killFileHandle = null; - - + // A handle to a file used to signal to other shards that this shard has finished upgrading. + // When all of these files exists, then all shards know re-sharding is complete. + FileStream _reshardFileHandle = null; const int UnexpectedError = 0; const int VersionMismatch = 1; @@ -2092,6 +2280,7 @@ enum AARole { Primary, Secondary, Checkpointer }; internal void OnError(int ErrNo, string ErrorMessage) { Console.WriteLine("FATAL ERROR " + ErrNo.ToString() + ": " + ErrorMessage); + Console.WriteLine("OH NO {0}", (DateTime.Now - new DateTime(1970, 1, 1)).TotalMilliseconds); Console.Out.Flush(); Console.Out.Flush(); _coral.KillLocalWorker(""); @@ -2206,8 +2395,8 @@ private void SetupAzureConnections() private void PrepareToRecoverOrStart() { IPAddress localIPAddress = Dns.GetHostEntry("localhost").AddressList[0]; - LogWriter.CreateDirectoryIfNotExists(_serviceLogPath + _serviceName + "_" + _currentVersion); - _logFileNameBase = Path.Combine(_serviceLogPath + _serviceName + "_" + _currentVersion, "server"); + LogWriter.CreateDirectoryIfNotExists(LogDirectory(_currentVersion)); + _logFileNameBase = LogFileNameBase(_currentVersion); SetupLocalServiceStreams(); if (!_runningRepro) { @@ -2251,16 +2440,45 @@ private async Task RecoverOrStartAsync(long checkpointToLoad = -1, CheckpointingService = false; Recovering = false; PrepareToRecoverOrStart(); + + if (_shardID < 3) + { + _createService = true; + } + if (!_runningRepro) { + Console.WriteLine("CHECKPOINT " + CheckpointName(_lastCommittedCheckpoint)); RuntimeChecksOnProcessStart(); } + + if (_sharded) + { + _ancestors = new ConcurrentDictionary(); + _ancestors[ServiceName()] = _oldShards; + } + // Determine if we are recovering - if (!_createService) + if (ServiceName() == "server-3") { Recovering = true; _restartWithRecovery = true; - await RecoverAsync(checkpointToLoad, testUpgrade); + if (_oldShards.Length > 0) + { + Console.WriteLine("HERE"); + var start = DateTime.Now; + RecoverFromShards(checkpointToLoad, testUpgrade); + var end = DateTime.Now; + Console.WriteLine("RecoverFromShards {0}", end - start); + } else + { + MachineState state = new MachineState(_shardID); + state.MyRole = AARole.Secondary; + // TODO: Need to figure out where _outputs should have been set if we replay re-sharding. + await RecoverAsync(state, checkpointToLoad, testUpgrade); + UpdateAmbrosiaState(state); + } + await PrepareToBecomePrimaryAsync(); Recovering = false; } else @@ -2269,65 +2487,80 @@ private async Task RecoverOrStartAsync(long checkpointToLoad = -1, } } - private async Task RecoverAsync(long checkpointToLoad = -1, bool testUpgrade = false) + private async Task RecoverAsync(MachineState state, long checkpointToLoad = -1, bool testUpgrade = false) { if (!_runningRepro) { // We are recovering - find the last committed checkpoint - _lastCommittedCheckpoint = long.Parse(RetrieveServiceInfo(InfoTitle("LastCommittedCheckpoint"))); + state.LastCommittedCheckpoint = long.Parse(RetrieveServiceInfo(InfoTitle("LastCommittedCheckpoint", state.ShardID))); + if (_sharded && state.MyRole != AARole.ReshardSecondary) + { + // Load our ancestor information + string ancestors = RetrieveServiceInfo(InfoTitle("Ancestors", state.ShardID)); + _ancestors[ServiceName()] = ancestors.Split(',').Select(n => long.Parse(n)).ToArray(); + } } else { // We are running a repro - _lastCommittedCheckpoint = checkpointToLoad; + state.LastCommittedCheckpoint = checkpointToLoad; } // Start from the log file associated with the last committed checkpoint - _lastLogFile = _lastCommittedCheckpoint; + state.LastLogFile = state.LastCommittedCheckpoint; if (_activeActive) { if (!_runningRepro) { // Determines the role as either secondary or checkpointer. If its a checkpointer, _commitBlobWriter holds the write lock on the last checkpoint - DetermineRole(); + DetermineRole(state); } else { // We are running a repro. Act as a secondary - _myRole = AARole.Secondary; + state.MyRole = AARole.Secondary; } } - using (LogReader checkpointStream = new LogReader(_logFileNameBase + "chkpt" + _lastCommittedCheckpoint.ToString())) + using (LogReader checkpointStream = new LogReader(CheckpointName(state.LastCommittedCheckpoint, -1, state.ShardID))) { // recover the checkpoint - Note that everything except the replay data must have been written successfully or we // won't think we have a valid checkpoint here. Since we can only be the secondary or checkpointer, the committer doesn't write to the replay log // Recover committer - _committer = new Committer(_localServiceSendToStream, _persistLogs, this, -1, checkpointStream); + state.Committer = new Committer(_localServiceSendToStream, _persistLogs, this, -1, checkpointStream); // Recover input connections - _inputs = _inputs.AmbrosiaDeserialize(checkpointStream); + state.Inputs = state.Inputs.AmbrosiaDeserialize(checkpointStream, ServiceName()); // Recover output connections - _outputs = _outputs.AmbrosiaDeserialize(checkpointStream, this); - UnbufferNonreplayableCalls(); + state.Outputs = state.Outputs.AmbrosiaDeserialize(checkpointStream, this, ServiceName()); + UnbufferNonreplayableCalls(state.Outputs); // Restore new service from checkpoint var serviceCheckpoint = new FlexReadBuffer(); FlexReadBuffer.Deserialize(checkpointStream, serviceCheckpoint); - _committer.SendCheckpointToRecoverFrom(serviceCheckpoint.Buffer, serviceCheckpoint.Length, checkpointStream); + state.Committer.SendCheckpointToRecoverFrom(serviceCheckpoint.Buffer, serviceCheckpoint.Length, checkpointStream); + if (_runningRepro) + { + // For replay, we need _outputs to be set for the local listener to work. + UpdateAmbrosiaState(state); + } } - using (LogReader replayStream = new LogReader(_logFileNameBase + "log" + _lastLogFile.ToString())) + using (LogReader replayStream = new LogReader(LogFileName(state.LastLogFile, -1, state.ShardID))) { - if (_myRole == AARole.Secondary && !_runningRepro) + if (state.MyRole == AARole.Secondary && !_runningRepro) { // If this is a secondary, set up the detector to detect when this instance becomes the primary - var t = DetectBecomingPrimaryAsync(); + var t = DetectBecomingPrimaryAsync(state); } if (testUpgrade) { // We are actually testing an upgrade. Must upgrade the service before replay - _committer.SendUpgradeRequest(); + state.Committer.SendUpgradeRequest(); } - await ReplayAsync(replayStream); + await ReplayAsync(replayStream, state); } + } + + private async Task PrepareToBecomePrimaryAsync() + { var readVersion = long.Parse(RetrieveServiceInfo(InfoTitle("CurrentVersion"))); if (_currentVersion != readVersion) { @@ -2351,13 +2584,226 @@ private async Task RecoverAsync(long checkpointToLoad = -1, bool testUpgrade = f await MoveServiceToNextLogFileAsync(); } - if (_activeActive) + if (ServiceName() == "server-2" || ServiceName() == "server-1") { // Start task to periodically check if someone's trying to upgrade (new Task(() => CheckForUpgradeAsync())).Start(); } } + private void RecoverFromShard(MachineState state, long shardID, long checkpointToLoad = -1, bool testUpgrade = false) + { + RecoverAsync(state, checkpointToLoad, testUpgrade).GetAwaiter(); + } + + private void AddAncestorInput(string key, long inputShardID, long peerID, long shardID, Tuple seq) + { + InputConnectionRecord inputConnectionRecord; + if (!_inputs.TryGetValue(key, out inputConnectionRecord)) + { + inputConnectionRecord = new InputConnectionRecord(); + inputConnectionRecord.ShardID = inputShardID; + _inputs[key] = inputConnectionRecord; + } + if (!inputConnectionRecord.AncestorsToIDs.ContainsKey(peerID)) { + inputConnectionRecord.AncestorsToIDs[peerID] = new ConcurrentDictionary>(); + } + inputConnectionRecord.AncestorsToIDs[peerID][shardID] = seq; + } + + private void AddAncestorInput(string sourceString) + { + var srcAncestors = _ancestors[sourceString]; + var dstAncestors = _ancestors[ServiceName()]; + var parts = ParseServiceName(sourceString); + var service = parts.Item1; + foreach (var srcID in srcAncestors) + { + var srcName = ServiceName(service, srcID); + InputConnectionRecord record; + if (!_inputs.TryGetValue(srcName, out record)) + { + record = new InputConnectionRecord(); + _inputs[srcName] = record; + } + // Add our records for the source ancestor inputs to the source Input. + AddAncestorInput(sourceString, parts.Item2, srcID, _shardID, new Tuple(record.LastProcessedID, record.LastProcessedReplayableID)); + // Merge the ancestors AncestorsToIDs records with the sourceString's records. + foreach (var srcID1 in record.AncestorsToIDs.Keys) + { + foreach (var destID1 in record.AncestorsToIDs[srcID1].Keys) + { + AddAncestorInput(sourceString, parts.Item2, srcID1, destID1, record.AncestorsToIDs[srcID1][destID1]); + } + } + } + } + + private void AddParentInputState(MachineState state) + { + foreach (var input in state.Inputs) + { + var record = input.Value; + // Add the parent's ancestor history + foreach (var peerID in record.AncestorsToIDs.Keys) + { + foreach (var shardID in record.AncestorsToIDs[peerID].Keys) + { + AddAncestorInput(input.Key, record.ShardID, peerID, shardID, record.AncestorsToIDs[peerID][shardID]); + } + } + // Add the parents history + AddAncestorInput(input.Key, record.ShardID, record.ShardID, state.ShardID, new Tuple(record.LastProcessedID, record.LastProcessedReplayableID)); + } + } + + private void AddParentStates(MachineState[] states) + { + foreach (var state in states) + { + AddParentInputState(state); + //AddParentOutputState(state); + } + } + + private void FinishResharding() + { + WaitForOtherShards(); + LockReshardFile(); + if (_newShards.Last() == _shardID) + { + KillParentShards(_oldShards); + } + } + + private void EstablishInputConnections(ConcurrentDictionary states) + { + Connect(ServiceName(), AmbrosiaDataOutputsName, ServiceName(), AmbrosiaDataInputsName); + Connect(ServiceName(), AmbrosiaControlOutputsName, ServiceName(), AmbrosiaControlInputsName); + Console.WriteLine("Establishing connections for " + ServiceName()); + var connectionResult1 = Connect(ServiceName(), AmbrosiaDataOutputsName, "client2-1", AmbrosiaDataInputsName); + var connectionResult2 = Connect(ServiceName(), AmbrosiaControlOutputsName, "client2-1", AmbrosiaControlInputsName); + var connectionResult3 = Connect("client2-1", AmbrosiaDataOutputsName, ServiceName(), AmbrosiaDataInputsName); + var connectionResult4 = Connect("client2-1", AmbrosiaControlOutputsName, ServiceName(), AmbrosiaControlInputsName); + if ((connectionResult1 != CRAErrorCode.Success) || (connectionResult2 != CRAErrorCode.Success) || + (connectionResult3 != CRAErrorCode.Success) || (connectionResult4 != CRAErrorCode.Success)) + { + + Console.WriteLine("Error attaching " + ServiceName() + " to " + "client2-1"); + // BUGBUG in tests. Should exit here. Fix tests then delete above line and replace with this OnError(0, "Error attaching " + _serviceName + " to " + destination); + } + connectionResult1 = Connect(ServiceName(), AmbrosiaDataOutputsName, "client1-1", AmbrosiaDataInputsName); + connectionResult2 = Connect(ServiceName(), AmbrosiaControlOutputsName, "client1-1", AmbrosiaControlInputsName); + connectionResult3 = Connect("client1-1", AmbrosiaDataOutputsName, ServiceName(), AmbrosiaDataInputsName); + connectionResult4 = Connect("client1-1", AmbrosiaControlOutputsName, ServiceName(), AmbrosiaControlInputsName); + if ((connectionResult1 != CRAErrorCode.Success) || (connectionResult2 != CRAErrorCode.Success) || + (connectionResult3 != CRAErrorCode.Success) || (connectionResult4 != CRAErrorCode.Success)) + { + + Console.WriteLine("Error attaching " + ServiceName() + " to " + "client1-1"); + // BUGBUG in tests. Should exit here. Fix tests then delete above line and replace with this OnError(0, "Error attaching " + _serviceName + " to " + destination); + } + /* foreach (var state in states.Values) + { + foreach (var kv in state.Inputs) + { + var destination = kv.Key; + if (destination == "") + { + destination = ServiceName(); + } + + List results = new List(); + Console.WriteLine("Establishing connection between {0} and {1}", ServiceName(), destination); + results.Add(Connect(ServiceName(), AmbrosiaDataOutputsName, destination, AmbrosiaDataInputsName)); + results.Add(Connect(ServiceName(), AmbrosiaControlOutputsName, destination, AmbrosiaControlInputsName)); + if (destination != ServiceName()) + { + results.Add(Connect(destination, AmbrosiaDataOutputsName, ServiceName(), AmbrosiaDataInputsName)); + results.Add(Connect(destination, AmbrosiaControlOutputsName, ServiceName(), AmbrosiaControlInputsName)); + } + + foreach (var result in results) + { + if (result != CRAErrorCode.Success) + { + Console.WriteLine("EstablishInputConnections: Error connecting to " + destination); + break; + } + } + } + }*/ + } + + private void RecoverFromShards(long checkpointToLoad = -1, bool testUpgrade = false) + { + _myRole = AARole.ReshardSecondary; + var threads = new Thread[_oldShards.Length]; + InsertOrReplaceServiceInfoRecord(InfoTitle("CurrentVersion"), _currentVersion.ToString()); + InsertOrReplaceServiceInfoRecord(InfoTitle("Ancestors"), string.Join(",", _ancestors[ServiceName()])); + + _inputs = new ConcurrentDictionary(); + _outputs = new ConcurrentDictionary(); + + EstablishInputConnections(_parentStates); + _committer = new Committer(_localServiceSendToStream, _persistLogs, this); + + for (int i = 0; i < _oldShards.Length; i++) + { + long shardID = _oldShards[i]; + MachineState state = new MachineState(shardID); + state.MyRole = AARole.ReshardSecondary; + _parentStates[shardID] = state; + if (i == 0) + { + var t = DetectBecomingPrimaryAsync(state); + } + threads[i] = new Thread(() => RecoverFromShard(state, shardID, checkpointToLoad, testUpgrade)) { IsBackground = true }; + threads[i].Start(); + } + + while (!_parentStates.Values.All(s => s.Recovered)) + { + Thread.Sleep(1000); + } + + AddParentStates(_parentStates.Values.ToArray()); + + // Wait for replay for all shards to occur + for (int i = 0; i < threads.Length; i++) + { + threads[i].Join(); + } + } + + private void WaitForOtherShards() + { + int index = -1; + for (int i = 0; i < _newShards.Length; i++) + { + if (_newShards[i] == _shardID) + { + index = i; + } + } + Debug.Assert(index != -1); + if (index > 0) + { + if (!File.Exists(ReshardFileName(_newShards[index - 1]))) + { + throw new Exception("Peer shard not done syncing."); + } + } + } + + private void KillParentShards(long[] shardIds) + { + for (int i = 0; i < shardIds.Length; i++) + { + LockKillFile(shardIds[i]); + } + } + private async Task StartAsync() { // We are starting for the first time. This is the primary @@ -2372,20 +2818,24 @@ private async Task StartAsync() _checkpointWriter = null; _committer = new Committer(_localServiceSendToStream, _persistLogs, this); - Connect(_serviceName, AmbrosiaDataOutputsName, _serviceName, AmbrosiaDataInputsName); - Connect(_serviceName, AmbrosiaControlOutputsName, _serviceName, AmbrosiaControlInputsName); + Connect(ServiceName(), AmbrosiaDataOutputsName, ServiceName(), AmbrosiaDataInputsName); + Connect(ServiceName(), AmbrosiaControlOutputsName, ServiceName(), AmbrosiaControlInputsName); await MoveServiceToNextLogFileAsync(true, true); InsertOrReplaceServiceInfoRecord(InfoTitle("CurrentVersion"), _currentVersion.ToString()); - if (_activeActive) + if (_sharded) + { + InsertOrReplaceServiceInfoRecord(InfoTitle("Ancestors"), string.Join(",", _ancestors[ServiceName()])); + } + if (_activeActive || _shardID > 0) { // Start task to periodically check if someone's trying to upgrade (new Task(() => CheckForUpgradeAsync())).Start(); } } - private void UnbufferNonreplayableCalls() + private void UnbufferNonreplayableCalls(ConcurrentDictionary outputs) { - foreach (var outputRecord in _outputs) + foreach (var outputRecord in outputs) { var newLastSeqNo = outputRecord.Value.BufferedOutput.TrimAndUnbufferNonreplayableCalls(outputRecord.Value.TrimTo, outputRecord.Value.ReplayableTrimTo); if (newLastSeqNo != -1) @@ -2397,8 +2847,8 @@ private void UnbufferNonreplayableCalls() internal void MoveServiceToUpgradeDirectory() { - LogWriter.CreateDirectoryIfNotExists(_serviceLogPath + _serviceName + "_" + _upgradeToVersion); - _logFileNameBase = Path.Combine(_serviceLogPath + _serviceName + "_" + _upgradeToVersion, "server"); + LogWriter.CreateDirectoryIfNotExists(RootDirectory(_upgradeToVersion)); + _logFileNameBase = LogFileNameBase(_upgradeToVersion); } public CRAErrorCode Connect(string fromProcessName, string fromEndpoint, string toProcessName, string toEndpoint) @@ -2411,56 +2861,114 @@ public CRAErrorCode Connect(string fromProcessName, string fromEndpoint, string return _coral.Connect(fromProcessName, fromEndpoint, toProcessName, toEndpoint); } - private LogWriter CreateNextOldVerLogFile() + private string ServiceName(string name = "", long shardID = -1) { - string newLogFileNameBaseForOldVersion = Path.Combine(_serviceLogPath + _serviceName + "_" + _currentVersion, "server"); - if (LogWriter.FileExists(newLogFileNameBaseForOldVersion + "log" + (_lastLogFile + 1).ToString())) + if (name == "") { - File.Delete(newLogFileNameBaseForOldVersion + "log" + (_lastLogFile + 1).ToString()); + name = _serviceName; } - LogWriter retVal = null; - try + + if (_sharded) { - retVal = new LogWriter(newLogFileNameBaseForOldVersion + "log" + (_lastLogFile + 1).ToString(), 1024 * 1024, 6); + if (shardID == -1) + { + shardID = _shardID; + } } - catch (Exception e) + return GetShardName(name, shardID); + } + + private Tuple ParseServiceName(string service) + { + if (service == "") { - OnError(0, "Error opening next log file:" + e.ToString()); + return new Tuple(_serviceName, _shardID); } - return retVal; + + var parts = service.Split('-'); + if (parts.Length == 1) + { + return new Tuple(parts[0], -1); + } + return new Tuple(parts[0], long.Parse(parts[1])); } - // Used to create a kill file meant to being down primaries and prevent promotion. Promotion prevention - // lasts until the returned file handle is released. - private void LockKillFile() + public static string GetShardName(string name, long shardID) { - _killFileHandle = new FileStream(_logFileNameBase + "killFile", FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read & ~FileShare.Inheritable); + if (shardID != -1) + { + name += "-" + shardID.ToString(); + } + return name; } - private void ReleaseAndTryCleanupKillFile() + private string RootDirectory(long version = -1) { - _killFileHandle.Dispose(); - _killFileHandle = null; - try + if (version == -1) { - // Try to delete the file. Someone may beat us to it. - File.Delete(_logFileNameBase + "killFile"); + if (_upgrading) + { + version = _upgradeToVersion; + } + else + { + version = _currentVersion; + } } - catch (Exception e) + + return _serviceLogPath + _serviceName + "_" + version; + } + + private string LogDirectory(long version = -1, long shardID = -1) + { + string shard = ""; + if (_sharded) { + if (shardID == -1) + { + shardID = _shardID; + } + shard = shardID.ToString(); } + + return Path.Combine(RootDirectory(version), shard); } - private LogWriter CreateNextLogFile() + private string LogFileNameBase(long version = -1, long shardID = -1) + { + return Path.Combine(LogDirectory(version, shardID), "server"); + } + + private string CheckpointName(long checkpoint, long version = -1, long shardID = -1) { - if (LogWriter.FileExists(_logFileNameBase + "log" + (_lastLogFile + 1).ToString())) + return LogFileNameBase(version, shardID) + "chkpt" + checkpoint.ToString(); + } + + private string LogFileName(long logFile, long version = -1, long shardID = -1) + { + return LogFileNameBase(version, shardID) + "log" + logFile.ToString(); + } + + private string KillFileName(long shardID = -1) + { + return LogFileNameBase(-1, shardID) + "killFile"; + } + + private string ReshardFileName(long shardID = -1) + { + return LogFileNameBase(-1, shardID) + "reshardFile"; + } + + private LogWriter CreateNextOldVerLogFile() + { + if (LogWriter.FileExists(LogFileName(_lastLogFile + 1, _currentVersion))) { - File.Delete(_logFileNameBase + "log" + (_lastLogFile + 1).ToString()); + File.Delete(LogFileName(_lastLogFile + 1, _currentVersion)); } LogWriter retVal = null; try { - retVal = new LogWriter(_logFileNameBase + "log" + (_lastLogFile + 1).ToString(), 1024 * 1024, 6); + retVal = new LogWriter(LogFileName(_lastLogFile + 1, _currentVersion), 1024 * 1024, 6); } catch (Exception e) { @@ -2469,12 +2977,65 @@ private LogWriter CreateNextLogFile() return retVal; } - private string InfoTitle(string prefix) + // Used to create a kill file meant to being down primaries and prevent promotion. Promotion prevention + // lasts until the returned file handle is released. + private void LockKillFile(long shardID = -1) { - var file = prefix; + _killFileHandle = LockFile(KillFileName(shardID)); + } + + private void LockReshardFile() + { + _reshardFileHandle = LockFile(ReshardFileName()); + } + + private FileStream LockFile(string file) + { + return new FileStream(file, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read & ~FileShare.Inheritable); + } + + private void ReleaseAndTryCleanupKillFile() + { + _killFileHandle.Dispose(); + _killFileHandle = null; + try + { + // Try to delete the file. Someone may beat us to it. + File.Delete(KillFileName()); + } + catch (Exception e) + { + } + } + + private LogWriter CreateNextLogFile() + { + if (LogWriter.FileExists(LogFileName(_lastLogFile + 1))) + { + File.Delete(LogFileName(_lastLogFile + 1)); + } + LogWriter retVal = null; + try + { + retVal = new LogWriter(LogFileName(_lastLogFile + 1), 1024 * 1024, 6); + } + catch (Exception e) + { + OnError(0, "Error opening next log file:" + e.ToString()); + } + return retVal; + } + + private string InfoTitle(string prefix, long shardID = -1) + { + var file = prefix; if (_sharded) { - file += _shardID.ToString(); + if (shardID == -1) + { + shardID = _shardID; + } + file += shardID.ToString(); } return file; } @@ -2519,39 +3080,40 @@ private async Task MoveServiceToNextLogFileAsync(bool firstStart = fa // it's after replace and moving to the next log file. Note that this will also have the effect // of shaking loose the initialization message, ensuring liveliness. await _committer.TryCommitAsync(_outputs); + TrimGlobalOutputTracker(); return oldVerLogHandle; } //============================================================================================================== // Insance compete over write permission for LOG file & CheckPoint file - private void DetermineRole() + private void DetermineRole(MachineState state) { if (_upgrading) { - _myRole = AARole.Secondary; + state.MyRole = AARole.Secondary; return; } try { // Compete for Checkpoint Write Permission - _checkpointWriter = new LogWriter(_logFileNameBase + "chkpt" + (_lastCommittedCheckpoint).ToString(), 1024 * 1024, 6, true); - _myRole = AARole.Checkpointer; // I'm a checkpointing secondary - var oldCheckpoint = _lastCommittedCheckpoint; - _lastCommittedCheckpoint = long.Parse(RetrieveServiceInfo(InfoTitle("LastCommittedCheckpoint"))); - if (oldCheckpoint != _lastCommittedCheckpoint) + state.CheckpointWriter = new LogWriter(CheckpointName(state.LastCommittedCheckpoint), 1024 * 1024, 6, true); + state.MyRole = AARole.Checkpointer; // I'm a checkpointing secondary + var oldCheckpoint = state.LastCommittedCheckpoint; + state.LastCommittedCheckpoint = long.Parse(RetrieveServiceInfo(InfoTitle("LastCommittedCheckpoint", state.ShardID))); + if (oldCheckpoint != state.LastCommittedCheckpoint) { - _checkpointWriter.Dispose(); + state.CheckpointWriter.Dispose(); throw new Exception("We got a handle on an old checkpoint. The checkpointer was alive when this instance started"); } } catch { - _checkpointWriter = null; - _myRole = AARole.Secondary; // I'm a secondary + state.CheckpointWriter = null; + state.MyRole = AARole.Secondary; // I'm a secondary } } - public async Task DetectBecomingPrimaryAsync() + internal async Task DetectBecomingPrimaryAsync(MachineState state) { // keep trying to take the write permission on LOG file // LOG write permission acquired only in case primary failed (is down) @@ -2559,10 +3121,10 @@ public async Task DetectBecomingPrimaryAsync() { try { - var oldLastLogFile = _lastLogFile; + var oldLastLogFile = state.LastLogFile; // Compete for log write permission - non destructive open for write - open for append - var lastLogFileStream = new LogWriter(_logFileNameBase + "log" + (oldLastLogFile).ToString(), 1024 * 1024, 6, true); - if (long.Parse(RetrieveServiceInfo(InfoTitle("LastLogFile"))) != oldLastLogFile) + var lastLogFileStream = new LogWriter(LogFileName(oldLastLogFile, -1, state.ShardID), 1024 * 1024, 6, true); + if (long.Parse(RetrieveServiceInfo(InfoTitle("LastLogFile", state.ShardID))) != oldLastLogFile) { // We got an old log. Try again lastLogFileStream.Dispose(); @@ -2570,7 +3132,7 @@ public async Task DetectBecomingPrimaryAsync() } // We got the lock! Set things up so we let go of the lock at the right moment // But first check if we got the lock because the version changed, in which case, we should commit suicide - var readVersion = long.Parse(RetrieveServiceInfo(InfoTitle("CurrentVersion"))); + var readVersion = long.Parse(RetrieveServiceInfo(InfoTitle("CurrentVersion", state.ShardID))); if (_currentVersion != readVersion) { @@ -2586,10 +3148,12 @@ public async Task DetectBecomingPrimaryAsync() } // Now we can really promote! - await _committer.SleepAsync(); - _committer.SwitchLogStreams(lastLogFileStream); - await _committer.WakeupAsync(); - _myRole = AARole.Primary; // this will stop and break the loop in the function replayInput_Sec() + await state.Committer.SleepAsync(); + state.Committer.SwitchLogStreams(lastLogFileStream); + await state.Committer.WakeupAsync(); + + state.MyRole = AARole.Primary; // this will stop and break the loop in the function replayInput_Sec() + Console.WriteLine("Primary {0}", (DateTime.Now - new DateTime(1970, 1, 1)).TotalMilliseconds); Console.WriteLine("\n\nNOW I'm Primary\n\n"); // if we are an upgrader : Time to release the kill file lock and cleanup. Note that since we have the log lock // everyone is prevented from promotion until we succeed or fail. @@ -2614,7 +3178,7 @@ public async Task DetectBecomingPrimaryAsync() } } - private async Task ReplayAsync(LogReader replayStream) + private async Task ReplayAsync(LogReader replayStream, MachineState state) { var tempBuf = new byte[100]; var tempBuf2 = new byte[100]; @@ -2636,7 +3200,7 @@ private async Task ReplayAsync(LogReader replayStream) replayStream.ReadAllRequiredBytes(headerBuf, 0, Committer.HeaderSize); headerBufStream.Position = 0; var commitID = headerBufStream.ReadIntFixed(); - if (commitID != _committer.CommitID) + if (commitID != state.Committer.CommitID) { throw new Exception("Committer didn't match. Must be incomplete record"); } @@ -2644,7 +3208,7 @@ private async Task ReplayAsync(LogReader replayStream) commitSize = headerBufStream.ReadIntFixed(); var checkBytes = headerBufStream.ReadLongFixed(); var writeSeqID = headerBufStream.ReadLongFixed(); - if (writeSeqID != _committer._nextWriteID) + if (writeSeqID != state.Committer._nextWriteID) { throw new Exception("Out of order page. Must be incomplete record"); } @@ -2656,7 +3220,7 @@ private async Task ReplayAsync(LogReader replayStream) } replayStream.Read(tempBuf, 0, commitSize); // Perform integrity check - long checkBytesCalc = _committer.CheckBytes(tempBuf, 0, commitSize); + long checkBytesCalc = state.Committer.CheckBytes(tempBuf, 0, commitSize); if (checkBytesCalc != checkBytes) { throw new Exception("Integrity check failed for page. Must be incomplete record"); @@ -2674,10 +3238,11 @@ private async Task ReplayAsync(LogReader replayStream) } replayStream.Read(tempBuf2, 0, inputNameSize); var inputName = Encoding.UTF8.GetString(tempBuf2, 0, inputNameSize); + var newLongPair = new LongPair(); newLongPair.First = replayStream.ReadLongFixed(); newLongPair.Second = replayStream.ReadLongFixed(); - committedInputDict[inputName] = newLongPair; + committedInputDict[inputName] = newLongPair; } // Read changes in trim to perform and reflect in _outputs watermarksToRead = replayStream.ReadInt(); @@ -2693,12 +3258,13 @@ private async Task ReplayAsync(LogReader replayStream) var inputName = Encoding.UTF8.GetString(tempBuf2, 0, inputNameSize); long seqNo = replayStream.ReadLongFixed(); trimDict[inputName] = seqNo; + } } catch { // Couldn't recover replay segment. Could be for a number of reasons. - if (!_activeActive || detectedEOL) + if (state.MyRole != AARole.ReshardSecondary && (!_activeActive || detectedEOL)) { // Leave replay and continue recovery. break; @@ -2706,40 +3272,46 @@ private async Task ReplayAsync(LogReader replayStream) if (detectedEOF) { // Move to the next log file for reading only. We may need to take a checkpoint - _lastLogFile++; + state.LastLogFile++; replayStream.Dispose(); - if (!LogWriter.FileExists(_logFileNameBase + "log" + _lastLogFile.ToString())) + if (!LogWriter.FileExists(LogFileName(state.LastLogFile, -1, state.ShardID))) { - OnError(MissingLog, "Missing log in replay " + _lastLogFile.ToString()); + OnError(MissingLog, "Missing log in replay " + state.LastLogFile.ToString()); } - replayStream = new LogReader(_logFileNameBase + "log" + _lastLogFile.ToString()); - if (_myRole == AARole.Checkpointer) + replayStream = new LogReader(LogFileName(state.LastLogFile, -1, state.ShardID)); + if (state.MyRole == AARole.Checkpointer) { // take the checkpoint associated with the beginning of the new log - await _committer.SleepAsync(); + // It's currently too disruptive to the code to pass in MachineState to + // CheckpointAsync, so we update the corresponding variables instead. + // This should be fine since the checkpointer should not replay from + // multiple logs in parallel. + UpdateAmbrosiaState(state); + _committer.SleepAsync(); _committer.QuiesceServiceWithSendCheckpointRequest(); await CheckpointAsync(); await _committer.WakeupAsync(); + LoadAmbrosiaState(state); } detectedEOF = false; continue; } - var myRoleBeforeEOLChecking = _myRole; + var myRoleBeforeEOLChecking = state.MyRole; replayStream.Position = logRecordPos; - var newLastLogFile = _lastLogFile; + var newLastLogFile = state.LastLogFile; if (_runningRepro) { - if (LogWriter.FileExists(_logFileNameBase + "log" + (_lastLogFile + 1).ToString())) + if (LogWriter.FileExists(LogFileName(state.LastLogFile + 1, -1, state.ShardID))) { // If there is a next file, then move to it - newLastLogFile = _lastLogFile + 1; + newLastLogFile = state.LastLogFile + 1; } } else { - newLastLogFile = long.Parse(RetrieveServiceInfo(InfoTitle("LastLogFile"))); + newLastLogFile = long.Parse(RetrieveServiceInfo(InfoTitle("LastLogFile", state.ShardID))); } - if (newLastLogFile > _lastLogFile) // a new log file has been written + if (newLastLogFile > state.LastLogFile) // a new log file has been written { // Someone started a new log. Try to read the last record again and then move to next file detectedEOF = true; @@ -2752,7 +3324,7 @@ private async Task ReplayAsync(LogReader replayStream) continue; } // The remaining case is that we hit the end of log, but someone is still writing to this file. Wait and try to read again, or kill the primary if we are trying to upgrade in an active/active scenario - if (_upgrading && _activeActive && _killFileHandle == null) + if ((state.MyRole == AARole.ReshardSecondary || (_upgrading && _activeActive)) && _killFileHandle == null) { // We need to write and hold the lock on the kill file. Recovery will continue until the primary dies and we have // fully processed the log. @@ -2760,7 +3332,14 @@ private async Task ReplayAsync(LogReader replayStream) { try { - LockKillFile(); + if (state.MyRole == AARole.ReshardSecondary) + { + FinishResharding(); + } + else + { + LockKillFile(); + } break; } catch (Exception e) @@ -2776,22 +3355,23 @@ private async Task ReplayAsync(LogReader replayStream) foreach (var kv in committedInputDict) { InputConnectionRecord inputConnectionRecord; - if (!_inputs.TryGetValue(kv.Key, out inputConnectionRecord)) + if (!state.Inputs.TryGetValue(kv.Key, out inputConnectionRecord)) { // Create input record and add it to the dictionary inputConnectionRecord = new InputConnectionRecord(); - _inputs[kv.Key] = inputConnectionRecord; + inputConnectionRecord.ShardID = ParseServiceName(kv.Key).Item2; + state.Inputs[kv.Key] = inputConnectionRecord; } inputConnectionRecord.LastProcessedID = kv.Value.First; inputConnectionRecord.LastProcessedReplayableID = kv.Value.Second; OutputConnectionRecord outputConnectionRecord; // this lock prevents conflict with output arriving from the local service during replay - lock (_outputs) + lock (state.Outputs) { - if (!_outputs.TryGetValue(kv.Key, out outputConnectionRecord)) + if (!state.Outputs.TryGetValue(kv.Key, out outputConnectionRecord)) { outputConnectionRecord = new OutputConnectionRecord(this); - _outputs[kv.Key] = outputConnectionRecord; + state.Outputs[kv.Key] = outputConnectionRecord; } } // this lock prevents conflict with output arriving from the local service during replay and ensures maximal cleaning @@ -2813,12 +3393,13 @@ private async Task ReplayAsync(LogReader replayStream) { OutputConnectionRecord outputConnectionRecord; // this lock prevents conflict with output arriving from the local service during replay - lock (_outputs) + lock (state.Outputs) { - if (!_outputs.TryGetValue(kv.Key, out outputConnectionRecord)) + if (!state.Outputs.TryGetValue(kv.Key, out outputConnectionRecord)) { + Console.WriteLine("OUTPUT " + ServiceName() + " " + kv.Key); outputConnectionRecord = new OutputConnectionRecord(this); - _outputs[kv.Key] = outputConnectionRecord; + state.Outputs[kv.Key] = outputConnectionRecord; } } // this lock prevents conflict with output arriving from the local service during replay and ensures maximal cleaning @@ -2832,12 +3413,13 @@ private async Task ReplayAsync(LogReader replayStream) // If this is the first replay segment, it invalidates the contents of the committer, which must be cleared. if (!clearedCommitterWrite) { - _committer.ClearNextWrite(); + state.Committer.ClearNextWrite(); clearedCommitterWrite = true; } // bump up the write ID in the committer in preparation for reading or writing the next page - _committer._nextWriteID++; + state.Committer._nextWriteID++; } + state.Recovered = true; } // Thread for listening to the local service @@ -2932,10 +3514,10 @@ void AttachTo(string destination) while (true) { Console.WriteLine("Attempting to attach", destination); - var connectionResult1 = Connect(_serviceName, AmbrosiaDataOutputsName, destination, AmbrosiaDataInputsName); - var connectionResult2 = Connect(_serviceName, AmbrosiaControlOutputsName, destination, AmbrosiaControlInputsName); - var connectionResult3 = Connect(destination, AmbrosiaDataOutputsName, _serviceName, AmbrosiaDataInputsName); - var connectionResult4 = Connect(destination, AmbrosiaControlOutputsName, _serviceName, AmbrosiaControlInputsName); + var connectionResult1 = Connect(ServiceName(), AmbrosiaDataOutputsName, destination, AmbrosiaDataInputsName); + var connectionResult2 = Connect(ServiceName(), AmbrosiaControlOutputsName, destination, AmbrosiaControlInputsName); + var connectionResult3 = Connect(destination, AmbrosiaDataOutputsName, ServiceName(), AmbrosiaDataInputsName); + var connectionResult4 = Connect(destination, AmbrosiaControlOutputsName, ServiceName(), AmbrosiaControlInputsName); if ((connectionResult1 == CRAErrorCode.Success) && (connectionResult2 == CRAErrorCode.Success) && (connectionResult3 == CRAErrorCode.Success) && (connectionResult4 == CRAErrorCode.Success)) { @@ -2946,6 +3528,27 @@ void AttachTo(string destination) } } + private string DestinationShard(string destination, long shardID = -1) + { + if (destination == "") + { + return destination; + } + + Console.WriteLine("DESTINATION " + destination); + if (_serviceNames.ContainsKey(destination)) + { + return _serviceNames[destination]; + } + + if (shardID == -1 && _sharded) + { + shardID = _shardLocator(BitConverter.ToInt32(Encoding.UTF8.GetBytes(destination), 0)); + } + + return ServiceName(destination, shardID); + } + private void ProcessSyncLocalMessage(ref FlexReadBuffer localServiceBuffer, FlexReadBuffer batchServiceBuffer) { var sizeBytes = localServiceBuffer.LengthLength; @@ -2954,6 +3557,7 @@ private void ProcessSyncLocalMessage(ref FlexReadBuffer localServiceBuffer, Flex switch (localServiceBuffer.Buffer[sizeBytes]) { case takeCheckpointByte: + // Handle take checkpoint messages - This is here for testing createCheckpointTask = new Task(new Action(MoveServiceToNextLogFileSimple)); createCheckpointTask.Start(); @@ -2971,6 +3575,7 @@ private void ProcessSyncLocalMessage(ref FlexReadBuffer localServiceBuffer, Flex case attachToByte: // Get dest string var destination = Encoding.UTF8.GetString(localServiceBuffer.Buffer, sizeBytes + 1, localServiceBuffer.Length - sizeBytes - 1); + destination = DestinationShard(destination); localServiceBuffer.ResetBuffer(); if (!_runningRepro) @@ -2982,16 +3587,16 @@ private void ProcessSyncLocalMessage(ref FlexReadBuffer localServiceBuffer, Flex } else { - Console.WriteLine("Attaching to {0}", destination); - var connectionResult1 = Connect(_serviceName, AmbrosiaDataOutputsName, destination, AmbrosiaDataInputsName); - var connectionResult2 = Connect(_serviceName, AmbrosiaControlOutputsName, destination, AmbrosiaControlInputsName); - var connectionResult3 = Connect(destination, AmbrosiaDataOutputsName, _serviceName, AmbrosiaDataInputsName); - var connectionResult4 = Connect(destination, AmbrosiaControlOutputsName, _serviceName, AmbrosiaControlInputsName); + Console.WriteLine("Attaching to {0} from {1}", destination, ServiceName()); + var connectionResult1 = Connect(ServiceName(), AmbrosiaDataOutputsName, destination, AmbrosiaDataInputsName); + var connectionResult2 = Connect(ServiceName(), AmbrosiaControlOutputsName, destination, AmbrosiaControlInputsName); + var connectionResult3 = Connect(destination, AmbrosiaDataOutputsName, ServiceName(), AmbrosiaDataInputsName); + var connectionResult4 = Connect(destination, AmbrosiaControlOutputsName, ServiceName(), AmbrosiaControlInputsName); if ((connectionResult1 != CRAErrorCode.Success) || (connectionResult2 != CRAErrorCode.Success) || (connectionResult3 != CRAErrorCode.Success) || (connectionResult4 != CRAErrorCode.Success)) { - Console.WriteLine("Error attaching " + _serviceName + " to " + destination); + Console.WriteLine("Error attaching " + ServiceName() + " to " + destination); // BUGBUG in tests. Should exit here. Fix tests then delete above line and replace with this OnError(0, "Error attaching " + _serviceName + " to " + destination); } } @@ -2999,6 +3604,7 @@ private void ProcessSyncLocalMessage(ref FlexReadBuffer localServiceBuffer, Flex break; case RPCBatchByte: + var restOfBatchOffset = sizeBytes + 1; var memStream = new MemoryStream(localServiceBuffer.Buffer, restOfBatchOffset, localServiceBuffer.Length - restOfBatchOffset); var numRPCs = memStream.ReadInt(); @@ -3022,6 +3628,7 @@ private void ProcessSyncLocalMessage(ref FlexReadBuffer localServiceBuffer, Flex break; case RPCByte: + ProcessRPC(localServiceBuffer); // Now process any pending RPC requests from the local service before going async again break; @@ -3082,21 +3689,41 @@ private void ProcessRPC(FlexReadBuffer RpcBuffer) { // Find the appropriate connection record string destination; - if (_lastShuffleDest.Length < destBytesSize) + if (_lastShuffleDest == null || _lastShuffleDest.Length < destBytesSize) { _lastShuffleDest = new byte[destBytesSize]; } Buffer.BlockCopy(RpcBuffer.Buffer, destOffset, _lastShuffleDest, 0, destBytesSize); _lastShuffleDestSize = destBytesSize; destination = Encoding.UTF8.GetString(RpcBuffer.Buffer, destOffset, destBytesSize); + destination = DestinationShard(destination); // locking to avoid conflict with stream reconnection immediately after replay and trim during replay lock (_outputs) { // During replay, the output connection won't exist if this is the first message ever and no trim record has been processed yet. if (!_outputs.TryGetValue(destination, out _shuffleOutputRecord)) { - _shuffleOutputRecord = new OutputConnectionRecord(this); - _outputs[destination] = _shuffleOutputRecord; + if (_oldShards.Length > 0) + { + // TODO: We're still replaying from potentially multiple shards. + // For now, we assume there is only one parent shard, but we need to update + // the client code to pass which shard sent this during recovery. + Debug.Assert(_parentStates.Count == 1); + MachineState parent = _parentStates.Values.First(); + + if (!parent.Outputs.TryGetValue(destination, out _shuffleOutputRecord)) + { + _shuffleOutputRecord = new OutputConnectionRecord(this); + _outputs[destination] = _shuffleOutputRecord; + } else + { + _lastShuffleDest = null; + } + } else + { + _shuffleOutputRecord = new OutputConnectionRecord(this); + _outputs[destination] = _shuffleOutputRecord; + } } } } @@ -3149,23 +3776,26 @@ private void ProcessRPC(FlexReadBuffer RpcBuffer) } } - private async Task ToDataStreamAsync(Stream writeToStream, - string destString, - CancellationToken ct) - + private async Task SyncOutputConnectionAsync(ConcurrentDictionary outputs, + string destString, + long commitSeqNo, + long commitSeqNoReplayable) { OutputConnectionRecord outputConnectionRecord; - if (destString.Equals(_serviceName)) + if (destString.Equals(ServiceName())) { destString = ""; } - lock (_outputs) + InputConnectionRecord inputConnectionRecord; + + + lock (outputs) { - if (!_outputs.TryGetValue(destString, out outputConnectionRecord)) + if (!outputs.TryGetValue(destString, out outputConnectionRecord)) { // Set up the output record for the first time and add it to the dictionary outputConnectionRecord = new OutputConnectionRecord(this); - _outputs[destString] = outputConnectionRecord; + outputs[destString] = outputConnectionRecord; Console.WriteLine("Adding output:{0}", destString); } else @@ -3173,20 +3803,14 @@ private async Task ToDataStreamAsync(Stream writeToStream, Console.WriteLine("restoring output:{0}", destString); } } + try { // Reset the output cursor if it exists outputConnectionRecord.BufferedOutput.AcquireTrimLock(2); outputConnectionRecord.placeInOutput = new EventBuffer.BuffersCursor(null, -1, 0); outputConnectionRecord.BufferedOutput.ReleaseTrimLock(); - // Process replay message - var inputFlexBuffer = new FlexReadBuffer(); - await FlexReadBuffer.DeserializeAsync(writeToStream, inputFlexBuffer, ct); - var sizeBytes = inputFlexBuffer.LengthLength; - // Get the seqNo of the replay/filter point - var commitSeqNo = StreamCommunicator.ReadBufferedLong(inputFlexBuffer.Buffer, sizeBytes + 1); - var commitSeqNoReplayable = StreamCommunicator.ReadBufferedLong(inputFlexBuffer.Buffer, sizeBytes + 1 + StreamCommunicator.LongSize(commitSeqNo)); - inputFlexBuffer.ResetBuffer(); + if (outputConnectionRecord.ConnectingAfterRestart) { // We've been through recovery (at least partially), and have scrubbed all ephemeral calls. Must now rebase @@ -3219,6 +3843,200 @@ private async Task ToDataStreamAsync(Stream writeToStream, } } outputConnectionRecord.LastSeqSentToReceiver = commitSeqNo - 1; + } + catch (Exception e) + { + // Cleanup held locks if necessary + await Task.Yield(); + var lockVal = outputConnectionRecord.BufferedOutput.ReadTrimLock(); + if (lockVal == 1 || lockVal == 2) + { + outputConnectionRecord.BufferedOutput.ReleaseTrimLock(); + } + var bufferLockVal = outputConnectionRecord.BufferedOutput.ReadAppendLock(); + if (bufferLockVal == 2) + { + outputConnectionRecord.BufferedOutput.ReleaseAppendLock(); + } + throw e; + } + return outputConnectionRecord; + } + + private OutputConnectionRecord GetOutputConnectionRecord(string destString) + { + OutputConnectionRecord record; + lock (_outputs) + { + if (!_outputs.TryGetValue(destString, out record)) + { + // Set up the output record for the first time and add it to the dictionary + record = new OutputConnectionRecord(this); + _outputs[destString] = record; + Console.WriteLine("Adding output:{0}", destString); + } + else + { + Console.WriteLine("Restoring output:{0}", destString); + } + } + return record; + } + + private void MergeOutputConnections(OutputConnectionRecord baseConnection, + OutputConnectionRecord mergeConnection, + long lastProcessedReplayableID) + { + lock (baseConnection) + { + long firstSeq = -1; + long lastSeq = -1; + lock (mergeConnection) + { + mergeConnection.BufferedOutput.Trim(lastProcessedReplayableID, ref mergeConnection.placeInOutput); + firstSeq = baseConnection.LastSeqNoFromLocalService + 1; + lastSeq = mergeConnection.BufferedOutput.AdjustFirstSeqNoTo(firstSeq); + baseConnection.LastSeqNoFromLocalService = lastSeq; + baseConnection.ResettingConnection = true; + baseConnection.BufferedOutput.Append(mergeConnection.BufferedOutput); + baseConnection.ResettingConnection = false; + } + for (var outputSeq = firstSeq; outputSeq <= lastSeq; outputSeq++) + { + OutputRecordTracker tracker; + lock (_globalOutputTrackerLock) + { + _globalID += 1; + tracker = new OutputRecordTracker(_globalID, outputSeq); + _globalOutputTracker.Enqueue(tracker); + } + baseConnection.OutputTracker.Enqueue(tracker); + } + } + } + + private OutputConnectionRecord SetupOutputConnectionRecord(string destString, ConcurrentDictionary>> ancestorsToIDs) { + string destServiceBase = _serviceName; + if (!destString.Equals(ServiceName())) + { + destServiceBase = destString.Split('-')[0]; + } + OutputConnectionRecord record = GetOutputConnectionRecord(destString); + foreach (var srcID in ancestorsToIDs.Keys) + { + ConcurrentDictionary parentOutput; + if (_parentStates.ContainsKey(srcID)) + { + parentOutput = _parentStates[srcID].Outputs; + } + else + { + parentOutput = new ConcurrentDictionary(); + } + foreach (var destID in ancestorsToIDs[srcID].Keys) + { + var lastProcessedID = ancestorsToIDs[srcID][destID].Item1; + var lastProcessedReplayableID = ancestorsToIDs[srcID][destID].Item2; + string destService = DestinationShard(destServiceBase, destID); + + // Copy the parent output to the new output record + lock (parentOutput) + { + OutputConnectionRecord parentRecord; + if (parentOutput.TryRemove(destService, out parentRecord)) + { + MergeOutputConnections(record, parentRecord, lastProcessedReplayableID); + } + } + + // Copy old output to new output record + lock (_outputs) + { + OutputConnectionRecord oldRecord; + if (destService != destString && _outputs.TryRemove(destService, out oldRecord)) + { + MergeOutputConnections(record, oldRecord, lastProcessedReplayableID); + } + } + } + } + return record; + } + + private async Task ToDataStreamAsync(Stream writeToStream, + string destString, + CancellationToken ct) + { + if (destString.Equals(ServiceName())) + { + destString = ""; + } + if (destString == "server-3") + { + _serviceNames["server-1"] = "server-3"; + _serviceNames["server-2"] = "server-3"; + _lastShuffleDest = null; + } else + + if (_sharded) + { + Serializer.SerializeAncestorMessage(writeToStream, ancestorByte, _ancestors[ServiceName()]); + } + // Process replay message + var result = await Serializer.DeserializeReplayMessageAsync(writeToStream, ct); + // Get the seqNo of the replay/filter point + var commitSeqNo = result.Item1; + var commitSeqNoReplayable = result.Item2; + var ancestorsToIDs = new ConcurrentDictionary>>(); + + if (_sharded) + { + ancestorsToIDs = result.Item3; + } + + var outputConnectionRecord = SetupOutputConnectionRecord(destString, ancestorsToIDs); + if (_sharded) + { + Serializer.SerializeShardTrimMessage(writeToStream, shardTrimByte); + } + + try + { + // Reset the output cursor if it exists + outputConnectionRecord.BufferedOutput.AcquireTrimLock(2); + outputConnectionRecord.placeInOutput = new EventBuffer.BuffersCursor(null, -1, 0); + outputConnectionRecord.BufferedOutput.ReleaseTrimLock(); + if (outputConnectionRecord.ConnectingAfterRestart) + { + // We've been through recovery (at least partially), and have scrubbed all ephemeral calls. Must now rebase + // seq nos using the markers which were sent by the listener. Must first take locks to ensure no interference + lock (outputConnectionRecord) + { + // Don't think I actually need this lock, but can't hurt and shouldn't affect perf. + outputConnectionRecord.BufferedOutput.AcquireTrimLock(2); + outputConnectionRecord.BufferedOutput.RebaseSeqNosInBuffer(commitSeqNo, commitSeqNoReplayable); + outputConnectionRecord.LastSeqNoFromLocalService += commitSeqNo - commitSeqNoReplayable; + outputConnectionRecord.ConnectingAfterRestart = false; + outputConnectionRecord.BufferedOutput.ReleaseTrimLock(); + } + } + // If recovering, make sure event replay will be filtered out + outputConnectionRecord.ReplayFrom = commitSeqNo; + + if (outputConnectionRecord.WillResetConnection) + { + // Register our immediate intent to set the connection. This unblocks output writers + outputConnectionRecord.ResettingConnection = true; + // This lock avoids interference with buffering RPCs + lock (outputConnectionRecord) + { + // If first reconnect/connect after reset, simply adjust the seq no for the first sent message to the received commit seq no + outputConnectionRecord.ResettingConnection = false; + outputConnectionRecord.LastSeqNoFromLocalService = outputConnectionRecord.BufferedOutput.AdjustFirstSeqNoTo(commitSeqNo); + outputConnectionRecord.WillResetConnection = false; + } + } + outputConnectionRecord.LastSeqSentToReceiver = commitSeqNo - 1; // Enqueue a replay send if (outputConnectionRecord._sendsEnqueued == 0) @@ -3278,25 +4096,17 @@ private async Task ToControlStreamAsync(Stream writeToStream, CancellationToken ct) { - OutputConnectionRecord outputConnectionRecord; - if (destString.Equals(_serviceName)) + if (destString.Equals(ServiceName())) { destString = ""; } - lock (_outputs) + if (destString == "server-3") { - if (!_outputs.TryGetValue(destString, out outputConnectionRecord)) - { - // Set up the output record for the first time and add it to the dictionary - outputConnectionRecord = new OutputConnectionRecord(this); - _outputs[destString] = outputConnectionRecord; - Console.WriteLine("Adding output:{0}", destString); - } - else - { - Console.WriteLine("restoring output:{0}", destString); - } + _serviceNames["server-1"] = "server-3"; + _serviceNames["server-2"] = "server-3"; } + + OutputConnectionRecord outputConnectionRecord = GetOutputConnectionRecord(destString); // Process remote trim message var inputFlexBuffer = new FlexReadBuffer(); await FlexReadBuffer.DeserializeAsync(writeToStream, inputFlexBuffer, ct); @@ -3363,18 +4173,17 @@ private async Task ToControlStreamAsync(Stream writeToStream, } private async Task SendReplayMessageAsync(Stream sendToStream, - long lastProcessedID, - long lastProcessedReplayableID, + InputConnectionRecord input, CancellationToken ct) { // Send FilterTo message to the destination command stream - // Write message size - sendToStream.WriteInt(1 + StreamCommunicator.LongSize(lastProcessedID) + StreamCommunicator.LongSize(lastProcessedReplayableID)); - // Write message type - sendToStream.WriteByte(replayFromByte); - // Write the output filter seqNo for the other side - sendToStream.WriteLong(lastProcessedID); - sendToStream.WriteLong(lastProcessedReplayableID); + Serializer.SerializeReplayMessage( + sendToStream, + replayFromByte, + input.LastProcessedID + 1, + input.LastProcessedReplayableID + 1, + input.AncestorsToIDs + ); await sendToStream.FlushAsync(ct); } @@ -3398,7 +4207,7 @@ private async Task FromDataStreamAsync(Stream readFromStream, CancellationToken ct) { InputConnectionRecord inputConnectionRecord; - if (sourceString.Equals(_serviceName)) + if (sourceString.Equals(ServiceName())) { sourceString = ""; } @@ -3406,15 +4215,36 @@ private async Task FromDataStreamAsync(Stream readFromStream, { // Create input record and add it to the dictionary inputConnectionRecord = new InputConnectionRecord(); + inputConnectionRecord.ShardID = ParseServiceName(sourceString).Item2; _inputs[sourceString] = inputConnectionRecord; Console.WriteLine("Adding input:{0}", sourceString); + if (sourceString == "server-3") + { + _serviceNames["server-1"] = "server-3"; + _serviceNames["server-2"] = "server-3"; + } } else { Console.WriteLine("restoring input:{0}", sourceString); } inputConnectionRecord.DataConnectionStream = (NetworkStream)readFromStream; - await SendReplayMessageAsync(readFromStream, inputConnectionRecord.LastProcessedID + 1, inputConnectionRecord.LastProcessedReplayableID + 1, ct); + if (_sharded) + { + // Process ancestor list + _ancestors[sourceString] = await Serializer.DeserializeAncestorMessageAsync(readFromStream, ct); + AddAncestorInput(sourceString); + // The last destination may be an ancestor shard, in which case we need to reshuffle. + _lastShuffleDestSize = 0; + Console.WriteLine("Ancestors of shard " + sourceString + " are " + string.Join(",", _ancestors[sourceString])); + } + await SendReplayMessageAsync(readFromStream, inputConnectionRecord, ct); + if (_sharded) + { + await Serializer.DeserializeShardTrimMessageAsync(readFromStream, ct); + // Clear ancestor history + _inputs[sourceString].AncestorsToIDs.Clear(); + } // Create new input task for monitoring new input Task inputTask; inputTask = InputDataListenerAsync(inputConnectionRecord, sourceString, ct); @@ -3426,7 +4256,7 @@ private async Task FromControlStreamAsync(Stream readFromStream, CancellationToken ct) { InputConnectionRecord inputConnectionRecord; - if (sourceString.Equals(_serviceName)) + if (sourceString.Equals(ServiceName())) { sourceString = ""; } @@ -3434,6 +4264,7 @@ private async Task FromControlStreamAsync(Stream readFromStream, { // Create input record and add it to the dictionary inputConnectionRecord = new InputConnectionRecord(); + inputConnectionRecord.ShardID = ParseServiceName(sourceString).Item2; _inputs[sourceString] = inputConnectionRecord; Console.WriteLine("Adding input:{0}", sourceString); } @@ -3470,6 +4301,7 @@ private async Task InputDataListenerAsync(InputConnectionRecord inputRecord, while (true) { await FlexReadBuffer.DeserializeAsync(inputRecord.DataConnectionStream, inputFlexBuffer, ct); + Console.WriteLine("Receive data from " + inputName); await ProcessInputMessageAsync(inputRecord, inputName, inputFlexBuffer); } } @@ -3624,14 +4456,14 @@ private async Task ProcessInputMessageAsync(InputConnectionRecord inputRecord, private LogWriter OpenNextCheckpointFile() { - if (LogWriter.FileExists(_logFileNameBase + "chkpt" + (_lastCommittedCheckpoint + 1).ToString())) + if (LogWriter.FileExists(CheckpointName(_lastCommittedCheckpoint + 1))) { - File.Delete(_logFileNameBase + (_lastCommittedCheckpoint + 1).ToString()); + File.Delete(CheckpointName(_lastCommittedCheckpoint + 1)); } LogWriter retVal = null; try { - retVal = new LogWriter(_logFileNameBase + "chkpt" + (_lastCommittedCheckpoint + 1).ToString(), 1024 * 1024, 6); + retVal = new LogWriter(CheckpointName(_lastCommittedCheckpoint + 1), 1024 * 1024, 6); } catch (Exception e) { @@ -3642,13 +4474,50 @@ private LogWriter OpenNextCheckpointFile() private void CleanupOldCheckpoint() { - var fileNameToDelete = _logFileNameBase + (_lastCommittedCheckpoint - 1).ToString(); + var fileNameToDelete = CheckpointName(_lastCommittedCheckpoint - 1); if (LogWriter.FileExists(fileNameToDelete)) { File.Delete(fileNameToDelete); } } + private void TrimGlobalOutputTracker() + { + lock (_globalOutputTrackerLock) + { + try + { + var tracker = _globalOutputTracker.First(); + while (tracker.Received) + { + _globalOutputTracker.TryDequeue(out tracker); + tracker = _globalOutputTracker.First(); + } + } + catch (InvalidOperationException) + { + // Queue is empty + } + + } + } + + public void TrimOutput(string key, long lastProcessedID, long lastProcessedReplayableID) + { +<<<<<<< Updated upstream +======= + Console.WriteLine("*X* Trimming {0} up to ({1}, {2})", key, lastProcessedID, lastProcessedReplayableID); +>>>>>>> Stashed changes + OutputConnectionRecord outputConnectionRecord; + if (!_outputs.TryGetValue(key, out outputConnectionRecord)) + { + outputConnectionRecord = new OutputConnectionRecord(this); + _outputs[key] = outputConnectionRecord; + } + outputConnectionRecord.Trim(lastProcessedID, lastProcessedReplayableID); + TrimGlobalOutputTracker(); + } + // This method takes a checkpoint and bumps the counter. It DOES NOT quiesce anything public async Task CheckpointAsync() { @@ -3687,22 +4556,7 @@ public async Task CheckpointAsync() // successfully written foreach (var kv in _inputs) { - OutputConnectionRecord outputConnectionRecord; - if (!_outputs.TryGetValue(kv.Key, out outputConnectionRecord)) - { - outputConnectionRecord = new OutputConnectionRecord(this); - _outputs[kv.Key] = outputConnectionRecord; - } - // Must lock to atomically update due to race with ToControlStreamAsync - lock (outputConnectionRecord._remoteTrimLock) - { - outputConnectionRecord.RemoteTrim = Math.Max(kv.Value.LastProcessedID, outputConnectionRecord.RemoteTrim); - outputConnectionRecord.RemoteTrimReplayable = Math.Max(kv.Value.LastProcessedReplayableID, outputConnectionRecord.RemoteTrimReplayable); - } - if (outputConnectionRecord.ControlWorkQ.IsEmpty) - { - outputConnectionRecord.ControlWorkQ.Enqueue(-2); - } + TrimOutput(kv.Key, kv.Value.LastProcessedID, kv.Value.LastProcessedReplayableID); } if (oldCheckpointWriter != null) @@ -3720,6 +4574,18 @@ public AmbrosiaRuntime() : base() { } + public long KeyHashToShard(long hash) + { + if (!_sharded) + { + return -1; + } + + new Exception(""); + + return 1; + } + public override void Initialize(object param) { // Workaround because of parameter type limitation in CRA @@ -3730,7 +4596,41 @@ public override void Initialize(object param) p = (AmbrosiaRuntimeParams)xmlSerializer.Deserialize(textReader); } - bool sharded = false; + _runningRepro = false; + bool sharded = p.shardID > 0; + _shardID = p.shardID; + _shardLocator = KeyHashToShard; + _oldShards = p.oldShards; + _newShards = p.newShards; + + _serviceNames["server-1"] = "server-1"; + _serviceNames["server-2"] = "server-2"; + + Initialize( + p.serviceReceiveFromPort, + p.serviceSendToPort, + p.serviceName, + p.serviceLogPath, + p.createService, + p.pauseAtStart, + p.persistLogs, + p.activeActive, + p.logTriggerSizeMB, + p.storageConnectionString, + p.currentVersion, + p.upgradeToVersion, + sharded + ); + } + + public void Initialize(object param, long[] oldShards, long shardID, Func shardMap) + { + var p = (AmbrosiaRuntimeParams)param; + bool runningRepro = true; + bool sharded = true; + _shardID = shardID; + _shardLocator = shardMap; + _oldShards = oldShards; Initialize( p.serviceReceiveFromPort, @@ -3749,18 +4649,21 @@ public override void Initialize(object param) ); } - internal void RuntimeChecksOnProcessStart() + internal void RuntimeChecksOnProcessStart(long shardID = -1) { - if (!_createService) + // When we split / merge shards, these checks will not be valid as the logs for the + // new shards will not exist yet. Instead for the sharded case, we do these checks + // when we recover from the set of old shards. + if (!_createService && _oldShards.Length == 0) { long readVersion = -1; try { - readVersion = long.Parse(RetrieveServiceInfo(InfoTitle("CurrentVersion"))); + readVersion = long.Parse(RetrieveServiceInfo(InfoTitle("CurrentVersion", shardID))); } catch { - OnError(VersionMismatch, "Version mismatch on process start: Expected " + _currentVersion + " was: " + RetrieveServiceInfo(InfoTitle("CurrentVersion"))); + OnError(VersionMismatch, "Version mismatch on process start: Expected " + _currentVersion + " was: " + RetrieveServiceInfo(InfoTitle("CurrentVersion", shardID))); } if (_currentVersion != readVersion) { @@ -3768,24 +4671,22 @@ internal void RuntimeChecksOnProcessStart() } if (!_runningRepro) { - if (long.Parse(RetrieveServiceInfo(InfoTitle("LastCommittedCheckpoint"))) < 1) + if (long.Parse(RetrieveServiceInfo(InfoTitle("LastCommittedCheckpoint", shardID))) < 1) { OnError(MissingCheckpoint, "No checkpoint in metadata"); } } - if (!LogWriter.DirectoryExists(_serviceLogPath + _serviceName + "_" + _currentVersion)) + if (!LogWriter.DirectoryExists(LogDirectory(_currentVersion))) { OnError(MissingCheckpoint, "No checkpoint/logs directory"); } - var lastCommittedCheckpoint = long.Parse(RetrieveServiceInfo(InfoTitle("LastCommittedCheckpoint"))); - if (!LogWriter.FileExists(Path.Combine(_serviceLogPath + _serviceName + "_" + _currentVersion, - "server" + "chkpt" + lastCommittedCheckpoint))) + var lastCommittedCheckpoint = long.Parse(RetrieveServiceInfo(InfoTitle("LastCommittedCheckpoint", shardID))); + if (!LogWriter.FileExists(CheckpointName(lastCommittedCheckpoint, -1, shardID))) { OnError(MissingCheckpoint, "Missing checkpoint " + lastCommittedCheckpoint.ToString()); } - if (!LogWriter.FileExists(Path.Combine(_serviceLogPath + _serviceName + "_" + _currentVersion, - "server" + "log" + lastCommittedCheckpoint))) + if (!LogWriter.FileExists(LogFileName(lastCommittedCheckpoint, -1, shardID))) { OnError(MissingLog, "Missing log " + lastCommittedCheckpoint.ToString()); } @@ -3829,13 +4730,17 @@ bool sharded _serviceName = serviceName; _storageConnectionString = storageConnectionString; _sharded = sharded; + if (_sharded) + { + Console.WriteLine("Running instance with shard ID " + _shardID.ToString()); + } _coral = ClientLibrary; Console.WriteLine("Logs directory: {0}", _serviceLogPath); if (createService == null) { - if (LogWriter.DirectoryExists(_serviceLogPath + _serviceName + "_" + _currentVersion)) + if (LogWriter.DirectoryExists(RootDirectory())) { createService = false; } @@ -3858,7 +4763,8 @@ internal void InitializeRepro(string serviceName, int version, bool testUpgrade, int serviceReceiveFromPort, - int serviceSendToPort) + int serviceSendToPort, + long shardID) { _localServiceReceiveFromPort = serviceReceiveFromPort; _localServiceSendToPort = serviceSendToPort; @@ -3868,8 +4774,11 @@ internal void InitializeRepro(string serviceName, _activeActive = true; _serviceLogPath = serviceLogPath; _serviceName = serviceName; - _sharded = false; + _shardID = shardID; + _sharded = shardID != -1; _createService = false; + _oldShards = new long[0]; // TODO: May need to change this. + _shardLocator = KeyHashToShard; RecoverOrStartAsync(checkpointToLoad, testUpgrade).Wait(); } } @@ -3892,6 +4801,44 @@ class Program private static long _logTriggerSizeMB = 1000; private static int _currentVersion = 0; private static long _upgradeVersion = -1; + private static long _shardID = -1; + private static string _oldShards = ""; + private static string _newShards = ""; + + private static void InstantiateVertex(CRAClientLibrary client, string instanceName, string vertexName, string vertexDefinition, object vertexParameter, bool sharded) + { + CRAErrorCode result; + if (!sharded) + { + result = client.InstantiateVertex(instanceName, vertexName, vertexDefinition, vertexParameter); + } else + { + ConcurrentDictionary vertexShards = new ConcurrentDictionary(); + vertexShards[instanceName] = 1; + result = client.InstantiateShardedVertex(vertexName, vertexDefinition, vertexParameter, vertexShards); + } + if (result != CRAErrorCode.Success) + { + throw new Exception(); + } + } + + /* private static void DefineVertex(CRAClientLibrary client, string vertexDefinition, bool sharded) + { + CRAErrorCode result; + if (!sharded) + { + result = client.DefineVertex(param.AmbrosiaBinariesLocation, () => new AmbrosiaRuntime()); + } + /* if (client.DefineVertex(param.AmbrosiaBinariesLocation, () => new AmbrosiaRuntime()) != CRAErrorCode.Success) + { + throw new Exception(); + } + if (result != CRAErrorCode.Success) + { + throw new Exception(); + } + }(*/ static void Main(string[] args) { @@ -3902,9 +4849,10 @@ static void Main(string[] args) case LocalAmbrosiaRuntimeModes.DebugInstance: var myRuntime = new AmbrosiaRuntime(); myRuntime.InitializeRepro(_instanceName, _serviceLogPath, _checkpointToLoad, _currentVersion, - _isTestingUpgrade, _serviceReceiveFromPort, _serviceSendToPort); + _isTestingUpgrade, _serviceReceiveFromPort, _serviceSendToPort, _shardID); return; case LocalAmbrosiaRuntimeModes.AddReplica: + case LocalAmbrosiaRuntimeModes.AddShard: case LocalAmbrosiaRuntimeModes.RegisterInstance: if (_runtimeMode == LocalAmbrosiaRuntimeModes.AddReplica) { @@ -3913,7 +4861,8 @@ static void Main(string[] args) var client = new CRAClientLibrary(Environment.GetEnvironmentVariable("AZURE_STORAGE_CONN_STRING")); client.DisableArtifactUploading(); - var replicaName = $"{_instanceName}{_replicaNumber}"; + string shardName = AmbrosiaRuntime.GetShardName(_instanceName, _shardID); + var replicaName = $"{shardName}_{_replicaNumber}"; AmbrosiaRuntimeParams param = new AmbrosiaRuntimeParams(); param.createService = _recoveryMode == AmbrosiaRecoveryModes.A ? (bool?)null @@ -3930,14 +4879,12 @@ static void Main(string[] args) param.serviceLogPath = _serviceLogPath; param.AmbrosiaBinariesLocation = _binariesLocation; param.storageConnectionString = Environment.GetEnvironmentVariable("AZURE_STORAGE_CONN_STRING"); + param.shardID = _shardID; + param.oldShards = ParseLongs(_oldShards); + param.newShards = ParseLongs(_newShards); try { - if (client.DefineVertex(param.AmbrosiaBinariesLocation, () => new AmbrosiaRuntime()) != CRAErrorCode.Success) - { - throw new Exception(); - } - // Workaround because of limitation in parameter serialization in CRA XmlSerializer xmlSerializer = new XmlSerializer(param.GetType()); string serializedParams; @@ -3947,14 +4894,20 @@ static void Main(string[] args) serializedParams = textWriter.ToString(); } - if (client.InstantiateVertex(replicaName, param.serviceName, param.AmbrosiaBinariesLocation, serializedParams) != CRAErrorCode.Success) +<<<<<<< Updated upstream + InstantiateVertex(client, replicaName, shardName, param.AmbrosiaBinariesLocation, serializedParams, _shardID > 0); +======= + if (client.InstantiateVertex(replicaName, shardName, param.AmbrosiaBinariesLocation, serializedParams) != CRAErrorCode.Success) { throw new Exception(); } - client.AddEndpoint(param.serviceName, AmbrosiaRuntime.AmbrosiaDataInputsName, true, true); - client.AddEndpoint(param.serviceName, AmbrosiaRuntime.AmbrosiaDataOutputsName, false, true); - client.AddEndpoint(param.serviceName, AmbrosiaRuntime.AmbrosiaControlInputsName, true, true); - client.AddEndpoint(param.serviceName, AmbrosiaRuntime.AmbrosiaControlOutputsName, false, true); + Console.WriteLine("Creating endpoints for {0}", shardName); +>>>>>>> Stashed changes + + client.AddEndpoint(shardName, AmbrosiaRuntime.AmbrosiaDataInputsName, true, true); + client.AddEndpoint(shardName, AmbrosiaRuntime.AmbrosiaDataOutputsName, false, true); + client.AddEndpoint(shardName, AmbrosiaRuntime.AmbrosiaControlInputsName, true, true); + client.AddEndpoint(shardName, AmbrosiaRuntime.AmbrosiaControlOutputsName, false, true); } catch (Exception e) { @@ -3967,6 +4920,23 @@ static void Main(string[] args) } } + private static long[] ParseLongs(string s) + { + if (s.Trim() == "") + { + return new long[] { }; + } + string[] shards = s.Split(','); + long[] ids = new long[shards.Length]; + + for (int i = 0; i < shards.Length; i++) + { + ids[i] = long.Parse(shards[i]); + } + return ids; + + } + private static void ParseAndValidateOptions(string[] args) { var options = ParseOptions(args, out var shouldShowHelp); @@ -3983,6 +4953,7 @@ private static OptionSet ParseOptions(string[] args, out bool shouldShowHelp) { "rp|receivePort=", "The service receive from port [REQUIRED].", rp => _serviceReceiveFromPort = int.Parse(rp) }, { "sp|sendPort=", "The service send to port. [REQUIRED]", sp => _serviceSendToPort = int.Parse(sp) }, { "l|log=", "The service log path.", l => _serviceLogPath = l }, + {"si|shardID=", "The shard ID of the instance", si => _shardID = long.Parse(si) }, }; var helpOption = new OptionSet @@ -4009,6 +4980,12 @@ private static OptionSet ParseOptions(string[] args, out bool shouldShowHelp) { "r|replicaNum=", "The replica # [REQUIRED].", r => _replicaNumber = int.Parse(r) }, }.AddMany(registerInstanceOptionSet); + var addShardOptionSet = new OptionSet + { + {"os|oldShards=", "Comma separated list of shards to recover from [REQUIRED].", os => _oldShards = os }, + {"ns|newShards=", "Comma separated list of new shards being created [REQUIRED].", ns => _newShards = ns } + }.AddMany(registerInstanceOptionSet); + var debugInstanceOptionSet = basicOptions.AddMany(new OptionSet { { "c|checkpoint=", "The checkpoint # to load.", c => _checkpointToLoad = long.Parse(c) }, @@ -4018,6 +4995,7 @@ private static OptionSet ParseOptions(string[] args, out bool shouldShowHelp) registerInstanceOptionSet = registerInstanceOptionSet.AddMany(helpOption); addReplicaOptionSet = addReplicaOptionSet.AddMany(helpOption); + addShardOptionSet = addShardOptionSet.AddMany(helpOption); debugInstanceOptionSet = debugInstanceOptionSet.AddMany(helpOption); @@ -4025,6 +5003,7 @@ private static OptionSet ParseOptions(string[] args, out bool shouldShowHelp) { { LocalAmbrosiaRuntimeModes.RegisterInstance, registerInstanceOptionSet}, { LocalAmbrosiaRuntimeModes.AddReplica, addReplicaOptionSet}, + { LocalAmbrosiaRuntimeModes.AddShard, addShardOptionSet }, { LocalAmbrosiaRuntimeModes.DebugInstance, debugInstanceOptionSet}, }; @@ -4056,6 +5035,7 @@ private static OptionSet ParseOptions(string[] args, out bool shouldShowHelp) public enum LocalAmbrosiaRuntimeModes { AddReplica, + AddShard, RegisterInstance, DebugInstance, } @@ -4091,6 +5071,13 @@ private static void ValidateOptions(OptionSet options, bool shouldShowHelp) errorMessage += "Replica number is required.\n"; } } + if (_runtimeMode == LocalAmbrosiaRuntimeModes.AddShard) + { + if (_shardID == -1) + { + errorMessage += "Shard ID is required.\n"; + } + } // handles the case when an upgradeversion is not specified if (_upgradeVersion == -1) diff --git a/Ambrosia/Ambrosia/Serializer.cs b/Ambrosia/Ambrosia/Serializer.cs new file mode 100644 index 00000000..8f8127b4 --- /dev/null +++ b/Ambrosia/Ambrosia/Serializer.cs @@ -0,0 +1,152 @@ +using System; +using System.Collections.Concurrent; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Ambrosia +{ + public class Serializer + { + private const int messageTypeSize = 1; + + public static void SerializeAncestorMessage(Stream stream, byte messageType, long[] ancestors) + { + var numAncestors = ancestors.Length; + var messageSize = messageTypeSize + StreamCommunicator.LongSize(numAncestors); + foreach (var ancestor in ancestors) + { + messageSize += StreamCommunicator.LongSize(ancestor); + } + + // Write message size + stream.WriteInt(messageSize); + // Write message type + stream.WriteByte(messageType); + // Write number of ancestors + stream.WriteInt(numAncestors); + // Write ancestors + foreach(var ancestor in ancestors) + { + stream.WriteLong(ancestor); + } + } + + public static async Task DeserializeAncestorMessageAsync(Stream stream, CancellationToken ct) + { + var inputFlexBuffer = new FlexReadBuffer(); + await FlexReadBuffer.DeserializeAsync(stream, inputFlexBuffer, ct); + var sizeBytes = inputFlexBuffer.LengthLength; + // Get the seqNo of the replay/filter point + var offset = messageTypeSize + sizeBytes; + var numAncestors = StreamCommunicator.ReadBufferedInt(inputFlexBuffer.Buffer, offset); + offset += StreamCommunicator.IntSize(numAncestors); + var ancestors = new long[numAncestors]; + for (int i = 0; i < numAncestors; i++) + { + ancestors[i] = StreamCommunicator.ReadBufferedLong(inputFlexBuffer.Buffer, offset); + offset += StreamCommunicator.LongSize(ancestors[i]); + } + return ancestors; + } + + public static void SerializeReplayMessage(Stream stream, + byte messageType, + long lastProcessedID, + long lastProcessedReplayableID, + ConcurrentDictionary>> ancestorsToIDs) + { + var dictCount = ancestorsToIDs.Count; + var messageSize = messageTypeSize + StreamCommunicator.LongSize(lastProcessedID) + + StreamCommunicator.LongSize(lastProcessedReplayableID) + + StreamCommunicator.LongSize(dictCount); + + foreach (var peerID in ancestorsToIDs.Keys) + { + messageSize += StreamCommunicator.LongSize(peerID); + messageSize += StreamCommunicator.LongSize(ancestorsToIDs[peerID].Count); + foreach (var shardID in ancestorsToIDs[peerID].Keys) + { + messageSize += StreamCommunicator.LongSize(shardID); + messageSize += StreamCommunicator.LongSize(ancestorsToIDs[peerID][shardID].Item1 + 1); + messageSize += StreamCommunicator.LongSize(ancestorsToIDs[peerID][shardID].Item2 + 1); + } + } + + // Write message size + stream.WriteInt(messageSize); + // Write message type + stream.WriteByte(messageType); + // Write the output filter seqNo for the other side + stream.WriteLong(lastProcessedID); + stream.WriteLong(lastProcessedReplayableID); + + // For the sharded case, send replay values for other shards + stream.WriteInt(dictCount); + foreach (var peerID in ancestorsToIDs.Keys) + { + stream.WriteLong(peerID); + stream.WriteInt(ancestorsToIDs[peerID].Count); + foreach (var shardID in ancestorsToIDs[peerID].Keys) + { + stream.WriteLong(shardID); + stream.WriteLong(ancestorsToIDs[peerID][shardID].Item1); + stream.WriteLong(ancestorsToIDs[peerID][shardID].Item2); + } + } + } + + public static async Task>>>> DeserializeReplayMessageAsync(Stream stream, CancellationToken ct) + { + var inputFlexBuffer = new FlexReadBuffer(); + await FlexReadBuffer.DeserializeAsync(stream, inputFlexBuffer, ct); + var sizeBytes = inputFlexBuffer.LengthLength; + // Get the seqNo of the replay/filter point + var offset = messageTypeSize + sizeBytes; + var lastProcessedID = StreamCommunicator.ReadBufferedLong(inputFlexBuffer.Buffer, offset); + offset += StreamCommunicator.LongSize(lastProcessedID); + var lastProcessedReplayableID = StreamCommunicator.ReadBufferedLong(inputFlexBuffer.Buffer, offset); + offset += StreamCommunicator.LongSize(lastProcessedReplayableID); + var numPeers = StreamCommunicator.ReadBufferedInt(inputFlexBuffer.Buffer, offset); + offset += StreamCommunicator.IntSize(numPeers); + var ancestorsToIDs = new ConcurrentDictionary>>(); + for (int i = 0; i < numPeers; i++) + { + long peerID = StreamCommunicator.ReadBufferedLong(inputFlexBuffer.Buffer, offset); + offset += StreamCommunicator.LongSize(peerID); + int numShards = StreamCommunicator.ReadBufferedInt(inputFlexBuffer.Buffer, offset); + offset += StreamCommunicator.IntSize(numShards); + ancestorsToIDs[peerID] = new ConcurrentDictionary>(); + for (int j = 0; j < numShards; j++) + { + long shardID = StreamCommunicator.ReadBufferedLong(inputFlexBuffer.Buffer, offset); + offset += StreamCommunicator.LongSize(shardID); + long shardLastProcessedID = StreamCommunicator.ReadBufferedLong(inputFlexBuffer.Buffer, offset); + offset += StreamCommunicator.LongSize(shardLastProcessedID); + long shardLastProcessedReplayableID = StreamCommunicator.ReadBufferedLong(inputFlexBuffer.Buffer, offset); + offset += StreamCommunicator.LongSize(shardLastProcessedReplayableID); + ancestorsToIDs[peerID][shardID] = new Tuple(shardLastProcessedID, shardLastProcessedReplayableID); + } + } + inputFlexBuffer.ResetBuffer(); + return Tuple.Create(lastProcessedID, lastProcessedReplayableID, ancestorsToIDs); + } + + public static void SerializeShardTrimMessage(Stream stream, byte messageType) + { + var messageSize = messageTypeSize; + + // Write message size + stream.WriteInt(messageSize); + + // Write message type + stream.WriteByte(messageType); + } + + public static async Task DeserializeShardTrimMessageAsync(Stream stream, CancellationToken ct) + { + var inputFlexBuffer = new FlexReadBuffer(); + await FlexReadBuffer.DeserializeAsync(stream, inputFlexBuffer, ct); + } + } +} diff --git a/Ambrosia/AmbrosiaTests/AmbrosiaTests.csproj b/Ambrosia/AmbrosiaTests/AmbrosiaTests.csproj new file mode 100644 index 00000000..69016b40 --- /dev/null +++ b/Ambrosia/AmbrosiaTests/AmbrosiaTests.csproj @@ -0,0 +1,27 @@ + + + + netcoreapp2.1 + + false + + + + x64 + + + + x64 + + + + + + + + + + + + + diff --git a/Ambrosia/AmbrosiaTests/SerializerTests.cs b/Ambrosia/AmbrosiaTests/SerializerTests.cs new file mode 100644 index 00000000..3e5c7eb4 --- /dev/null +++ b/Ambrosia/AmbrosiaTests/SerializerTests.cs @@ -0,0 +1,100 @@ +using Microsoft.VisualStudio.TestTools.UnitTesting; +using Ambrosia; +using System; +using System.Collections.Concurrent; +using System.IO; +using System.Threading; + +namespace AmbrosiaTests +{ + [TestClass] + public class SerializerTests + { + [TestMethod] + public void TestAncestorMessage() + { + long[] expectedAncestors = new long[3] { 3, 17, 39 }; + using (var stream = new MemoryStream()) + { + Serializer.SerializeAncestorMessage( + stream, + AmbrosiaRuntime.ancestorByte, + expectedAncestors + ); + stream.Flush(); + stream.Position = 0; + var actualAncestors = Serializer.DeserializeAncestorMessageAsync(stream, new CancellationToken()).GetAwaiter().GetResult(); + CollectionAssert.AreEqual(expectedAncestors, actualAncestors); + } + } + + [TestMethod] + public void TestNonShardReplayMessage() + { + long expectedLastProcessedID = 14; + long expectedLastProcessedReplayableID = 9; + var ancestorsToIDs = new ConcurrentDictionary>>(); + + using (var stream = new MemoryStream()) + { + Serializer.SerializeReplayMessage( + stream, + AmbrosiaRuntime.replayFromByte, + expectedLastProcessedID, + expectedLastProcessedReplayableID, + ancestorsToIDs + ); + stream.Flush(); + stream.Position = 0; + var result = Serializer.DeserializeReplayMessageAsync(stream, new CancellationToken()).GetAwaiter().GetResult(); + Assert.AreEqual(expectedLastProcessedID, result.Item1); + Assert.AreEqual(expectedLastProcessedReplayableID, result.Item2); + var actualShardToLastID = result.Item3; + Assert.AreEqual(actualShardToLastID.Count, 0); + } + } + + [TestMethod] + public void TestShardReplayMessage() + { + long expectedLastProcessedID = 14; + long expectedLastProcessedReplayableID = 9; + var ancestorsToIDs = new ConcurrentDictionary>>(); + ancestorsToIDs[2] = new ConcurrentDictionary>(); + ancestorsToIDs[2][1] = new Tuple(11, 7); + ancestorsToIDs[2][5] = new Tuple(9, 4); + ancestorsToIDs[4] = new ConcurrentDictionary>(); + ancestorsToIDs[4][3] = new Tuple(8, 6); + ancestorsToIDs[4][7] = new Tuple(10, 5); + + using (var stream = new MemoryStream()) + { + Serializer.SerializeReplayMessage( + stream, + AmbrosiaRuntime.replayFromByte, + expectedLastProcessedID, + expectedLastProcessedReplayableID, + ancestorsToIDs + ); + stream.Flush(); + stream.Position = 0; + var result = Serializer.DeserializeReplayMessageAsync(stream, new CancellationToken()).GetAwaiter().GetResult(); + Assert.AreEqual(expectedLastProcessedID, result.Item1); + Assert.AreEqual(expectedLastProcessedReplayableID, result.Item2); + var actualShardToLastID = result.Item3; + Assert.AreEqual(actualShardToLastID.Count, ancestorsToIDs.Count); + + foreach (var peerID in ancestorsToIDs.Keys) + { + Assert.IsTrue(actualShardToLastID.ContainsKey(peerID)); + Assert.AreEqual(actualShardToLastID[peerID].Count, ancestorsToIDs[peerID].Count); + foreach (var shardID in ancestorsToIDs[peerID].Keys) + { + Assert.IsTrue(actualShardToLastID[peerID].ContainsKey(shardID)); + Assert.AreEqual(actualShardToLastID[peerID][shardID], ancestorsToIDs[peerID][shardID]); + } + } + } + } + } +} diff --git a/AmbrosiaTest/AmbrosiaTest/AmbrosiaTest.csproj b/AmbrosiaTest/AmbrosiaTest/AmbrosiaTest.csproj index 0b2bee08..c7a44d86 100644 --- a/AmbrosiaTest/AmbrosiaTest/AmbrosiaTest.csproj +++ b/AmbrosiaTest/AmbrosiaTest/AmbrosiaTest.csproj @@ -78,6 +78,7 @@ + diff --git a/AmbrosiaTest/AmbrosiaTest/Cmp/shardunitendtoendtest_AMB1.cmp b/AmbrosiaTest/AmbrosiaTest/Cmp/shardunitendtoendtest_AMB1.cmp new file mode 100644 index 00000000..3797d308 --- /dev/null +++ b/AmbrosiaTest/AmbrosiaTest/Cmp/shardunitendtoendtest_AMB1.cmp @@ -0,0 +1 @@ +The CRA instance appears to be down. Restart it and this vertex will be instantiated automatically diff --git a/AmbrosiaTest/AmbrosiaTest/Cmp/shardunitendtoendtest_AMB2.cmp b/AmbrosiaTest/AmbrosiaTest/Cmp/shardunitendtoendtest_AMB2.cmp new file mode 100644 index 00000000..3797d308 --- /dev/null +++ b/AmbrosiaTest/AmbrosiaTest/Cmp/shardunitendtoendtest_AMB2.cmp @@ -0,0 +1 @@ +The CRA instance appears to be down. Restart it and this vertex will be instantiated automatically diff --git a/AmbrosiaTest/AmbrosiaTest/Cmp/shardunitendtoendtest_ClientJob.cmp b/AmbrosiaTest/AmbrosiaTest/Cmp/shardunitendtoendtest_ClientJob.cmp new file mode 100644 index 00000000..65203280 --- /dev/null +++ b/AmbrosiaTest/AmbrosiaTest/Cmp/shardunitendtoendtest_ClientJob.cmp @@ -0,0 +1,5 @@ +Bytes per RPC Throughput (GB/sec) +*X* 1024 0.0046468462919506 +Service Received 1024 MB so far +Bytes received: 1073741824 +DONE diff --git a/AmbrosiaTest/AmbrosiaTest/Cmp/shardunitendtoendtest_ClientJob_Verify.cmp b/AmbrosiaTest/AmbrosiaTest/Cmp/shardunitendtoendtest_ClientJob_Verify.cmp new file mode 100644 index 00000000..741adee4 --- /dev/null +++ b/AmbrosiaTest/AmbrosiaTest/Cmp/shardunitendtoendtest_ClientJob_Verify.cmp @@ -0,0 +1,5 @@ +Bytes per RPC Throughput (GB/sec) +*X* 1024 0.0178702141848639 +Service Received 1024 MB so far +Bytes received: 1073741824 +DONE diff --git a/AmbrosiaTest/AmbrosiaTest/Cmp/shardunitendtoendtest_Server.cmp b/AmbrosiaTest/AmbrosiaTest/Cmp/shardunitendtoendtest_Server.cmp new file mode 100644 index 00000000..49f4df51 --- /dev/null +++ b/AmbrosiaTest/AmbrosiaTest/Cmp/shardunitendtoendtest_Server.cmp @@ -0,0 +1,7 @@ +*X* At checkpoint, received 0 messages +*X* Becoming a primary now +*X* Server in Entry Point +*X* At checkpoint, received 969264 messages +Received 1024 MB so far +Bytes received: 1073741824 +DONE diff --git a/AmbrosiaTest/AmbrosiaTest/Cmp/shardunitendtoendtest_Server_Verify.cmp b/AmbrosiaTest/AmbrosiaTest/Cmp/shardunitendtoendtest_Server_Verify.cmp new file mode 100644 index 00000000..dcafc109 --- /dev/null +++ b/AmbrosiaTest/AmbrosiaTest/Cmp/shardunitendtoendtest_Server_Verify.cmp @@ -0,0 +1,4 @@ +*X* Server in Entry Point +Received 1024 MB so far +Bytes received: 1073741824 +DONE diff --git a/AmbrosiaTest/AmbrosiaTest/Shard_UnitTest.cs b/AmbrosiaTest/AmbrosiaTest/Shard_UnitTest.cs new file mode 100644 index 00000000..9e5cee79 --- /dev/null +++ b/AmbrosiaTest/AmbrosiaTest/Shard_UnitTest.cs @@ -0,0 +1,241 @@ +using Microsoft.VisualStudio.TestTools.UnitTesting; +using System; +using System.Configuration; +using System.Threading; +using System.Windows.Forms; + +namespace AmbrosiaTest +{ + [TestClass] + public class Shard_UnitTest + { + //************* Init Code ***************** + // NOTE: Need this bit of code at the top of every "[TestClass]" (per .cs test file) to get context \ details of the current test running + // NOTE: Make sure all names be "Azure Safe". No capital letters and no underscore. + [TestInitialize()] + public void Initialize() + { + Utilities MyUtils = new Utilities(); + MyUtils.TestInitialize(); + } + //************* Init Code ***************** + + [TestMethod] + public void Shard_UnitTest_BasicEndtoEnd_Test() + { + // Test that one shard per server and client works + string testName = "shardunitendtoendtest"; + string clientJobName = testName + "clientjob"; + string serverName = testName + "server"; + string ambrosiaLogDir = ConfigurationManager.AppSettings["AmbrosiaLogDirectory"] + "\\"; + string byteSize = "1073741824"; + + Utilities MyUtils = new Utilities(); + + // AMB1 - Job + string logOutputFileName_AMB1 = testName + "_AMB1.log"; + AMB_Settings AMB1 = new AMB_Settings + { + AMB_ServiceName = clientJobName, + AMB_PortAppReceives = "1000", + AMB_PortAMBSends = "1001", + AMB_ServiceLogPath = ambrosiaLogDir, + AMB_CreateService = "A", + AMB_PauseAtStart = "N", + AMB_PersistLogs = "Y", + AMB_NewLogTriggerSize = "1000", + AMB_ActiveActive = "N", + AMB_Version = "0", + AMB_ShardID = "1", + }; + MyUtils.CallAMB(AMB1, logOutputFileName_AMB1, AMB_ModeConsts.RegisterInstance); + + // AMB2 - Shard 1 + string logOutputFileName_AMB2 = testName + "_AMB2.log"; + AMB_Settings AMB2 = new AMB_Settings + { + AMB_ServiceName = serverName, + AMB_PortAppReceives = "2000", + AMB_PortAMBSends = "2001", + AMB_ServiceLogPath = ambrosiaLogDir, + AMB_CreateService = "A", + AMB_PauseAtStart = "N", + AMB_PersistLogs = "Y", + AMB_NewLogTriggerSize = "1000", + AMB_ActiveActive = "N", + AMB_Version = "0", + AMB_ShardID = "1", + }; + MyUtils.CallAMB(AMB2, logOutputFileName_AMB2, AMB_ModeConsts.RegisterInstance); + + // ImmCoord1 + string logOutputFileName_ImmCoord1 = testName + "_ImmCoord1.log"; + int ImmCoordProcessID1 = MyUtils.StartImmCoord(clientJobName, 1500, logOutputFileName_ImmCoord1, shardID: 1); + + // ImmCoord2 + string logOutputFileName_ImmCoord2 = testName + "_ImmCoord2.log"; + int ImmCoordProcessID2 = MyUtils.StartImmCoord(serverName, 2500, logOutputFileName_ImmCoord2, shardID: 1); + + // Client + string logOutputFileName_ClientJob = testName + "_ClientJob.log"; + int clientJobProcessID = MyUtils.StartPerfClientJob("1001", "1000", clientJobName, serverName, "1024", "1", logOutputFileName_ClientJob); + + // Give it a few seconds to start + Thread.Sleep(2000); + + // Server Call + string logOutputFileName_Server = testName + "_Server.log"; + int serverProcessID = MyUtils.StartPerfServer("2001", "2000", clientJobName, serverName, logOutputFileName_Server, 1, false); + + // Delay until client is done - also check Server just to make sure + bool pass = MyUtils.WaitForProcessToFinish(logOutputFileName_ClientJob, byteSize, 5, false, testName, true); // Number of bytes processed + pass = MyUtils.WaitForProcessToFinish(logOutputFileName_Server, byteSize, 5, false, testName, true); + + // Stop things to file is freed up and can be opened in verify + MyUtils.KillProcess(clientJobProcessID); + MyUtils.KillProcess(serverProcessID); + MyUtils.KillProcess(ImmCoordProcessID1); + MyUtils.KillProcess(ImmCoordProcessID2); + + // Verify AMB + MyUtils.VerifyTestOutputFileToCmpFile(logOutputFileName_AMB1); + MyUtils.VerifyTestOutputFileToCmpFile(logOutputFileName_AMB2); + + // Verify Client + MyUtils.VerifyTestOutputFileToCmpFile(logOutputFileName_ClientJob); + + // Verify Server + MyUtils.VerifyTestOutputFileToCmpFile(logOutputFileName_Server); + + // Verify integrity of Ambrosia logs by replaying + MyUtils.VerifyAmbrosiaLogFile(testName, Convert.ToInt64(byteSize), true, true, AMB1.AMB_Version, shardID: 1); + } + [TestMethod] + public void Shard_UnitTest_SingleReshardEndtoEnd_Test() + { + // Test that one shard per server and client works + string testName = "shardunitsinglereshardendtoendtest"; + string clientJobName = testName + "clientjob"; + string serverName = testName + "server"; + string ambrosiaLogDir = ConfigurationManager.AppSettings["AmbrosiaLogDirectory"] + "\\"; + string byteSize = "1073741824"; + + Utilities MyUtils = new Utilities(); + + // AMB1 - Job + string logOutputFileName_AMB1 = testName + "_AMB1.log"; + AMB_Settings AMB1 = new AMB_Settings + { + AMB_ServiceName = clientJobName, + AMB_PortAppReceives = "1000", + AMB_PortAMBSends = "1001", + AMB_ServiceLogPath = ambrosiaLogDir, + AMB_CreateService = "A", + AMB_PauseAtStart = "N", + AMB_PersistLogs = "Y", + AMB_NewLogTriggerSize = "1000", + AMB_ActiveActive = "N", + AMB_Version = "0", + AMB_ShardID = "1", + }; + MyUtils.CallAMB(AMB1, logOutputFileName_AMB1, AMB_ModeConsts.RegisterInstance); + + // AMB2 - Shard 1 + string logOutputFileName_AMB2 = testName + "_AMB2.log"; + AMB_Settings AMB2 = new AMB_Settings + { + AMB_ServiceName = serverName, + AMB_PortAppReceives = "2000", + AMB_PortAMBSends = "2001", + AMB_ServiceLogPath = ambrosiaLogDir, + AMB_CreateService = "A", + AMB_PauseAtStart = "N", + AMB_PersistLogs = "Y", + AMB_NewLogTriggerSize = "1000", + AMB_ActiveActive = "N", + AMB_Version = "0", + AMB_ShardID = "1", + }; + MyUtils.CallAMB(AMB2, logOutputFileName_AMB2, AMB_ModeConsts.RegisterInstance); + + // AMB 3 - Shard 2 + string logOutputFileName_AMB3 = testName + "_AMB3.log"; + AMB_Settings AMB3 = new AMB_Settings + { + AMB_ServiceName = serverName, + AMB_PortAppReceives = "3000", + AMB_PortAMBSends = "3001", + AMB_ServiceLogPath = ambrosiaLogDir, + AMB_CreateService = "A", + AMB_PauseAtStart = "N", + AMB_PersistLogs = "Y", + AMB_NewLogTriggerSize = "1000", + AMB_ActiveActive = "N", + AMB_Version = "0", + AMB_ShardID = "2", + AMB_OldShards = "1", + AMB_NewShards = "2" + }; + MyUtils.CallAMB(AMB3, logOutputFileName_AMB3, AMB_ModeConsts.AddShard); + + // ImmCoord1 + string logOutputFileName_ImmCoord1 = testName + "_ImmCoord1.log"; + int ImmCoordProcessID1 = MyUtils.StartImmCoord(clientJobName, 1500, logOutputFileName_ImmCoord1, shardID: 1); + + // ImmCoord2 + string logOutputFileName_ImmCoord2 = testName + "_ImmCoord2.log"; + int ImmCoordProcessID2 = MyUtils.StartImmCoord(serverName, 2500, logOutputFileName_ImmCoord2, shardID: 1); + + // Client + string logOutputFileName_ClientJob = testName + "_ClientJob.log"; + int clientJobProcessID = MyUtils.StartPerfClientJob("1001", "1000", clientJobName, serverName, "1024", "1", logOutputFileName_ClientJob); + + // Give it a few seconds to start + Thread.Sleep(1000); + + // First Server Call + string logOutputFileName_Server1 = testName + "_Server1.log"; + int serverProcessID1 = MyUtils.StartPerfServer("2001", "2000", clientJobName, serverName, logOutputFileName_Server1, 1, false); + + // Delay until client is done - also check Server just to make sure + //bool pass = MyUtils.WaitForProcessToFinish(logOutputFileName_ClientJob, byteSize, 5, false, testName, true); // Number of bytes processed + //pass = MyUtils.WaitForProcessToFinish(logOutputFileName_Server1, byteSize, 5, false, testName, true); + + // Give it 2 seconds to do something before killing it + //Thread.Sleep(2000); + Application.DoEvents(); // if don't do this ... system sees thread as blocked thread and throws message. + + // ImmCoord3 + string logOutputFileName_ImmCoord3 = testName + "_ImmCoord3.log"; + int ImmCoordProcessID3 = MyUtils.StartImmCoord(serverName, 3500, logOutputFileName_ImmCoord3, shardID: 2); + + // Second Server Call + string logOutputFileName_Server2 = testName + "_Server2.log"; + int serverProcessID2 = MyUtils.StartPerfServer("3001", "3000", clientJobName, serverName, logOutputFileName_Server2, 1, false); + bool pass = MyUtils.WaitForProcessToFinish(logOutputFileName_ClientJob, byteSize, 10, false, testName, true); // Number of bytes processed + pass = MyUtils.WaitForProcessToFinish(logOutputFileName_Server2, byteSize, 10, false, testName, true); + + // Stop things to file is freed up and can be opened in verify + MyUtils.KillProcess(clientJobProcessID); + MyUtils.KillProcess(serverProcessID1); + MyUtils.KillProcess(serverProcessID2); + MyUtils.KillProcess(ImmCoordProcessID1); + MyUtils.KillProcess(ImmCoordProcessID2); + MyUtils.KillProcess(ImmCoordProcessID3); + + // Verify AMB + MyUtils.VerifyTestOutputFileToCmpFile(logOutputFileName_AMB1); + MyUtils.VerifyTestOutputFileToCmpFile(logOutputFileName_AMB2); + MyUtils.VerifyTestOutputFileToCmpFile(logOutputFileName_AMB3); + + // Verify Client + MyUtils.VerifyTestOutputFileToCmpFile(logOutputFileName_ClientJob); + + // Verify Server 1 + MyUtils.VerifyTestOutputFileToCmpFile(logOutputFileName_Server2); + /* + // Verify integrity of Ambrosia logs by replaying + MyUtils.VerifyAmbrosiaLogFile(testName, Convert.ToInt64(byteSize), true, true, AMB1.AMB_Version, shardID: 1);*/ + } + } +} diff --git a/AmbrosiaTest/AmbrosiaTest/Utilities.cs b/AmbrosiaTest/AmbrosiaTest/Utilities.cs index ff88bcd5..4e25a955 100644 --- a/AmbrosiaTest/AmbrosiaTest/Utilities.cs +++ b/AmbrosiaTest/AmbrosiaTest/Utilities.cs @@ -28,11 +28,13 @@ public class AMB_Settings public string AMB_Version { get; set; } public string AMB_UpgradeToVersion { get; set; } public string AMB_ReplicaNumber { get; set; } - + public string AMB_ShardID { get; set; } + public string AMB_OldShards { get; set; } + public string AMB_NewShards { get; set; } } // These are the different modes of what the AMB is called - public enum AMB_ModeConsts { RegisterInstance, AddReplica, DebugInstance }; + public enum AMB_ModeConsts { RegisterInstance, AddReplica, DebugInstance, AddShard }; public class Utilities { @@ -460,7 +462,7 @@ public void VerifyTestOutputFileToCmpFile(string testOutputLogFile) // // Assumption: Test Output logs are .log and the cmp is the same file name but with .cmp extension //********************************************************************* - public void VerifyAmbrosiaLogFile(string testName, long numBytes, bool checkCmpFile, bool startWithFirstFile, string CurrentVersion, string optionalNumberOfClient = "", bool asyncTest = false) + public void VerifyAmbrosiaLogFile(string testName, long numBytes, bool checkCmpFile, bool startWithFirstFile, string CurrentVersion, string optionalNumberOfClient = "", bool asyncTest = false, long shardID = -1) { // Basically doing this for multi client stuff @@ -481,6 +483,11 @@ public void VerifyAmbrosiaLogFile(string testName, long numBytes, bool checkCmpF // used to get log file string ambrosiaClientLogDir = ConfigurationManager.AppSettings["AmbrosiaLogDirectory"] + "\\" + testName + "clientjob" + optionalMultiClientStartingPoint + "_" + CurrentVersion; string ambrosiaServerLogDir = ConfigurationManager.AppSettings["AmbrosiaLogDirectory"] + "\\" + testName + "server_" + CurrentVersion; + if (shardID != -1) + { + ambrosiaClientLogDir += "\\" + shardID.ToString(); + ambrosiaServerLogDir += "\\" + shardID.ToString(); + } string startingClientChkPtVersionNumber = "1"; string clientFirstFile = ""; @@ -576,8 +583,12 @@ public void VerifyAmbrosiaLogFile(string testName, long numBytes, bool checkCmpF AMB_Version = CurrentVersion.ToString(), AMB_TestingUpgrade = "N", AMB_PortAppReceives = "1000", - AMB_PortAMBSends = "1001" + AMB_PortAMBSends = "1001", }; + if (shardID != -1) + { + AMB1.AMB_ShardID = shardID.ToString(); + } CallAMB(AMB1, logOutputFileName_AMB1, AMB_ModeConsts.DebugInstance); // AMB for Server @@ -592,6 +603,10 @@ public void VerifyAmbrosiaLogFile(string testName, long numBytes, bool checkCmpF AMB_PortAppReceives = "2000", AMB_PortAMBSends = "2001" }; + if (shardID != -1) + { + AMB2.AMB_ShardID = shardID.ToString(); + } CallAMB(AMB2, logOutputFileName_AMB2, AMB_ModeConsts.DebugInstance); string logOutputFileName_ClientJob_Verify; @@ -634,7 +649,7 @@ public void VerifyAmbrosiaLogFile(string testName, long numBytes, bool checkCmpF } - public int StartImmCoord(string ImmCoordName, int portImmCoordListensAMB, string testOutputLogFile, bool ActiveActive=false, int replicaNum = 9999) + public int StartImmCoord(string ImmCoordName, int portImmCoordListensAMB, string testOutputLogFile, bool ActiveActive=false, int replicaNum = 9999, int shardID = -1) { // Launch the AMB process with these values @@ -647,6 +662,10 @@ public int StartImmCoord(string ImmCoordName, int portImmCoordListensAMB, string string argString = "-i=" + ImmCoordName + " -p=" + portImmCoordListensAMB.ToString(); + if (shardID != -1) + { + argString = argString + " -si=" + shardID.ToString(); + } // if Active Active then required to get replicanu if (ActiveActive) @@ -729,6 +748,9 @@ public void CallAMB(AMB_Settings AMBSettings, string testOutputLogFile, AMB_Mode if (AMBSettings.AMB_NewLogTriggerSize != null) argString = argString + " -lts=" + AMBSettings.AMB_NewLogTriggerSize; + if (AMBSettings.AMB_ShardID != null) + argString = argString + " -si=" + AMBSettings.AMB_ShardID; + break; case AMB_ModeConsts.AddReplica: @@ -789,6 +811,45 @@ public void CallAMB(AMB_Settings AMBSettings, string testOutputLogFile, AMB_Mode if (AMBSettings.AMB_TestingUpgrade != null && AMBSettings.AMB_TestingUpgrade != "N") argString = argString + " -tu"; + if (AMBSettings.AMB_ShardID != null) + argString = argString + " -si=" + AMBSettings.AMB_ShardID; + + break; + + case AMB_ModeConsts.AddShard: + argString = "AddShard " + "-si=" + AMBSettings.AMB_ShardID + " -i=" + AMBSettings.AMB_ServiceName + + " -rp=" + AMBSettings.AMB_PortAppReceives + " -sp=" + AMBSettings.AMB_PortAMBSends + + " -r=" + AMBSettings.AMB_ReplicaNumber + " -os=" + AMBSettings.AMB_OldShards + + " -ns=" + AMBSettings.AMB_NewShards; + + // add Service log path + if (AMBSettings.AMB_ServiceLogPath != null) + argString = argString + " -l=" + AMBSettings.AMB_ServiceLogPath; + + // add pause at start + if (AMBSettings.AMB_PauseAtStart != null && AMBSettings.AMB_PauseAtStart != "N") + argString = argString + " -ps"; + + // add no persist logs at start + if (AMBSettings.AMB_PersistLogs != null && AMBSettings.AMB_PersistLogs != "Y") + argString = argString + " -npl"; + + // add new log trigger size if it exists + if (AMBSettings.AMB_NewLogTriggerSize != null) + argString = argString + " -lts=" + AMBSettings.AMB_NewLogTriggerSize; + + // add active active + if (AMBSettings.AMB_ActiveActive != null && AMBSettings.AMB_ActiveActive != "N") + argString = argString + " -aa"; + + // add current version if it exists + if (AMBSettings.AMB_Version != null) + argString = argString + " -cv=" + AMBSettings.AMB_Version; + + // add upgrade version if it exists + if (AMBSettings.AMB_UpgradeToVersion != null) + argString = argString + " -uv=" + AMBSettings.AMB_UpgradeToVersion; + break; } diff --git a/AmbrosiaTest/AmbrosiaTest/app.config b/AmbrosiaTest/AmbrosiaTest/app.config index 96bcda5d..a3b83fb4 100644 --- a/AmbrosiaTest/AmbrosiaTest/app.config +++ b/AmbrosiaTest/AmbrosiaTest/app.config @@ -11,7 +11,7 @@ - + diff --git a/Clients/CSharp/AmbrosiaLibCS/Immortal.cs b/Clients/CSharp/AmbrosiaLibCS/Immortal.cs index b26198cf..3be43e38 100644 --- a/Clients/CSharp/AmbrosiaLibCS/Immortal.cs +++ b/Clients/CSharp/AmbrosiaLibCS/Immortal.cs @@ -65,6 +65,10 @@ public abstract class Immortal : IDisposable [CopyFromDeserializedImmortal] public SerializableCallCache CallCache = new SerializableCallCache(); + [DataMember] + [CopyFromDeserializedImmortal] + public int CommitID; + public SerializableCache TaskIdToSequenceNumber = new SerializableCache(); private ImmortalSerializerBase _immortalSerializer; @@ -395,7 +399,7 @@ protected async Task Dispatch(int bytesToRead = 0) #endif _cursor++; - await this.TakeCheckpointAsync(); + await this.TakeCheckpointAsync(true); this.IsPrimary = true; this.BecomingPrimary(); @@ -752,10 +756,15 @@ public async Task SaveTaskAsync() return true; } - private async Task TakeCheckpointAsync() + private async Task TakeCheckpointAsync(bool flushOutput = false) { // wait for quiesence _outputLock.Acquire(2); + if (flushOutput) + { + _ambrosiaSendToConnectionRecord.placeInOutput = + await _ambrosiaSendToConnectionRecord.BufferedOutput.SendAsync(_ambrosiaSendToStream, _ambrosiaSendToConnectionRecord.placeInOutput); + } _ambrosiaSendToConnectionRecord.BufferedOutput.LockOutputBuffer(); // Save current task state unless just resumed from a serialized task @@ -1387,6 +1396,8 @@ public void Start() else if (firstByte == AmbrosiaRuntime.takeCheckpointByte || firstByte == AmbrosiaRuntime.takeBecomingPrimaryCheckpointByte) { // Then this container is starting for the first time + // Set the commitID prior to taking the first checkpoint + MyImmortal.CommitID = commitID; if (firstByte == AmbrosiaRuntime.takeCheckpointByte) { #if DEBUG diff --git a/ImmortalCoordinator/Program.cs b/ImmortalCoordinator/Program.cs index e247bc05..48df8cfb 100644 --- a/ImmortalCoordinator/Program.cs +++ b/ImmortalCoordinator/Program.cs @@ -20,12 +20,14 @@ class Program private static string _secureNetworkClassName; private static bool _isActiveActive = false; private static int _replicaNumber = 0; + private static long _shardID = -1; static void Main(string[] args) { ParseAndValidateOptions(args); - var replicaName = $"{_instanceName}{_replicaNumber}"; + string shardName = AmbrosiaRuntime.GetShardName(_instanceName, _shardID); + var replicaName = $"{shardName}_{_replicaNumber}"; if (_ipAddress == null) { @@ -131,6 +133,7 @@ private static OptionSet ParseOptions(string[] args, out bool shouldShowHelp) { "ac|assemblyClass=", "The secure network assembly class.", ac => _secureNetworkClassName = ac }, { "ip|IPAddr=", "Override automatic self IP detection", i => _ipAddress = i }, { "h|help", "show this message and exit", h => showHelp = h != null }, + { "si|shardID=", "The shard ID", si => _shardID = long.Parse(si) }, }; try diff --git a/Samples/HelloWorld/Client1/Program.cs b/Samples/HelloWorld/Client1/Program.cs index 7e7da210..9455a1fc 100644 --- a/Samples/HelloWorld/Client1/Program.cs +++ b/Samples/HelloWorld/Client1/Program.cs @@ -5,6 +5,7 @@ using System; using System.Runtime.Serialization; using System.Threading.Tasks; +using System.Threading; namespace Client1 { @@ -40,8 +41,15 @@ protected override async Task OnFirstStart() { _server = GetProxy(_serverName); + for (int i = 0; i < 1000; i++) + { + Console.WriteLine("Client 1: Hello World {0}", 2*i); + _server.ReceiveMessageFork("Client 1: Hello World " + (2*i).ToString()); + Thread.Sleep(1000); + } - _server.ReceiveMessageFork("\n!! Client: Hello World 1!"); + + /*_server.ReceiveMessageFork("\n!! Client: Hello World 1!"); using (ConsoleColorScope.SetForeground(ConsoleColor.Yellow)) { @@ -56,7 +64,7 @@ protected override async Task OnFirstStart() using (ConsoleColorScope.SetForeground(ConsoleColor.Yellow)) { Console.WriteLine("\n!! Client: Press enter to shutdown."); - } + }*/ Console.ReadLine(); Program.finishedTokenQ.Enqueue(0); @@ -73,8 +81,8 @@ static void Main(string[] args) int receivePort = 1001; int sendPort = 1000; - string clientInstanceName = "client"; - string serverInstanceName = "server"; + string clientInstanceName = "client1"; + string serverInstanceName = "server-1"; if (args.Length >= 1) { diff --git a/Samples/HelloWorld/Client2/Program.cs b/Samples/HelloWorld/Client2/Program.cs index 949f2970..43fb76a2 100644 --- a/Samples/HelloWorld/Client2/Program.cs +++ b/Samples/HelloWorld/Client2/Program.cs @@ -34,12 +34,12 @@ void InputLoop() } } - protected override void BecomingPrimary() + /* protected override void BecomingPrimary() { Console.WriteLine("Finished initializing state/recovering"); Thread timerThread = new Thread(InputLoop); timerThread.Start(); - } + }*/ public async Task ReceiveKeyboardInputAsync(string input) { @@ -50,6 +50,34 @@ public async Task ReceiveKeyboardInputAsync(string input) protected override async Task OnFirstStart() { _server = GetProxy(_serverName); + + for (int i = 0; i < 1000; i++) + { + Console.WriteLine("Client 2: Hello World {0}", 2 * i + 1); + _server.ReceiveMessageFork("Client 2: Hello World " + (2 * i + 1).ToString()); + Thread.Sleep(1000); + } + + + /*_server.ReceiveMessageFork("\n!! Client: Hello World 1!"); + + using (ConsoleColorScope.SetForeground(ConsoleColor.Yellow)) + { + Console.WriteLine("\n!! Client: Sent message 1."); + Console.WriteLine("\n!! Client: Press enter to continue (will send 2&3)"); + } + + Console.ReadLine(); + _server.ReceiveMessageFork("\n!! Client: Hello World 2!"); + _server.ReceiveMessageFork("\n!! Client: Hello World 3!"); + + using (ConsoleColorScope.SetForeground(ConsoleColor.Yellow)) + { + Console.WriteLine("\n!! Client: Press enter to shutdown."); + }*/ + + Console.ReadLine(); + Program.finishedTokenQ.Enqueue(0); return true; } } @@ -62,10 +90,10 @@ static void Main(string[] args) { finishedTokenQ = new AsyncQueue(); - int receivePort = 1001; - int sendPort = 1000; - string clientInstanceName = "client"; - string serverInstanceName = "server"; + int receivePort = 3001; + int sendPort = 3000; + string clientInstanceName = "client2"; + string serverInstanceName = "server-2"; if (args.Length >= 1) { diff --git a/Samples/HelloWorld/Server/Program.cs b/Samples/HelloWorld/Server/Program.cs index 5478c64e..4b564d61 100644 --- a/Samples/HelloWorld/Server/Program.cs +++ b/Samples/HelloWorld/Server/Program.cs @@ -1,5 +1,5 @@ using Ambrosia; -using Server; +using Client1; using System; using System.Runtime.Serialization; using System.Threading; @@ -29,6 +29,9 @@ sealed class Server : Immortal, IServer [DataMember] int _messagesReceived = 0; + private IClient1Proxy _client1; + private IClient2Proxy _client2; + public Server() { } @@ -46,6 +49,8 @@ public async Task ReceiveMessageAsync(string message) protected override async Task OnFirstStart() { + _client1 = GetProxy("client1"); + _client2 = GetProxy("client2"); return true; } } @@ -64,6 +69,9 @@ static void Main(string[] args) { sendPort = int.Parse(args[1]); } + + + if (args.Length == 3) { serviceName = args[2];