Del 6 av 10

SqlStreamStore på MSSQL — & projektioner i samma databas

Vi ersätter InMemoryEventStore med en riktig event store ovanpå MSSQL och bygger en projection-worker som matar read models — allt i en och samma databas.

Lektion 6 — V3 tor. Hela skriv- och läs-infrastrukturen i en enda MSSQL-instans. En backup, en deployment, en transaktionsgräns.

Vad SqlStreamStore är

SqlStreamStore är ett open source-bibliotek (MIT-licens, James Nugent m.fl.) som ger event-stream-semantik ovanpå en relationsdatabas — MSSQL, PostgreSQL eller MySQL. Det är medvetet litet: ingen server, inga prenumerationer över nätet, ingen klustring. Bara en uppsättning tabeller och en .NET-klient.

Underhållsstatus SqlStreamStore har inte fått större releaser sedan ~2021 men är fortfarande stabil och i aktiv produktion på många ställen. Eftersom kodbasen är liten (~5 000 rader) och MIT-licensierad kan du forka den om något behöver fixas. Alternativ: Marten (Postgres), EventStoreDB/KurrentDB (egen server) eller egen handrullad event-tabell.

Schema i MSSQL

SqlStreamStore lägger tre tabeller i ditt valda schema (vi använder dbo):

TabellInnehåll
StreamsEn rad per ström: IdInternal, Id (din stream-id), Version (senaste), Position (senaste globala position).
MessagesEn rad per event: StreamIdInternal, StreamVersion, Position (global), Id, Type, JsonData, JsonMetadata, Created. Unik nyckel på (StreamIdInternal, StreamVersion) — där kommer din optimistic concurrency ifrån.
SubscriptionsFör persistenta subscriptioner (vi använder polling-baserade subscriptioner i den här kursen).

Installation och initiering

dotnet add package SqlStreamStore
dotnet add package SqlStreamStore.MsSql
var settings = new MsSqlStreamStoreV3Settings(
    builder.Configuration.GetConnectionString("MSSQL")!);

var streamStore = new MsSqlStreamStoreV3(settings);
await streamStore.CreateSchemaIfNotExists();   // körs en gång vid uppstart

builder.Services.AddSingleton<IStreamStore>(streamStore);
V3-schemat Använd alltid MsSqlStreamStoreV3 — V1/V2 är legacy. V3 har bättre prestanda och stödjer HierarchyId för all-stream-positioner.

Append — skriva events

public sealed class StreamStoreEventStore : IEventStore
{
    private readonly IStreamStore _store;
    private readonly JsonSerializerOptions _json = new(JsonSerializerDefaults.Web);

    public StreamStoreEventStore(IStreamStore store) => _store = store;

    public async Task AppendAsync(string streamId, int expectedVersion,
                                  IReadOnlyList<IDomainEvent> events, CancellationToken ct)
    {
        var messages = events.Select(e => new NewStreamMessage(
            messageId: e.EventId,
            type: e.GetType().FullName!,
            jsonData: JsonSerializer.Serialize(e, e.GetType(), _json),
            jsonMetadata: "{\"schemaVersion\":1}"
        )).ToArray();

        // expectedVersion -1 = ny ström. SqlStreamStore använder ExpectedVersion.NoStream (-1) / .Any (-2)
        try
        {
            await _store.AppendToStream(streamId, expectedVersion, messages, ct);
        }
        catch (WrongExpectedVersionException ex)
        {
            throw new ConcurrencyConflictException(streamId, expectedVersion, ex);
        }
    }
}

Versionssemantik

VärdeBetydelse
ExpectedVersion.NoStream (-1)Strömmen får inte existera. Används vid Open.
ExpectedVersion.Any (-2)Hoppa över check. Använd bara för idempotenta replays.
0, 1, 2, …Exakt vilken version som senaste skrivna event ska ha haft.

Read — ladda en ström

public async Task<IReadOnlyList<StoredEvent>> LoadAsync(string streamId, CancellationToken ct)
{
    var result = new List<StoredEvent>();
    var page = await _store.ReadStreamForwards(streamId, StreamVersion.Start, maxCount: 200, ct);

    while (page.Messages.Length > 0)
    {
        foreach (var m in page.Messages)
        {
            var json = await m.GetJsonData(ct);
            result.Add(new StoredEvent(streamId, m.StreamVersion, m.Type, json,
                                       m.JsonMetadata, m.Position, m.CreatedUtc));
        }
        if (page.IsEnd) break;
        page = await page.ReadNext(ct);
    }
    return result;
}

