Vi tar oss an två avancerade ämnen: hur vi undviker att spela upp tusentals events vid varje load (snapshots), och hur vi koordinerar flöden över flera aggregat utan distribuerade transaktioner (sagas).
Ett aggregat med 50 events laddas på under 10 ms. Ett aggregat med 50 000 events tar sekunder. När en ström blir så lång att rehydreringen syns i svarstiderna är det dags för en snapshot.
| Ströms storlek | Strategi |
|---|---|
| < 1 000 events | Ingen snapshot behövs. Rehydrering är instant. |
| 1 000 – 10 000 | Överväg snapshots var 500:e event. |
| > 10 000 | Snapshots är obligatoriska. Var 100–500:e event beroende på event-storlek. |
CREATE TABLE dbo.Snapshots (
StreamId NVARCHAR(200) NOT NULL,
Version INT NOT NULL,
Type NVARCHAR(400) NOT NULL,
PayloadJson NVARCHAR(MAX) NOT NULL,
CreatedAt DATETIMEOFFSET NOT NULL,
CONSTRAINT PK_Snapshots PRIMARY KEY (StreamId, Version)
);
CREATE INDEX IX_Snapshots_StreamId_VersionDesc
ON dbo.Snapshots (StreamId, Version DESC);
public interface ISnapshotable<TSnapshot>
{
TSnapshot CreateSnapshot();
void RestoreFromSnapshot(TSnapshot snapshot);
}
public sealed record AccountSnapshot(
Guid Id,
string Currency,
decimal Balance,
AccountState State,
int Version);
public partial class Account : ISnapshotable<AccountSnapshot>
{
public AccountSnapshot CreateSnapshot() =>
new(Id.Value, Balance.Currency, Balance.Amount, _state, Version);
public void RestoreFromSnapshot(AccountSnapshot s)
{
Id = new AccountId(s.Id);
Balance = new Money(s.Balance, s.Currency);
_state = s.State;
SetVersion(s.Version); // exponerad protected setter på AggregateRoot
}
}
public async Task<Account?> LoadAsync(AccountId id, CancellationToken ct)
{
var streamId = $"account-{id.Value}";
// 1) Försök ladda senaste snapshot
var snapshot = await _snapshotStore.LoadLatestAsync<AccountSnapshot>(streamId, ct);
var account = new Account();
var fromVersion = StreamVersion.Start;
if (snapshot is not null)
{
account.RestoreFromSnapshot(snapshot.Payload);
fromVersion = snapshot.Version + 1; // läs bara events EFTER snapshot
}
// 2) Läs återstående events från strömmen
var events = await _eventStore.LoadFromAsync(streamId, fromVersion, ct);
if (snapshot is null && events.Count == 0) return null;
account.LoadFromHistory(events.Select(_typeMap.Deserialize));
return account;
}
public async Task SaveAsync(Account account, CancellationToken ct)
{
await _eventStore.AppendAsync(/* ... */);
// Snapshot var 100:e event
if (account.Version % 100 == 0)
await _snapshotStore.SaveAsync($"account-{account.Id.Value}",
account.Version,
account.CreateSnapshot(), ct);
}
AccountSnapshot-formatet kan ändras utan migration — bara radera tabellen och låt nästa load skapa nya snapshots.
När du lägger till en ny read model eller upptäcker en bug i en projektion vill du köra om hela historiken. Eftersom event store-strömmen är auktoritativ är detta alltid möjligt.
public async Task RebuildAsync(CancellationToken ct)
{
// 1) Töm read model och checkpoint
await using var conn = new SqlConnection(_cs);
await conn.ExecuteAsync("DELETE FROM dbo.AccountSummary");
await conn.ExecuteAsync("DELETE FROM dbo.ProjectionCheckpoints WHERE ProjectionName = 'AccountSummary'");
// 2) Låt vanliga projection-loopen ta vid — den läser från position 0 igen
_log.LogInformation("Rebuild scheduled. BackgroundService will catch up.");
}
dbo.AccountSummary_v2), projicera till kopian, byt namn när den hunnit ikapp. Inga läsare ser ett tomt tillstånd under bygget.
Vissa affärsoperationer berör flera aggregat. Klassiskt exempel: "överför pengar från konto A till konto B" — två skrivningar, två strömmar, behöver vara en logisk enhet trots att vi inte har distribuerade transaktioner.
Lösningen: en saga (även kallad process manager) lyssnar på events och skickar nästa command när rätt steg är klart. Vid fel: skicka compensating commands i omvänd ordning.
// 1. Användare: POST /transfers
// 2. TransferAggregate.Start → TransferStarted (egen stream)
// 3. Saga reagerar: SendCommand(WithdrawMoneyCommand, account A)
// 4. Account A: Withdraw → MoneyWithdrawn
// 5. Saga reagerar: SendCommand(DepositMoneyCommand, account B)
// 6. Account B: Deposit → MoneyDeposited
// 7. Saga reagerar: SendCommand(CompleteTransferCommand)
// 8. TransferAggregate.Complete → TransferCompleted
// 5'. Account B: Deposit → kastar (kontot stängt)
// 6'. Saga reagerar: SendCommand(RevertWithdrawalCommand, account A)
// 7'. Account A: RevertWithdrawal → MoneyWithdrawalReverted
// 8'. Saga reagerar: SendCommand(FailTransferCommand, reason)
// 9'. TransferAggregate.Fail → TransferFailed
public sealed class MoneyTransferSaga : BackgroundService
{
private readonly IStreamStore _store;
private readonly AggregateCommandQueue _queue;
private readonly IEventTypeMap _typeMap;
private readonly string _cs;
private const string Name = "MoneyTransferSaga";
protected override async Task ExecuteAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
var fromPos = await ReadCheckpointAsync(ct);
var page = await _store.ReadAllForwards(fromPos + 1, 100, ct);
if (page.Messages.Length == 0)
{
await Task.Delay(500, ct);
continue;
}
foreach (var m in page.Messages)
{
var clrType = _typeMap.TryResolve(m.Type);
if (clrType is null) { await WriteCheckpointAsync(m.Position, ct); continue; }
var json = await m.GetJsonData(ct);
var evt = (IDomainEvent)JsonSerializer.Deserialize(json, clrType)!;
switch (evt)
{
case TransferStarted s:
await _queue.EnqueueAsync(new WithdrawMoneyCommand(
Guid.NewGuid(), s.FromAccount, s.Amount, s.Currency,
$"Transfer {s.TransferId}"), ct);
break;
case MoneyWithdrawn w when w.Reference.StartsWith("Transfer "):
var transferId = ExtractTransferId(w.Reference);
var toAccount = await LookupTargetAsync(transferId, ct);
await _queue.EnqueueAsync(new DepositMoneyCommand(
Guid.NewGuid(), toAccount, w.Amount, w.Currency,
$"Transfer {transferId}"), ct);
break;
case MoneyDeposited d when d.Reference.StartsWith("Transfer "):
var tid = ExtractTransferId(d.Reference);
await _queue.EnqueueAsync(new CompleteTransferCommand(
Guid.NewGuid(), tid), ct);
break;
}
await WriteCheckpointAsync(m.Position, ct);
}
}
}
}
| Typ | Output | Exempel |
|---|---|---|
| Read model | Tabell för queries | AccountSummary |
| Process manager (saga) | Nya commands | MoneyTransferSaga |
| Notifier | E-post, push, webhook | LowBalanceNotifier |
| Reactor | Interna sidoeffekter | RecalculateCreditScore |
Lös övningarna självständigt. Det finns inget facit — lärandet sker i processen.
SqlSnapshotStore mot tabellen i exemplet. Skriv ett integrationstest som genererar 500 events, verifierar att snapshot tas vid version 100, 200, 300, 400, 500 och att load endast läser snapshot + senaste events efter den.AccountSummary. Starta projection-workern. Verifiera att tabellen byggs upp identisk med innan tömningen.Transfer-aggregat (events: TransferStarted, TransferCompleted, TransferFailed) och MoneyTransferSaga. Skriv två integrationstester: (1) lyckad transfer, (2) destination-konto stängt → kompensation körs._v2-tabell, projicera, byt namn atomiskt. Skriv ned stegen som en runbook (1 sida).Transfer-aggregat och saga. Lägg också till snapshots för konton (var 100:e event). UI: ett enkelt formulär i Blazor för att skicka transfers och se historiken. Integrationstester med Testcontainers MSSQL.
ReadAllForwards + filter på event-typer som tillhör sagan.