Del 9 av 10

Snapshots & sagas — för långa strömmar och fleraggregatsflöden

Vi tar oss an två avancerade ämnen: hur vi undviker att spela upp tusentals events vid varje load (snapshots), och hur vi koordinerar flöden över flera aggregat utan distribuerade transaktioner (sagas).

Lektion 9 — V5 mån. Här polerar vi infrastrukturen till produktionskvalitet.

När behövs snapshots?

Ett aggregat med 50 events laddas på under 10 ms. Ett aggregat med 50 000 events tar sekunder. När en ström blir så lång att rehydreringen syns i svarstiderna är det dags för en snapshot.

Ströms storlekStrategi
< 1 000 eventsIngen snapshot behövs. Rehydrering är instant.
1 000 – 10 000Överväg snapshots var 500:e event.
> 10 000Snapshots är obligatoriska. Var 100–500:e event beroende på event-storlek.

Snapshot-tabell i MSSQL

CREATE TABLE dbo.Snapshots (
    StreamId    NVARCHAR(200)  NOT NULL,
    Version     INT            NOT NULL,
    Type        NVARCHAR(400)  NOT NULL,
    PayloadJson NVARCHAR(MAX)  NOT NULL,
    CreatedAt   DATETIMEOFFSET NOT NULL,
    CONSTRAINT PK_Snapshots PRIMARY KEY (StreamId, Version)
);

CREATE INDEX IX_Snapshots_StreamId_VersionDesc
    ON dbo.Snapshots (StreamId, Version DESC);

Snapshot-stöd i aggregatet

public interface ISnapshotable<TSnapshot>
{
    TSnapshot CreateSnapshot();
    void RestoreFromSnapshot(TSnapshot snapshot);
}

public sealed record AccountSnapshot(
    Guid Id,
    string Currency,
    decimal Balance,
    AccountState State,
    int Version);

public partial class Account : ISnapshotable<AccountSnapshot>
{
    public AccountSnapshot CreateSnapshot() =>
        new(Id.Value, Balance.Currency, Balance.Amount, _state, Version);

    public void RestoreFromSnapshot(AccountSnapshot s)
    {
        Id = new AccountId(s.Id);
        Balance = new Money(s.Balance, s.Currency);
        _state = s.State;
        SetVersion(s.Version);   // exponerad protected setter på AggregateRoot
    }
}

Repository med snapshot-stöd

public async Task<Account?> LoadAsync(AccountId id, CancellationToken ct)
{
    var streamId = $"account-{id.Value}";

    // 1) Försök ladda senaste snapshot
    var snapshot = await _snapshotStore.LoadLatestAsync<AccountSnapshot>(streamId, ct);
    var account = new Account();
    var fromVersion = StreamVersion.Start;

    if (snapshot is not null)
    {
        account.RestoreFromSnapshot(snapshot.Payload);
        fromVersion = snapshot.Version + 1;     // läs bara events EFTER snapshot
    }

    // 2) Läs återstående events från strömmen
    var events = await _eventStore.LoadFromAsync(streamId, fromVersion, ct);
    if (snapshot is null && events.Count == 0) return null;

    account.LoadFromHistory(events.Select(_typeMap.Deserialize));
    return account;
}

public async Task SaveAsync(Account account, CancellationToken ct)
{
    await _eventStore.AppendAsync(/* ... */);

    // Snapshot var 100:e event
    if (account.Version % 100 == 0)
        await _snapshotStore.SaveAsync($"account-{account.Id.Value}",
                                        account.Version,
                                        account.CreateSnapshot(), ct);
}
Snapshots är cache — inte sanning Strömmen är den auktoritativa sanningen. Snapshots kan när som helst raderas och systemet ska byggas upp igen från events. Det betyder att AccountSnapshot-formatet kan ändras utan migration — bara radera tabellen och låt nästa load skapa nya snapshots.

Projection rebuild — bygg om read modeller från noll

När du lägger till en ny read model eller upptäcker en bug i en projektion vill du köra om hela historiken. Eftersom event store-strömmen är auktoritativ är detta alltid möjligt.

public async Task RebuildAsync(CancellationToken ct)
{
    // 1) Töm read model och checkpoint
    await using var conn = new SqlConnection(_cs);
    await conn.ExecuteAsync("DELETE FROM dbo.AccountSummary");
    await conn.ExecuteAsync("DELETE FROM dbo.ProjectionCheckpoints WHERE ProjectionName = 'AccountSummary'");

    // 2) Låt vanliga projection-loopen ta vid — den läser från position 0 igen
    _log.LogInformation("Rebuild scheduled. BackgroundService will catch up.");
}
Zero-downtime rebuild Vid live-rebuild: skapa en kopia av tabellen (dbo.AccountSummary_v2), projicera till kopian, byt namn när den hunnit ikapp. Inga läsare ser ett tomt tillstånd under bygget.

Sagas — flöden över flera aggregat

Vissa affärsoperationer berör flera aggregat. Klassiskt exempel: "överför pengar från konto A till konto B" — två skrivningar, två strömmar, behöver vara en logisk enhet trots att vi inte har distribuerade transaktioner.

Lösningen: en saga (även kallad process manager) lyssnar på events och skickar nästa command när rätt steg är klart. Vid fel: skicka compensating commands i omvänd ordning.

Lyckad saga

