Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
75fdc96
Fix writer starvation in MemoryJournal by replacing ReaderWriterLockS…
Aaronontheweb Oct 8, 2025
717aad0
Merge branch 'dev' into claude-wt-InMemoryPersistence3
Aaronontheweb Oct 8, 2025
3420cf8
Merge branch 'dev' into claude-wt-InMemoryPersistence3
Aaronontheweb Oct 8, 2025
efb2113
Delete src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.…
Aaronontheweb Oct 8, 2025
83b2600
Add diagnostic logging to MemoryJournal to track lock contention
Aaronontheweb Oct 9, 2025
9f645dc
Add diagnostic logging to TestActor to trace message flow
Aaronontheweb Oct 9, 2025
570ec8f
Fix potential deadlock caused by Context.GetLogger() from async conti…
Aaronontheweb Oct 9, 2025
77c572e
Remove ConfigureAwait(false) from AsyncWriteJournal.ExecuteBatch
Aaronontheweb Oct 9, 2025
0a749ef
Fix ambiguous Debug reference by fully qualifying System.Diagnostics.…
Aaronontheweb Oct 9, 2025
1f7a2d2
Merge branch 'dev' into claude-wt-InMemoryPersistence3
Aaronontheweb Oct 9, 2025
2443a40
Merge branch 'dev' into claude-wt-InMemoryPersistence3
Aaronontheweb Oct 9, 2025
8083e13
Replace Debug.WriteLine with cached logger for diagnostic logging
Aaronontheweb Oct 9, 2025
983d9e7
Merge branch 'dev' into claude-wt-InMemoryPersistence3
Aaronontheweb Oct 10, 2025
007e079
Merge branch 'dev' into claude-wt-InMemoryPersistence3
Aaronontheweb Oct 10, 2025
e0eac39
Merge branch 'claude-wt-InMemoryPersistence3' of https://github.com/A…
Aaronontheweb Oct 10, 2025
653d1b4
remove public API changes
Aaronontheweb Oct 13, 2025
08555ae
Wrap MemoryJournal async methods in Task.Run to prevent thread starva…
Aaronontheweb Oct 14, 2025
0ccdfe7
Merge branch 'dev' into claude-wt-InMemoryPersistence3
Aaronontheweb Oct 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace Akka.Persistence.Query.InMemory.Tests
public class InMemoryEventsByTagSpec : EventsByTagSpec
{
private static Config Config() => ConfigurationFactory.ParseString(@"
akka.loglevel = INFO
akka.loglevel = DEBUG
akka.persistence.journal.inmem {
event-adapters {
color-tagger = ""Akka.Persistence.TCK.Query.ColorFruitTagger, Akka.Persistence.TCK""
Expand Down
1,200 changes: 0 additions & 1,200 deletions src/core/Akka.API.Tests/verify/CoreAPISpec.ApprovePersistence.Core.verified.txt

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -1059,7 +1059,7 @@ namespace Akka.Persistence.Journal
protected virtual System.Collections.Generic.Dictionary<string, long> DeletedTo { get; }
protected virtual System.Collections.Generic.List<Akka.Persistence.IPersistentRepresentation> EventLog { get; }
protected virtual System.Collections.Generic.Dictionary<string, System.Collections.Generic.List<Akka.Persistence.IPersistentRepresentation>> EventsByPersistenceId { get; }
protected virtual System.Threading.ReaderWriterLockSlim Lock { get; }
protected virtual object Lock { get; }
public System.Collections.Generic.IDictionary<string, System.Collections.Generic.LinkedList<Akka.Persistence.IPersistentRepresentation>> Add(Akka.Persistence.IPersistentRepresentation persistent) { }
public System.Collections.Generic.IDictionary<string, System.Collections.Generic.LinkedList<Akka.Persistence.IPersistentRepresentation>> Delete(string pid, long seqNr) { }
protected override System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, System.Threading.CancellationToken cancellationToken) { }
Expand Down Expand Up @@ -1185,7 +1185,7 @@ namespace Akka.Persistence.Journal
protected override System.Collections.Generic.Dictionary<string, long> DeletedTo { get; }
protected override System.Collections.Generic.List<Akka.Persistence.IPersistentRepresentation> EventLog { get; }
protected override System.Collections.Generic.Dictionary<string, System.Collections.Generic.List<Akka.Persistence.IPersistentRepresentation>> EventsByPersistenceId { get; }
protected override System.Threading.ReaderWriterLockSlim Lock { get; }
protected override object Lock { get; }
}
public struct SingleEventSequence : Akka.Persistence.Journal.IEventSequence, System.IEquatable<Akka.Persistence.Journal.IEventSequence>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1058,7 +1058,7 @@ namespace Akka.Persistence.Journal
protected virtual System.Collections.Generic.Dictionary<string, long> DeletedTo { get; }
protected virtual System.Collections.Generic.List<Akka.Persistence.IPersistentRepresentation> EventLog { get; }
protected virtual System.Collections.Generic.Dictionary<string, System.Collections.Generic.List<Akka.Persistence.IPersistentRepresentation>> EventsByPersistenceId { get; }
protected virtual System.Threading.ReaderWriterLockSlim Lock { get; }
protected virtual object Lock { get; }
public System.Collections.Generic.IDictionary<string, System.Collections.Generic.LinkedList<Akka.Persistence.IPersistentRepresentation>> Add(Akka.Persistence.IPersistentRepresentation persistent) { }
public System.Collections.Generic.IDictionary<string, System.Collections.Generic.LinkedList<Akka.Persistence.IPersistentRepresentation>> Delete(string pid, long seqNr) { }
protected override System.Threading.Tasks.Task DeleteMessagesToAsync(string persistenceId, long toSequenceNr, System.Threading.CancellationToken cancellationToken) { }
Expand Down Expand Up @@ -1184,7 +1184,7 @@ namespace Akka.Persistence.Journal
protected override System.Collections.Generic.Dictionary<string, long> DeletedTo { get; }
protected override System.Collections.Generic.List<Akka.Persistence.IPersistentRepresentation> EventLog { get; }
protected override System.Collections.Generic.Dictionary<string, System.Collections.Generic.List<Akka.Persistence.IPersistentRepresentation>> EventsByPersistenceId { get; }
protected override System.Threading.ReaderWriterLockSlim Lock { get; }
protected override object Lock { get; }
}
public struct SingleEventSequence : Akka.Persistence.Journal.IEventSequence, System.IEquatable<Akka.Persistence.Journal.IEventSequence>
{
Expand Down
16 changes: 15 additions & 1 deletion src/core/Akka.Persistence.TCK/Query/TestActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,35 @@ public TestActor(string persistenceId)

public override string PersistenceId { get; }

protected override void PreStart()
{
Log.Debug("[DIAG-ACTOR] TestActor {0} PreStart called", PersistenceId);
base.PreStart();
}

protected override void OnRecover(object message)
{
Log.Debug("[DIAG-ACTOR] TestActor {0} OnRecover: {1}", PersistenceId, message);
}

protected override void OnCommand(object message)
{
Log.Debug("[DIAG-ACTOR] TestActor {0} OnCommand received: {1}", PersistenceId, message);
switch (message)
{
case DeleteCommand delete:
DeleteMessages(delete.ToSequenceNr);
Become(WhileDeleting(Sender)); // need to wait for delete ACK to return
break;
case string cmd:
Log.Debug("[DIAG-ACTOR] TestActor {0} calling Persist for: {1}", PersistenceId, cmd);
var sender = Sender;
Persist(cmd, e => sender.Tell($"{e}-done"));
Persist(cmd, e =>
{
Log.Debug("[DIAG-ACTOR] TestActor {0} Persist callback executing for: {1}", PersistenceId, e);
sender.Tell($"{e}-done");
});
Log.Debug("[DIAG-ACTOR] TestActor {0} Persist call returned for: {1}", PersistenceId, cmd);
break;
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/core/Akka.Persistence/Journal/AsyncWriteJournal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -391,8 +391,10 @@ private async Task ExecuteBatch(WriteMessages message, int atomicWriteCount, IAc
// try in case AsyncWriteMessages throws
try
{
// NOTE: Not using ConfigureAwait(false) to ensure continuation runs on actor's dispatcher thread
// This ensures proper synchronization context and avoids timing issues with lock contention
var writeResult =
await _breaker.WithCircuitBreaker((prepared, awj: this), (state, ct) => state.awj.WriteMessagesAsync(state.prepared, ct)).ConfigureAwait(false);
await _breaker.WithCircuitBreaker((prepared, awj: this), (state, ct) => state.awj.WriteMessagesAsync(state.prepared, ct));

ProcessResults(writeResult, atomicWriteCount, message, _resequencer, resequencerCounter, self);
}
Expand Down
Loading
Loading