Skip to content

Commit

Permalink
backporting improvements from feat/index-safes
Browse files Browse the repository at this point in the history
  • Loading branch information
jaensen committed Sep 30, 2024
1 parent 99ebcab commit a7f2700
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 42 deletions.
5 changes: 3 additions & 2 deletions Circles.Index.Common/EventSchema.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class EventSchema(string @namespace, string table, byte[] topic, List<Eve

public string Namespace { get; } = @namespace;
public byte[] Topic { get; } = topic;
public string Table { get; } = table;
public string Table { get; set; } = table;
public List<EventFieldSchema> Columns { get; } = columns;

/// <summary>
Expand All @@ -37,7 +37,7 @@ public static EventSchema FromSolidity(string @namespace, string solidityEventDe
const string prefix = "event ";
if (!trimmedDefinition.StartsWith(prefix))
{
throw new ArgumentException($"Invalid event definition. Must start with '${prefix}'.");
throw new ArgumentException($"Invalid event definition. Must start with '{prefix}'.");
}

var eventDefinition = trimmedDefinition.Substring(prefix.Length);
Expand Down Expand Up @@ -135,6 +135,7 @@ private static ValueTypes MapSolidityType(string type)
"uint256" => ValueTypes.BigInt,
"string" => ValueTypes.String,
"bool" => ValueTypes.Boolean,
"address[]" => ValueTypes.AddressArray,
"CirclesType" => ValueTypes.Int,
_ => throw new ArgumentException(
$"'{type}' is not supported. Please handle this event manually.")
Expand Down
44 changes: 22 additions & 22 deletions Circles.Index.Common/Sink.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System.Collections.Concurrent;
using System.Text;

namespace Circles.Index.Common;

Expand All @@ -12,18 +12,10 @@ public class Sink(

public readonly IDatabase Database = database;

private readonly ConcurrentDictionary<Type, long> _addedEventCounts = new();
private readonly ConcurrentDictionary<Type, long> _importedEventCounts = new();

public async Task AddEvent(object indexEvent)
{
_insertBuffer.Add(indexEvent);

_addedEventCounts.AddOrUpdate(
indexEvent.GetType(),
1,
(_, count) => count + 1);

if (_insertBuffer.Length >= batchSize)
{
await Flush();
Expand Down Expand Up @@ -56,20 +48,28 @@ public async Task Flush()
return Task.CompletedTask;
}

var task = Database.WriteBatch(tableId.Namespace, tableId.Table, o.Value, schemaPropertyMap);

return task.ContinueWith(p =>
{
if (p.Exception != null)
return Database.WriteBatch(tableId.Namespace, tableId.Table, o.Value, schemaPropertyMap)
.ContinueWith(t =>
{
throw p.Exception;
}

_importedEventCounts.AddOrUpdate(
o.Key,
o.Value.Count,
(_, count) => count + o.Value.Count);
});
if (t.Exception != null)
{
var e = t.Exception.Flatten();
e.Handle(ex =>
{
var sb = new StringBuilder();
sb.AppendLine($"Error writing batch to {tableId.Namespace}_{tableId.Table}");
sb.AppendLine($"Data: {o.Value.Count} rows:");
for (int i = 0; i < o.Value.Count; i++)
{
sb.AppendLine($"- {i:0000}: {o.Value[i]})");
}

sb.AppendLine(ex.ToString());
Console.WriteLine(sb.ToString());
return true;
});
}
});
});

await Task.WhenAll(tasks);
Expand Down
3 changes: 2 additions & 1 deletion Circles.Index.Common/ValueTypes.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ public enum ValueTypes
Int = 2,
BigInt = 3,
String = 4,
Bytes = 5
Bytes = 5,
AddressArray = 6,
}
2 changes: 2 additions & 0 deletions Circles.Index.Postgres/PostgresDb.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ private string GetSqlType(ValueTypes type)
ValueTypes.Address => "TEXT",
ValueTypes.Boolean => "BOOLEAN",
ValueTypes.Bytes => "BYTEA",
ValueTypes.AddressArray => "TEXT[]",
_ => throw new ArgumentException("Unsupported type")
};
}
Expand All @@ -195,6 +196,7 @@ private NpgsqlDbType GetNpgsqlDbType(ValueTypes type)
ValueTypes.Address => NpgsqlDbType.Text,
ValueTypes.Boolean => NpgsqlDbType.Boolean,
ValueTypes.Bytes => NpgsqlDbType.Bytea,
ValueTypes.AddressArray => NpgsqlDbType.Array | NpgsqlDbType.Text,
_ => throw new ArgumentException("Unsupported type")
};
}
Expand Down
35 changes: 35 additions & 0 deletions Circles.Index.Query.Tests/TestConversionUtils.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
using Circles.Index.Utils;

namespace Circles.Index.Query.Tests;

