Skip to content

Commit

Permalink
Add support for custom indexes on events tables
Browse files Browse the repository at this point in the history
  • Loading branch information
elexisvenator committed Apr 14, 2023
1 parent 02d321c commit 88b630c
Show file tree
Hide file tree
Showing 12 changed files with 161 additions and 20 deletions.
2 changes: 1 addition & 1 deletion docs/configuration/hostbuilder.md
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ public interface IConfigureMarten
void Configure(IServiceProvider services, StoreOptions options);
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten/MartenServiceCollectionExtensions.cs#L732-L743' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_iconfiguremarten' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten/MartenServiceCollectionExtensions.cs#L734-L745' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_iconfiguremarten' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

You could alternatively implement a custom `IConfigureMarten` class like so:
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration/storeoptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class MyStoreOptions: StoreOptions
}
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.Testing/Examples/ConfiguringDocumentStore.cs#L203-L221' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_custom-store-options' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.Testing/Examples/ConfiguringDocumentStore.cs#L223-L241' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_custom-store-options' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

This strategy might be beneficial if you need to share Marten configuration across different applications
Expand Down
2 changes: 1 addition & 1 deletion docs/events/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ var store = DocumentStore.For(opts =>
opts.Events.TenancyStyle = TenancyStyle.Conjoined;
});
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.Testing/Examples/ConfiguringDocumentStore.cs#L226-L236' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_making_the_events_multi_tenanted' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.Testing/Examples/ConfiguringDocumentStore.cs#L246-L256' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_making_the_events_multi_tenanted' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

By default, if you try to define projection with a single tenancy, Marten will throw an exception at runtime informing you about the mismatch.
Expand Down
2 changes: 1 addition & 1 deletion docs/events/multitenancy.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ var store = DocumentStore.For(opts =>
opts.Events.TenancyStyle = TenancyStyle.Conjoined;
});
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.Testing/Examples/ConfiguringDocumentStore.cs#L226-L236' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_making_the_events_multi_tenanted' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.Testing/Examples/ConfiguringDocumentStore.cs#L246-L256' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_making_the_events_multi_tenanted' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->
16 changes: 8 additions & 8 deletions docs/events/projections/aggregate-projections.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public class TripProjection: SingleStreamProjection<Trip>
}
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L44-L74' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_tripprojection_aggregate' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L43-L73' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_tripprojection_aggregate' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

And register that projection like this:
Expand All @@ -116,7 +116,7 @@ var store = DocumentStore.For(opts =>
opts.Projections.Add<TripProjection>(ProjectionLifecycle.Async);
});
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L17-L30' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_registering_an_aggregate_projection' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L16-L29' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_registering_an_aggregate_projection' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Any projection based on `SingleStreamProjection<T>` will allow you to define steps by event type to either create, delete, or mutate an aggregate
Expand Down Expand Up @@ -185,7 +185,7 @@ public class Trip
internal bool ShouldDelete(VacationOver e) => Traveled > 1000;
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L113-L163' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_trip_stream_aggregation' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L112-L162' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_trip_stream_aggregation' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Or finally, you can use a method named `Create()` on a projection type as shown in this sample:
Expand Down Expand Up @@ -221,7 +221,7 @@ public class TripProjection: SingleStreamProjection<Trip>
}
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L44-L74' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_tripprojection_aggregate' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L43-L73' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_tripprojection_aggregate' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

The `Create()` method has to return either the aggregate document type or `Task<T>` where `T` is the aggregate document type. There must be an argument for the specific event type or `Event<T>` where `T` is the event type if you need access to event metadata. You can also take in an `IQuerySession` if you need to look up additional data as part of the transformation or `IEvent` in addition to the exact event type just to get at event metadata.
Expand Down Expand Up @@ -261,7 +261,7 @@ public class TripProjection: SingleStreamProjection<Trip>
}
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L169-L194' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_projectevent_in_aggregate_projection' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L168-L193' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_projectevent_in_aggregate_projection' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

I'm not personally that wild about using lots of inline Lambdas like the example above, and to that end, Marten now supports the `Apply()` method convention. Here's the same `TripProjection`, but this time using methods to mutate the `Trip` document:
Expand Down Expand Up @@ -297,7 +297,7 @@ public class TripProjection: SingleStreamProjection<Trip>
}
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L44-L74' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_tripprojection_aggregate' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L43-L73' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_tripprojection_aggregate' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

