Vi ersätter InMemoryEventStore med en riktig event store ovanpå MSSQL och bygger en projection-worker som matar read models — allt i en och samma databas.
SqlStreamStore är ett open source-bibliotek (MIT-licens, James Nugent m.fl.) som ger event-stream-semantik ovanpå en relationsdatabas — MSSQL, PostgreSQL eller MySQL. Det är medvetet litet: ingen server, inga prenumerationer över nätet, ingen klustring. Bara en uppsättning tabeller och en .NET-klient.
SqlStreamStore lägger tre tabeller i ditt valda schema (vi använder dbo):
| Tabell | Innehåll |
|---|---|
Streams | En rad per ström: IdInternal, Id (din stream-id), Version (senaste), Position (senaste globala position). |
Messages | En rad per event: StreamIdInternal, StreamVersion, Position (global), Id, Type, JsonData, JsonMetadata, Created. Unik nyckel på (StreamIdInternal, StreamVersion) — där kommer din optimistic concurrency ifrån. |
Subscriptions | För persistenta subscriptioner (vi använder polling-baserade subscriptioner i den här kursen). |
dotnet add package SqlStreamStore
dotnet add package SqlStreamStore.MsSql
var settings = new MsSqlStreamStoreV3Settings(
builder.Configuration.GetConnectionString("MSSQL")!);
var streamStore = new MsSqlStreamStoreV3(settings);
await streamStore.CreateSchemaIfNotExists(); // körs en gång vid uppstart
builder.Services.AddSingleton<IStreamStore>(streamStore);
MsSqlStreamStoreV3 — V1/V2 är legacy. V3 har bättre prestanda och stödjer HierarchyId för all-stream-positioner.
public sealed class StreamStoreEventStore : IEventStore
{
private readonly IStreamStore _store;
private readonly JsonSerializerOptions _json = new(JsonSerializerDefaults.Web);
public StreamStoreEventStore(IStreamStore store) => _store = store;
public async Task AppendAsync(string streamId, int expectedVersion,
IReadOnlyList<IDomainEvent> events, CancellationToken ct)
{
var messages = events.Select(e => new NewStreamMessage(
messageId: e.EventId,
type: e.GetType().FullName!,
jsonData: JsonSerializer.Serialize(e, e.GetType(), _json),
jsonMetadata: "{\"schemaVersion\":1}"
)).ToArray();
// expectedVersion -1 = ny ström. SqlStreamStore använder ExpectedVersion.NoStream (-1) / .Any (-2)
try
{
await _store.AppendToStream(streamId, expectedVersion, messages, ct);
}
catch (WrongExpectedVersionException ex)
{
throw new ConcurrencyConflictException(streamId, expectedVersion, ex);
}
}
}
| Värde | Betydelse |
|---|---|
ExpectedVersion.NoStream (-1) | Strömmen får inte existera. Används vid Open. |
ExpectedVersion.Any (-2) | Hoppa över check. Använd bara för idempotenta replays. |
| 0, 1, 2, … | Exakt vilken version som senaste skrivna event ska ha haft. |
public async Task<IReadOnlyList<StoredEvent>> LoadAsync(string streamId, CancellationToken ct)
{
var result = new List<StoredEvent>();
var page = await _store.ReadStreamForwards(streamId, StreamVersion.Start, maxCount: 200, ct);
while (page.Messages.Length > 0)
{
foreach (var m in page.Messages)
{
var json = await m.GetJsonData(ct);
result.Add(new StoredEvent(streamId, m.StreamVersion, m.Type, json,
m.JsonMetadata, m.Position, m.CreatedUtc));
}
if (page.IsEnd) break;
page = await page.ReadNext(ct);
}
return result;
}
En projection läser från all-stream (alla events i global ordning) och uppdaterar denormaliserade tabeller. Eftersom event store och read model ligger i samma MSSQL kan vi göra båda uppdateringarna i samma TransactionScope → projektionen blir exactly-once utan outbox.
CREATE TABLE dbo.AccountSummary (
Id UNIQUEIDENTIFIER PRIMARY KEY,
Currency CHAR(3) NOT NULL,
Balance DECIMAL(19, 4) NOT NULL,
UpdatedAt DATETIMEOFFSET NOT NULL
);
CREATE TABLE dbo.ProjectionCheckpoints (
ProjectionName NVARCHAR(200) PRIMARY KEY,
Position BIGINT NOT NULL,
UpdatedAt DATETIMEOFFSET NOT NULL
);
public sealed class AccountSummaryProjection : BackgroundService
{
private const string Name = "AccountSummary";
private readonly IStreamStore _store;
private readonly string _connectionString;
private readonly IEventTypeMap _typeMap;
private readonly ILogger<AccountSummaryProjection> _log;
public AccountSummaryProjection(IStreamStore store, IConfiguration cfg,
IEventTypeMap typeMap, ILogger<AccountSummaryProjection> log)
{
_store = store;
_connectionString = cfg.GetConnectionString("MSSQL")!;
_typeMap = typeMap;
_log = log;
}
protected override async Task ExecuteAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
var fromPosition = await ReadCheckpointAsync(ct);
var page = await _store.ReadAllForwards(fromPosition + 1, maxCount: 100, ct);
if (page.Messages.Length == 0)
{
await Task.Delay(TimeSpan.FromMilliseconds(500), ct);
continue;
}
foreach (var m in page.Messages)
{
using var scope = new TransactionScope(
TransactionScopeOption.Required,
new TransactionOptions { IsolationLevel = IsolationLevel.ReadCommitted },
TransactionScopeAsyncFlowOption.Enabled);
await using var conn = new SqlConnection(_connectionString);
await conn.OpenAsync(ct);
await ApplyAsync(conn, m, ct);
await WriteCheckpointAsync(conn, m.Position, ct);
scope.Complete();
}
}
}
private async Task ApplyAsync(SqlConnection conn, StreamMessage m, CancellationToken ct)
{
var clrType = _typeMap.TryResolve(m.Type);
if (clrType is null) return; // okänt event — hoppa över
var json = await m.GetJsonData(ct);
var domainEvent = (IDomainEvent)JsonSerializer.Deserialize(json, clrType)!;
switch (domainEvent)
{
case AccountOpened e:
await conn.ExecuteAsync(
@"INSERT INTO dbo.AccountSummary (Id, Currency, Balance, UpdatedAt)
VALUES (@Id, @Currency, 0, @UpdatedAt)",
new { Id = e.Account.Value, e.Currency, UpdatedAt = m.CreatedUtc });
break;
case MoneyDeposited e:
await conn.ExecuteAsync(
@"UPDATE dbo.AccountSummary
SET Balance = Balance + @Amount, UpdatedAt = @UpdatedAt
WHERE Id = @Id",
new { Id = e.Account.Value, e.Amount, UpdatedAt = m.CreatedUtc });
break;
case MoneyWithdrawn e:
await conn.ExecuteAsync(
@"UPDATE dbo.AccountSummary
SET Balance = Balance - @Amount, UpdatedAt = @UpdatedAt
WHERE Id = @Id",
new { Id = e.Account.Value, e.Amount, UpdatedAt = m.CreatedUtc });
break;
}
}
private async Task<long> ReadCheckpointAsync(CancellationToken ct)
{
await using var conn = new SqlConnection(_connectionString);
return await conn.ExecuteScalarAsync<long?>(
"SELECT Position FROM dbo.ProjectionCheckpoints WHERE ProjectionName = @n",
new { n = Name }) ?? -1L;
}
private Task WriteCheckpointAsync(SqlConnection conn, long position, CancellationToken ct) =>
conn.ExecuteAsync(
@"MERGE dbo.ProjectionCheckpoints AS t
USING (VALUES (@n, @p, SYSDATETIMEOFFSET())) AS s(N, P, U)
ON t.ProjectionName = s.N
WHEN MATCHED THEN UPDATE SET Position = s.P, UpdatedAt = s.U
WHEN NOT MATCHED THEN INSERT (ProjectionName, Position, UpdatedAt)
VALUES (s.N, s.P, s.U);",
new { n = Name, p = position });
}
TransactionScope omsluter både read model-uppdateringen och checkpoint-skrivningen. Antingen committas båda eller ingen. Vid omstart läser projektionen senaste checkpoint och fortsätter från nästa position — inga duplicates, inga förlorade events. Förutsättning: båda tabellerna ligger i samma MSSQL-databas (vilket de gör hos oss).
public sealed class GetAccountSummaryHandler
: IRequestHandler<GetAccountSummaryQuery, AccountSummaryDto?>
{
private readonly string _cs;
public GetAccountSummaryHandler(IConfiguration cfg) => _cs = cfg.GetConnectionString("MSSQL")!;
public async Task<AccountSummaryDto?> Handle(GetAccountSummaryQuery q, CancellationToken ct)
{
await using var conn = new SqlConnection(_cs);
return await conn.QuerySingleOrDefaultAsync<AccountSummaryDto>(
@"SELECT Id, Currency, Balance, UpdatedAt FROM dbo.AccountSummary WHERE Id = @Id",
new { Id = q.Account.Value });
}
}
// API tar emot kommando
// → MediatR pipeline (logg, validering, idempotency)
// → Handler laddar aggregat via EventSourcedRepository
// → StreamStoreEventStore.LoadAsync (läser dbo.Messages)
// → aggregat.Deposit(...) → producerar event
// → repository.SaveAsync
// → StreamStoreEventStore.AppendAsync (skriver dbo.Messages)
//
// AccountSummaryProjection (BackgroundService) tickar:
// → ReadAllForwards(checkpoint + 1)
// → TransactionScope:
// UPDATE dbo.AccountSummary
// MERGE dbo.ProjectionCheckpoints
//
// Query:
// → API → handler → SELECT dbo.AccountSummary
| Verktyg | Persistens | Bra för |
|---|---|---|
| SqlStreamStore | MSSQL/Postgres/MySQL | Du har redan en relations-DB och vill hålla driften enkel. |
| Marten | PostgreSQL | Postgres-stack, vill ha event sourcing + dokumentdatabas i ett. |
| EventStoreDB / KurrentDB | Egen server | Höga skrivvolymer, behöver inbyggda projektioner, sagor, klustring. |
| Egen tabell | Vad du vill | Få aggregat, vill äga alla detaljer, lärande. |
Streams, Messages).(StreamIdInternal, StreamVersion).ReadAllForwards + checkpoint-tabell + TransactionScope = exactly-once-projektioner när allt ligger i samma databas.OPENJSON/JSON_VALUE.Lös övningarna självständigt. Det finns inget facit — lärandet sker i processen.
MsSqlStreamStoreV3, kör CreateSchemaIfNotExists, append 10 events i en testström. Inspektera dbo.Streams och dbo.Messages med SSMS.expectedVersion och verifierar att en av dem får WrongExpectedVersionException.RecentTransactionsProjection som håller de 50 senaste transaktionerna i en tabell dbo.RecentTransactions. Inkludera korrekt checkpoint-hantering i samma TransactionScope.dbo.AccountSummary och dbo.ProjectionCheckpoints. Starta om projektionen och verifiera att den bygger upp read model:en korrekt från Position = 0.POST /accounts/{id}/deposit går hela vägen: command → handler → aggregat → SqlStreamStore-append → projection → GET /accounts/{id} returnerar uppdaterat saldo. Använd Testcontainers för att starta MSSQL i integrationstesterna.
dbo.Events-tabell med kolumnerna i lektion 5. Implementera IEventStore direkt mot tabellen. Jämför kodmängd, prestanda och funktioner (vad förlorar du? vad vinner du?).
← Föregående: Event Sourcing Nästa: Transaktioner & concurrency →