Projection — bygg read model i samma databas

En projection läser från all-stream (alla events i global ordning) och uppdaterar denormaliserade tabeller. Eftersom event store och read model ligger i samma MSSQL kan vi göra båda uppdateringarna i samma TransactionScope → projektionen blir exactly-once utan outbox.

CREATE TABLE dbo.AccountSummary (
    Id          UNIQUEIDENTIFIER PRIMARY KEY,
    Currency    CHAR(3)          NOT NULL,
    Balance     DECIMAL(19, 4)   NOT NULL,
    UpdatedAt   DATETIMEOFFSET   NOT NULL
);

CREATE TABLE dbo.ProjectionCheckpoints (
    ProjectionName  NVARCHAR(200) PRIMARY KEY,
    Position        BIGINT        NOT NULL,
    UpdatedAt       DATETIMEOFFSET NOT NULL
);
public sealed class AccountSummaryProjection : BackgroundService
{
    private const string Name = "AccountSummary";
    private readonly IStreamStore _store;
    private readonly string _connectionString;
    private readonly IEventTypeMap _typeMap;
    private readonly ILogger<AccountSummaryProjection> _log;

    public AccountSummaryProjection(IStreamStore store, IConfiguration cfg,
                                    IEventTypeMap typeMap, ILogger<AccountSummaryProjection> log)
    {
        _store = store;
        _connectionString = cfg.GetConnectionString("MSSQL")!;
        _typeMap = typeMap;
        _log = log;
    }

    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            var fromPosition = await ReadCheckpointAsync(ct);
            var page = await _store.ReadAllForwards(fromPosition + 1, maxCount: 100, ct);

            if (page.Messages.Length == 0)
            {
                await Task.Delay(TimeSpan.FromMilliseconds(500), ct);
                continue;
            }

            foreach (var m in page.Messages)
            {
                using var scope = new TransactionScope(
                    TransactionScopeOption.Required,
                    new TransactionOptions { IsolationLevel = IsolationLevel.ReadCommitted },
                    TransactionScopeAsyncFlowOption.Enabled);

                await using var conn = new SqlConnection(_connectionString);
                await conn.OpenAsync(ct);

                await ApplyAsync(conn, m, ct);
                await WriteCheckpointAsync(conn, m.Position, ct);

                scope.Complete();
            }
        }
    }

    private async Task ApplyAsync(SqlConnection conn, StreamMessage m, CancellationToken ct)
    {
        var clrType = _typeMap.TryResolve(m.Type);
        if (clrType is null) return;     // okänt event — hoppa över

        var json = await m.GetJsonData(ct);
        var domainEvent = (IDomainEvent)JsonSerializer.Deserialize(json, clrType)!;

        switch (domainEvent)
        {
            case AccountOpened e:
                await conn.ExecuteAsync(
                    @"INSERT INTO dbo.AccountSummary (Id, Currency, Balance, UpdatedAt)
                      VALUES (@Id, @Currency, 0, @UpdatedAt)",
                    new { Id = e.Account.Value, e.Currency, UpdatedAt = m.CreatedUtc });
                break;

            case MoneyDeposited e:
                await conn.ExecuteAsync(
                    @"UPDATE dbo.AccountSummary
                         SET Balance = Balance + @Amount, UpdatedAt = @UpdatedAt
                       WHERE Id = @Id",
                    new { Id = e.Account.Value, e.Amount, UpdatedAt = m.CreatedUtc });
                break;

            case MoneyWithdrawn e:
                await conn.ExecuteAsync(
                    @"UPDATE dbo.AccountSummary
                         SET Balance = Balance - @Amount, UpdatedAt = @UpdatedAt
                       WHERE Id = @Id",
                    new { Id = e.Account.Value, e.Amount, UpdatedAt = m.CreatedUtc });
                break;
        }
    }

    private async Task<long> ReadCheckpointAsync(CancellationToken ct)
    {
        await using var conn = new SqlConnection(_connectionString);
        return await conn.ExecuteScalarAsync<long?>(
            "SELECT Position FROM dbo.ProjectionCheckpoints WHERE ProjectionName = @n",
            new { n = Name }) ?? -1L;
    }

    private Task WriteCheckpointAsync(SqlConnection conn, long position, CancellationToken ct) =>
        conn.ExecuteAsync(
            @"MERGE dbo.ProjectionCheckpoints AS t
              USING (VALUES (@n, @p, SYSDATETIMEOFFSET())) AS s(N, P, U)
                ON t.ProjectionName = s.N
              WHEN MATCHED THEN UPDATE SET Position = s.P, UpdatedAt = s.U
              WHEN NOT MATCHED THEN INSERT (ProjectionName, Position, UpdatedAt)
                                    VALUES (s.N, s.P, s.U);",
            new { n = Name, p = position });
}
Varför det blir exactly-once TransactionScope omsluter både read model-uppdateringen och checkpoint-skrivningen. Antingen committas båda eller ingen. Vid omstart läser projektionen senaste checkpoint och fortsätter från nästa position — inga duplicates, inga förlorade events. Förutsättning: båda tabellerna ligger i samma MSSQL-databas (vilket de gör hos oss).