The `Apply()` methods can accept any combination of these arguments:
Expand Down Expand Up @@ -466,7 +466,7 @@ public class Trip
internal bool ShouldDelete(VacationOver e) => Traveled > 1000;
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L113-L163' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_trip_stream_aggregation' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L112-L162' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_trip_stream_aggregation' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Here's an example of using the various ways of doing `Trip` stream aggregation:
Expand Down Expand Up @@ -500,7 +500,7 @@ internal async Task use_a_stream_aggregation()
var trip = await session.Events.AggregateStreamAsync<Trip>(tripId);
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L81-L109' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_stream_aggregation' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.AsyncDaemon.Testing/TestingSupport/TripProjectionWithCustomName.cs#L80-L108' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_using_stream_aggregation' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Aggregate Versioning
Expand Down
8 changes: 4 additions & 4 deletions docs/events/projections/live-aggregates.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ await theSession.Events.AggregateStreamAsync(
fromVersion: baseStateVersion
);
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Aggregation/aggregate_stream_into_samples.cs#L141-L147' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_aggregate-stream-into-state-default' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Aggregation/aggregate_stream_into_samples.cs#L140-L148' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_aggregate-stream-into-state-default' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

It can be helpful, for instance, in snapshotting. Snapshot is a state of the stream at a specific point of time (version). It is a performance optimization that shouldn't be your first choice, but it's an option to consider for performance-critical computations. As you're optimizing your processing, you usually don't want to store a snapshot after each event not to increase the number of writes. Usually, you'd like to do a snapshot on the specific interval or specific event type.
Expand Down Expand Up @@ -360,7 +360,7 @@ public class CashRegisterRepository
}
}
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Aggregation/aggregate_stream_into_samples.cs#L81-L131' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_aggregate-stream-into-state-wrapper' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Aggregation/aggregate_stream_into_samples.cs#L80-L130' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_aggregate-stream-into-state-wrapper' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

Then append event and store snapshot on opening accounting month:
Expand Down Expand Up @@ -390,7 +390,7 @@ var repository = new CashRegisterRepository(theSession);

await repository.Store(openedCashierShift, cashierShiftOpened);
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Aggregation/aggregate_stream_into_samples.cs#L164-L188' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_aggregate-stream-into-state-store' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Aggregation/aggregate_stream_into_samples.cs#L181-L205' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_aggregate-stream-into-state-store' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