public class TestConversionUtils
{
// [SetUp]
// public void Setup()
// {
// }
//
// [Test]
// public void ConvertCrcToCircles()
// {
// ConversionUtils.CrcToCircles();
// }
//
// [Test]
// public void ConvertCirclesToCrc()
// {
// ConversionUtils.CirclesToCrc();
// }
//
// [Test]
// public void ConvertCirclesToStaticCircles()
// {
// ConversionUtils.CirclesToStaticCircles();
// }
//
// [Test]
// public void ConvertStaticCirclesToCircles()
// {
// ConversionUtils.StaticCirclesToCircles();
// }
}
21 changes: 4 additions & 17 deletions Circles.Index/IndexPerformanceMetrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ public class IndexPerformanceMetrics
private long _startProcessTimestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
private long _startPeriodTimestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds();

private long _totalGasUsedInPeriod = 0;
private long _totalGasUsed = 0;

private long _totalTransactionsInPeriod = 0;
private long _totalTransactions = 0;

Expand All @@ -27,14 +24,11 @@ public IndexPerformanceMetrics()
{
var metrics = GetAveragesOverLastPeriod();
var output = string.Format(CultureInfo.CurrentCulture,
"[Metrics] Time: {0,-10} {1,-14} {2,9:n0} ({3,15:000,000,000,000}), {4,-16} {5,9:n0} ({6,15:000,000,000,000}), {7,-14} {8,9:n0} ({9,15:000,000,000,000}), {10,6} {11,9:n0} ({12,15:000,000,000,000})",
"[Metrics] Time: {0,-10} {1,-14} {2,9:n0} ({3,15:000,000,000,000}), {4,-16} {5,9:n0} ({6,15:000,000,000,000}), {7,-14} {8,9:n0} ({9,15:000,000,000,000})",
DateTimeOffset.UtcNow.ToUnixTimeSeconds() - _startProcessTimestamp,
"Blocks/s:", metrics.BlocksPerSecond, _totalBlocks,
"Transactions/s:", metrics.TransactionsPerSecond, _totalTransactions,
"Logs/s:", metrics.LogsPerSecond, _totalLogs,
"MGas/s:", metrics.GasUsedPerSecond / 1000000, _totalGasUsed / 1000000L);

// File.AppendAllLines("logs/circles-indexer.log", new[] { output });
"Logs/s:", metrics.LogsPerSecond, _totalLogs);

Console.WriteLine(output);
}, null, TimeSpan.Zero, TimeSpan.FromSeconds(5));
Expand All @@ -46,10 +40,6 @@ public void LogBlockWithReceipts(BlockWithReceipts blockWithReceipts)
Interlocked.Add(ref _totalLogsInPeriod, logs);
Interlocked.Add(ref _totalLogs, logs);

var gasUsed = blockWithReceipts.Block.Transactions.Sum(tx => tx.GasLimit);
Interlocked.Add(ref _totalGasUsedInPeriod, gasUsed);
Interlocked.Add(ref _totalGasUsed, gasUsed);

var transactions = blockWithReceipts.Block.Transactions.Length;
Interlocked.Add(ref _totalTransactionsInPeriod, transactions);
Interlocked.Add(ref _totalTransactions, transactions);
Expand All @@ -58,16 +48,13 @@ public void LogBlockWithReceipts(BlockWithReceipts blockWithReceipts)
Interlocked.Increment(ref _totalBlocks);
}

private (double GasUsedPerSecond, double TransactionsPerSecond, double BlocksPerSecond, double LogsPerSecond)
private (double TransactionsPerSecond, double BlocksPerSecond, double LogsPerSecond)
GetAveragesOverLastPeriod()
{
long currentTimestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds();
double elapsedSecondsInPeriod = currentTimestamp - _startPeriodTimestamp;
elapsedSecondsInPeriod = Math.Max(elapsedSecondsInPeriod, 1);

double gasUsedPerSecond = Interlocked.Read(ref _totalGasUsedInPeriod) / elapsedSecondsInPeriod;
Interlocked.Exchange(ref _totalGasUsedInPeriod, 0);

double transactionsPerSecond =
Interlocked.Read(ref _totalTransactionsInPeriod) / elapsedSecondsInPeriod;
Interlocked.Exchange(ref _totalTransactionsInPeriod, 0);
Expand All @@ -80,7 +67,7 @@ public void LogBlockWithReceipts(BlockWithReceipts blockWithReceipts)

_startPeriodTimestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds();

return (GasUsedPerSecond: gasUsedPerSecond, TransactionsPerSecond: transactionsPerSecond,
return (TransactionsPerSecond: transactionsPerSecond,
BlocksPerSecond: blocksPerSecond, LogsPerSecond: logsPerSecond);
}
}

0 comments on commit a7f2700

Please sign in to comment.