Frågesidan (Dapper)

public sealed class GetAccountSummaryHandler
    : IRequestHandler<GetAccountSummaryQuery, AccountSummaryDto?>
{
    private readonly string _cs;
    public GetAccountSummaryHandler(IConfiguration cfg) => _cs = cfg.GetConnectionString("MSSQL")!;

    public async Task<AccountSummaryDto?> Handle(GetAccountSummaryQuery q, CancellationToken ct)
    {
        await using var conn = new SqlConnection(_cs);
        return await conn.QuerySingleOrDefaultAsync<AccountSummaryDto>(
            @"SELECT Id, Currency, Balance, UpdatedAt FROM dbo.AccountSummary WHERE Id = @Id",
            new { Id = q.Account.Value });
    }
}

Helhetsbilden

// API tar emot kommando
// → MediatR pipeline (logg, validering, idempotency)
//   → Handler laddar aggregat via EventSourcedRepository
//     → StreamStoreEventStore.LoadAsync (läser dbo.Messages)
//   → aggregat.Deposit(...) → producerar event
//   → repository.SaveAsync
//     → StreamStoreEventStore.AppendAsync (skriver dbo.Messages)
//
// AccountSummaryProjection (BackgroundService) tickar:
//   → ReadAllForwards(checkpoint + 1)
//   → TransactionScope:
//        UPDATE dbo.AccountSummary
//        MERGE  dbo.ProjectionCheckpoints
//
// Query:
//   → API → handler → SELECT dbo.AccountSummary

Jämförelse med andra event stores

VerktygPersistensBra för
SqlStreamStoreMSSQL/Postgres/MySQLDu har redan en relations-DB och vill hålla driften enkel.
MartenPostgreSQLPostgres-stack, vill ha event sourcing + dokumentdatabas i ett.
EventStoreDB / KurrentDBEgen serverHöga skrivvolymer, behöver inbyggda projektioner, sagor, klustring.
Egen tabellVad du villFå aggregat, vill äga alla detaljer, lärande.

Sammanfattning

Referenser

Elektroniska resurser

Böcker

Övningar

Lös övningarna självständigt. Det finns inget facit — lärandet sker i processen.

  1. Sätt upp SqlStreamStore Skapa en MSSQL-databas (LocalDB eller Docker-container). Konfigurera MsSqlStreamStoreV3, kör CreateSchemaIfNotExists, append 10 events i en testström. Inspektera dbo.Streams och dbo.Messages med SSMS.
  2. Concurrency-test Skriv ett integrationstest som öppnar samma ström i två parallella tasks, försöker appenda med fel expectedVersion och verifierar att en av dem får WrongExpectedVersionException.
  3. Bygg en egen projection Skapa en RecentTransactionsProjection som håller de 50 senaste transaktionerna i en tabell dbo.RecentTransactions. Inkludera korrekt checkpoint-hantering i samma TransactionScope.
  4. Rebuild from scratch Töm dbo.AccountSummary och dbo.ProjectionCheckpoints. Starta om projektionen och verifiera att den bygger upp read model:en korrekt från Position = 0.

Soloprojektor

Projekt 1 — End-to-end pipeline Bygg en konsolapp + Web API där POST /accounts/{id}/deposit går hela vägen: command → handler → aggregat → SqlStreamStore-append → projection → GET /accounts/{id} returnerar uppdaterat saldo. Använd Testcontainers för att starta MSSQL i integrationstesterna.
Projekt 2 — Handrullad event-tabell (fördjupning) Byt ut SqlStreamStore mot en egen dbo.Events-tabell med kolumnerna i lektion 5. Implementera IEventStore direkt mot tabellen. Jämför kodmängd, prestanda och funktioner (vad förlorar du? vad vinner du?).

← Föregående: Event Sourcing Nästa: Transaktioner & concurrency →