and read snapshot and following event with:
Expand All @@ -400,7 +400,7 @@ and read snapshot and following event with:
```cs
var currentState = await repository.Get(financialAccountId);
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Aggregation/aggregate_stream_into_samples.cs#L207-L211' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_aggregate-stream-into-state-get' title='Start of snippet'>anchor</a></sup>
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/EventSourcingTests/Aggregation/aggregate_stream_into_samples.cs#L224-L228' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_aggregate-stream-into-state-get' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->

## Live Aggregation from Linq Queries
Expand Down
25 changes: 25 additions & 0 deletions docs/events/storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,28 @@ public Dictionary<string, object>? Headers { get; set; }
<!-- endSnippet -->

The full event data is available on `EventStream` and `IEvent` objects immediately after committing a transaction that involves event capture. See [diagnostics and instrumentation](/diagnostics) for more information on capturing event data in the instrumentation hooks.

## Adding indexes to event tables

Additional indexes can be added to the `mt_streams` and `mt_events` tables. These can be useful if you often need to perform queries directly against the event tables.

<!-- snippet: sample_setting_event_custom_indexes -->
<a id='snippet-sample_setting_event_custom_indexes'></a>
```cs
var store = DocumentStore.For(_ =>
{
_.Connection("some connection string");

// Add an index to the mt_streams table on "is_archived"
_.Events.AddIndexToStreamsTable(
new IndexDefinition("idx_mt_streams_is_archived")
.AgainstColumns("is_archived"));

// Add an index to the mt_events table on "type"
_.Events.AddIndexToEventsTable(
new IndexDefinition("idx_mt_events_is_type")
.AgainstColumns("type"));
});
```
<sup><a href='https://github.com/JasperFx/marten/blob/master/src/Marten.Testing/Examples/ConfiguringDocumentStore.cs#L205-L220' title='Snippet source file'>snippet source</a> | <a href='#snippet-sample_setting_event_custom_indexes' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->
50 changes: 50 additions & 0 deletions src/EventSourcingTests/EventStoreCustomIndexesTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
using System.Linq;
using System.Threading.Tasks;
using Marten.Events;
using Marten.Testing;
using Marten.Testing.Harness;
using Weasel.Postgresql.Tables;
using Xunit;

namespace EventSourcingTests;

public class EventStoreCustomIndexesTests : OneOffConfigurationsContext
{
[Fact]
public async Task can_create_custom_indexes_on_event_tables()
{
const string streamsTypeIndexName = "idx_mt_streams_type";
const string eventsDataIndexName = "idx_mt_events_data_gin";
StoreOptions(options =>
{
var streamsTypeIndex = new IndexDefinition(streamsTypeIndexName).AgainstColumns("type");
options.Events.AddIndexToStreamsTable(streamsTypeIndex);

var eventsDataIndex = new IndexDefinition(eventsDataIndexName).AgainstColumns("data");
eventsDataIndex.Method = IndexMethod.gin;
options.Events.AddIndexToEventsTable(eventsDataIndex);
});

await theStore.EnsureStorageExistsAsync(typeof(IEvent));

Assert.True(await CheckIfIndexExists("mt_streams", streamsTypeIndexName));
Assert.True(await CheckIfIndexExists("mt_events", eventsDataIndexName));
}

private async Task<bool> CheckIfIndexExists(string tableName, string indexName)
{
var exists = await theSession.QueryAsync<bool>(@"
select exists(
select 1
from pg_catalog.pg_indexes
where schemaname = ?
and tablename = ?
and indexname = ?
)",
_schemaName,
tableName,
indexName);

return exists.FirstOrDefault(false);
}
}
24 changes: 22 additions & 2 deletions src/Marten.Testing/Examples/ConfiguringDocumentStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
using Marten.Testing.Harness;
using Newtonsoft.Json;
using Weasel.Core;
using Weasel.Postgresql;
using Weasel.Postgresql.Tables;

namespace Marten.Testing.Examples;
// Leave this commented out please, and always use the User
Expand Down Expand Up @@ -200,6 +200,26 @@ public void setting_event_schema()
#endregion
}

public void setting_event_custom_indexes()
{
#region sample_setting_event_custom_indexes
var store = DocumentStore.For(_ =>
{
_.Connection("some connection string");

// Add an index to the mt_streams table on "is_archived"
_.Events.AddIndexToStreamsTable(
new IndexDefinition("idx_mt_streams_is_archived")
.AgainstColumns("is_archived"));

// Add an index to the mt_events table on "type"
_.Events.AddIndexToEventsTable(
new IndexDefinition("idx_mt_events_is_type")
.AgainstColumns("type"));
});
#endregion
}

#region sample_custom-store-options
public class MyStoreOptions: StoreOptions
{
Expand Down Expand Up @@ -235,4 +255,4 @@ public void set_multi_tenancy_on_events()

#endregion
}
}
}
9 changes: 7 additions & 2 deletions src/Marten/Events/EventGraph.FeatureSchema.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
using System;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using JasperFx.Core;
using Marten.Events.Archiving;
using Marten.Events.Daemon;
using Marten.Events.Projections;
Expand Down Expand Up @@ -38,8 +39,12 @@ void IFeatureSchema.WritePermissions(Migrator rules, TextWriter writer)

private IEnumerable<ISchemaObject> createAllSchemaObjects()
{
yield return new StreamsTable(this);
var streamsTable = new StreamsTable(this);
streamsTable.Indexes.AddRange(_customStreamsTableIndexes);
yield return streamsTable;

var eventsTable = new EventsTable(this);
eventsTable.Indexes.AddRange(_customEventsTableIndexes);
yield return eventsTable;

#region sample_using-sequence
Expand Down
16 changes: 16 additions & 0 deletions src/Marten/Events/EventGraph.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using Marten.Util;
using NpgsqlTypes;
using Weasel.Core;
using Weasel.Postgresql.Tables;
using static Marten.Events.EventMappingExtensions;

namespace Marten.Events;
Expand Down Expand Up @@ -217,6 +218,21 @@ public IEventStoreOptions Upcast(params IEventUpcaster[] upcasters)
return this;
}

private readonly IList<IndexDefinition> _customEventsTableIndexes = new List<IndexDefinition>();
private readonly IList<IndexDefinition> _customStreamsTableIndexes = new List<IndexDefinition>();

public IEventStoreOptions AddIndexToEventsTable(IndexDefinition index)
{
_customEventsTableIndexes.Add(index);
return this;
}

public IEventStoreOptions AddIndexToStreamsTable(IndexDefinition index)
{
_customStreamsTableIndexes.Add(index);
return this;
}

/// <summary>
/// Override the database schema name for event related tables. By default this
/// is the same schema as the document storage
Expand Down
Loading

0 comments on commit 88b630c

Please sign in to comment.