// 1. Användare:        POST /transfers
// 2. TransferAggregate.Start  → TransferStarted (egen stream)
// 3. Saga reagerar:    SendCommand(WithdrawMoneyCommand, account A)
// 4. Account A:        Withdraw → MoneyWithdrawn
// 5. Saga reagerar:    SendCommand(DepositMoneyCommand, account B)
// 6. Account B:        Deposit → MoneyDeposited
// 7. Saga reagerar:    SendCommand(CompleteTransferCommand)
// 8. TransferAggregate.Complete → TransferCompleted

Misslyckad saga (deposit på B fail)

// 5'. Account B:       Deposit → kastar (kontot stängt)
// 6'. Saga reagerar:   SendCommand(RevertWithdrawalCommand, account A)
// 7'. Account A:       RevertWithdrawal → MoneyWithdrawalReverted
// 8'. Saga reagerar:   SendCommand(FailTransferCommand, reason)
// 9'. TransferAggregate.Fail → TransferFailed

Sagaimplementation

public sealed class MoneyTransferSaga : BackgroundService
{
    private readonly IStreamStore _store;
    private readonly AggregateCommandQueue _queue;
    private readonly IEventTypeMap _typeMap;
    private readonly string _cs;
    private const string Name = "MoneyTransferSaga";

    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            var fromPos = await ReadCheckpointAsync(ct);
            var page = await _store.ReadAllForwards(fromPos + 1, 100, ct);

            if (page.Messages.Length == 0)
            {
                await Task.Delay(500, ct);
                continue;
            }

            foreach (var m in page.Messages)
            {
                var clrType = _typeMap.TryResolve(m.Type);
                if (clrType is null) { await WriteCheckpointAsync(m.Position, ct); continue; }

                var json = await m.GetJsonData(ct);
                var evt = (IDomainEvent)JsonSerializer.Deserialize(json, clrType)!;

                switch (evt)
                {
                    case TransferStarted s:
                        await _queue.EnqueueAsync(new WithdrawMoneyCommand(
                            Guid.NewGuid(), s.FromAccount, s.Amount, s.Currency,
                            $"Transfer {s.TransferId}"), ct);
                        break;

                    case MoneyWithdrawn w when w.Reference.StartsWith("Transfer "):
                        var transferId = ExtractTransferId(w.Reference);
                        var toAccount = await LookupTargetAsync(transferId, ct);
                        await _queue.EnqueueAsync(new DepositMoneyCommand(
                            Guid.NewGuid(), toAccount, w.Amount, w.Currency,
                            $"Transfer {transferId}"), ct);
                        break;

                    case MoneyDeposited d when d.Reference.StartsWith("Transfer "):
                        var tid = ExtractTransferId(d.Reference);
                        await _queue.EnqueueAsync(new CompleteTransferCommand(
                            Guid.NewGuid(), tid), ct);
                        break;
                }

                await WriteCheckpointAsync(m.Position, ct);
            }
        }
    }
}
Saga = en projektion som skickar commands Strukturellt är en saga nästan identisk med en read model-projektion: läs all-stream → reagera på events → uppdatera tillstånd. Skillnaden är att i stället för att skriva till en read model-tabell, skickas nya commands. Samma checkpoint-mönster, samma exactly-once-semantik om vi också persisterar saga-tillstånd i samma transaktion.
Sagas är svåra Ett system med > 3 sagas blir snabbt svårt att debugga. Innan du tar in en saga: kan flödet modelleras som ett enda aggregat (transferaggregatet med interna steg) i stället? Aggregat ger atomicitet utan komplexitet.

Projection-typer — en översikt

TypOutputExempel
Read modelTabell för queriesAccountSummary
Process manager (saga)Nya commandsMoneyTransferSaga
NotifierE-post, push, webhookLowBalanceNotifier
ReactorInterna sidoeffekterRecalculateCreditScore

Sammanfattning

Referenser

Elektroniska resurser

Böcker

Övningar

Lös övningarna självständigt. Det finns inget facit — lärandet sker i processen.

  1. Implementera SnapshotStore Skapa SqlSnapshotStore mot tabellen i exemplet. Skriv ett integrationstest som genererar 500 events, verifierar att snapshot tas vid version 100, 200, 300, 400, 500 och att load endast läser snapshot + senaste events efter den.
  2. Rebuild-test Generera 100 events. Töm AccountSummary. Starta projection-workern. Verifiera att tabellen byggs upp identisk med innan tömningen.
  3. Transfer-saga Implementera Transfer-aggregat (events: TransferStarted, TransferCompleted, TransferFailed) och MoneyTransferSaga. Skriv två integrationstester: (1) lyckad transfer, (2) destination-konto stängt → kompensation körs.
  4. Zero-downtime rebuild Designa en strategi för att bygga om en stor read model utan nedtid: skapa _v2-tabell, projicera, byt namn atomiskt. Skriv ned stegen som en runbook (1 sida).

Soloprojektor

Projekt 1 — Bank med transfers Bygg ut bankkontoexemplet med en Transfer-aggregat och saga. Lägg också till snapshots för konton (var 100:e event). UI: ett enkelt formulär i Blazor för att skicka transfers och se historiken. Integrationstester med Testcontainers MSSQL.
Projekt 2 — Saga visualizer (fördjupning) Bygg en Razor-sida som visar sagas tillstånd i realtid. För varje aktiv saga: visa vilket steg den är på, hur länge varje steg tagit, och vilka events som triggat övergångarna. Använd ReadAllForwards + filter på event-typer som tillhör sagan.

← Föregående: Command-kö & Undo Nästa: Blazor-klient →