Del 8 av 10

Command-kö & ångra — single-writer och compensating events

Vi sätter commands i en kö per aggregat så att de utförs i rätt ordning, och vi implementerar "ångra" på det enda sätt som är korrekt i en event-sourced värld: nya, kompenserande events.

Lektion 8 — V4 tor. Här möts GoF-Command-mönstret från klassisk OO och CQRS-commandet från event-sourcing-världen.

Två betydelser av "command"

MönsterDefinition
GoF Command (1994)Inkapsla ett anrop som ett objekt. Har Execute() och oftast Undo(). Används bl.a. för redo/undo-stackar i UI:n.
CQRS Command (Greg Young, 2010)Ett oföränderligt meddelande som beskriver en avsikt att ändra tillstånd. Ingen Undo-metod — för ändring krävs ett nytt command.

Vi använder CQRS-betydelsen på serversidan. På klientsidan i Blazor (lektion 10) bygger vi en GoF-liknande undo-stack som genererar compensating commands.

Varför en kö?

Även om optimistic concurrency gör att vi aldrig tappar data, så är det dyrt att rulla tillbaka och försöka igen. Bättre att i de fall vi vet hur — undvika konflikten helt: serialisera skrivningarna per aggregat. En kö per StreamId garanterar att två commands mot samma aggregat aldrig körs parallellt.

Single-writer-principen Varje aggregatinstans har bara en aktiv skribent åt gången. Andra commands får vänta i kön. Detta är samma princip som actor model och som SqlStreamStores prenumerationer använder.

Implementation med Channel<T>

.NET 10 har System.Threading.Channels som är gjort för exakt detta — en lock-fri, asynkron producer/consumer-kö.

public interface ICommandEnvelope
{
    string AggregateStreamId { get; }     // partitioneringsnyckel
    Guid MessageId { get; }
}

public sealed record EnqueuedCommand(
    ICommandEnvelope Command,
    TaskCompletionSource<object?> Completion);

public sealed class AggregateCommandQueue
{
    private readonly Dictionary<string, Channel<EnqueuedCommand>> _channels = new();
    private readonly object _gate = new();
    private readonly IServiceScopeFactory _scopes;
    private readonly ILogger<AggregateCommandQueue> _log;

    public AggregateCommandQueue(IServiceScopeFactory scopes, ILogger<AggregateCommandQueue> log)
        => (_scopes, _log) = (scopes, log);

    public Task<object?> EnqueueAsync(ICommandEnvelope cmd, CancellationToken ct)
    {
        var ch = GetOrCreate(cmd.AggregateStreamId);
        var completion = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);
        ch.Writer.TryWrite(new EnqueuedCommand(cmd, completion));
        return completion.Task.WaitAsync(ct);
    }

    private Channel<EnqueuedCommand> GetOrCreate(string streamId)
    {
        lock (_gate)
        {
            if (_channels.TryGetValue(streamId, out var existing)) return existing;

            var ch = Channel.CreateUnbounded<EnqueuedCommand>(
                new UnboundedChannelOptions { SingleReader = true });
            _channels[streamId] = ch;

            _ = Task.Run(() => PumpAsync(streamId, ch.Reader));
            return ch;
        }
    }

    private async Task PumpAsync(string streamId, ChannelReader<EnqueuedCommand> reader)
    {
        await foreach (var item in reader.ReadAllAsync())
        {
            try
            {
                using var scope = _scopes.CreateScope();
                var mediator = scope.ServiceProvider.GetRequiredService<IMediator>();
                var result = await mediator.Send(item.Command);
                item.Completion.TrySetResult(result);
            }
            catch (Exception ex)
            {
                _log.LogError(ex, "Command failed on stream {Stream}", streamId);
                item.Completion.TrySetException(ex);
            }
        }
    }
}

Registrering

builder.Services.AddSingleton<AggregateCommandQueue>();

app.MapPost("/accounts/{id:guid}/deposit", async (
    Guid id, DepositRequest body,
    AggregateCommandQueue queue,
    [FromHeader(Name = "Idempotency-Key")] Guid? idempotencyKey,
    CancellationToken ct) =>
{
    var cmd = new DepositMoneyCommand(
        MessageId: idempotencyKey ?? Guid.NewGuid(),
        Account: new AccountId(id),
        Amount: body.Amount,
        Currency: body.Currency,
        Reference: body.Reference);

    var result = await queue.EnqueueAsync(cmd, ct);
    return Results.Ok(result);
});
Vad detta ger oss

Ångra i en event-sourced värld

Den kardinalsynd man vill begå när man kommer från CRUD-tänk: "jag tar bara bort eventet". Detta får man aldrig göra — andra projektioner, granskningsrapporter och historiska analyser har redan sett eventet.

Den korrekta vägen: emittera ett nytt event som kompenserar det förra. Eventet beskriver vad som hände — "insättningen återställdes", inte "insättningen togs bort".

public sealed record MoneyDepositReverted(
    Guid EventId,
    DateTimeOffset OccurredAt,
    AccountId Account,
    Guid OriginalEventId,
    decimal Amount,
    string Currency,
    string Reason) : IDomainEvent;

public partial class Account
{
    public void RevertDeposit(Guid originalEventId, string reason)
    {
        if (_state == AccountState.Closed)
            throw new InvalidOperationException("Cannot revert on a closed account.");

        var original = _committedDeposits.FirstOrDefault(d => d.EventId == originalEventId)
            ?? throw new InvalidOperationException("Original deposit not found in this stream.");

        Raise(new MoneyDepositReverted(
            EventId: Guid.NewGuid(),
            OccurredAt: DateTimeOffset.UtcNow,
            Account: Id,
            OriginalEventId: originalEventId,
            Amount: original.Amount,
            Currency: original.Currency,
            Reason: reason));
    }

    private void Apply(MoneyDepositReverted e) =>
        Balance = Balance.Subtract(new Money(e.Amount, e.Currency));
}

Compensating command

public sealed record RevertDepositCommand(
    Guid MessageId,
    AccountId Account,
    Guid OriginalEventId,
    string Reason) : ICommand<Unit>, ICommandEnvelope
{
    public string AggregateStreamId => $"account-{Account.Value}";
}

public sealed class RevertDepositHandler : IRequestHandler<RevertDepositCommand, Unit>
{
    private readonly IAccountRepository _repo;
    public RevertDepositHandler(IAccountRepository repo) => _repo = repo;

    public async Task<Unit> Handle(RevertDepositCommand cmd, CancellationToken ct)
    {
        var account = await _repo.LoadAsync(cmd.Account, ct)
            ?? throw new NotFoundException(cmd.Account);
        account.RevertDeposit(cmd.OriginalEventId, cmd.Reason);
        await _repo.SaveAsync(account, ct);
        return Unit.Value;
    }
}
Inte alla händelser går att kompensera "Skickade e-post till kund" går inte att ångra. Du kan emittera EmailRetractionRequested men brevet har redan flugit. När du designar aggregat: tänk igenom vilka events som har biverkningar utåt och vilka som bara påverkar internt tillstånd.

Undo-stack på klientsidan (förhandstitt)

Server-sidan vet inget om "senaste handlingen". Klienten håller en stack med commands den har skickat — och när användaren trycker Ctrl+Z genereras motsvarande compensating command.

public sealed class UndoStack
{
    private readonly Stack<ICommandEnvelope> _history = new();
    private readonly Stack<ICommandEnvelope> _redo = new();

    public void Record(ICommandEnvelope executed, ICommandEnvelope compensation)
    {
        _history.Push(compensation);
        _redo.Clear();
    }

    public ICommandEnvelope? PopUndo() => _history.Count > 0 ? _history.Pop() : null;
}

Detaljerna kring Blazor-integrationen finns i lektion 10.

Felscenarier i kön

ScenarioHantering
Domänundantag (InsufficientFundsException)Skickas tillbaka via TaskCompletionSource. API returnerar 422.
ConcurrencyConflictException mellan processerPolly-retry inom handlern. Misslyckas det helt → 409.
Tekniskt fel (DB nere)Logga, retry, kasta vidare. Köiteratorn fortsätter med nästa item.
Process omstartKön förloras (den är in-memory). Klienter får 503 / timeout. För full hållbarhet: outbox-tabell + persistent kö.

Sammanfattning

Referenser

Elektroniska resurser

Böcker

Övningar

Lös övningarna självständigt. Det finns inget facit — lärandet sker i processen.

  1. Bygg AggregateCommandQueue Skriv klassen från scratch utan att kopiera. Verifiera med ett test som skickar 100 commands till samma aggregat och 100 till ett annat — kontrollera att samma aggregats commands hamnar i rätt ordning men att de två aggregaten överlappar i tid.
  2. Compensating event Lägg till MoneyWithdrawalReverted + RevertWithdrawal-metod + RevertWithdrawalCommand. Skriv ett test som gör Withdraw följt av revert och verifierar att saldot återgår till ursprungligt värde.
  3. Felhantering i kön Konstruera en handler som alltid kastar exception. Skicka 10 commands. Verifiera att alla 10 TaskCompletionSource:er får tillbaka exceptionen och att kön fortsätter processa nya commands därefter.
  4. Bounded queue Ersätt UnboundedChannel med BoundedChannel med kapacitet 32 och full-mode Wait. Vad händer när kön fylls? När passar bounded? När passar unbounded?

Soloprojektor

Projekt 1 — Komplett undo/redo för bankkontot Implementera Deposit, Withdraw, RevertDeposit, RevertWithdrawal. Lägg en kö-pipeline framför API:t. Skriv ett konsol-CLI som tar kommandon deposit 100, undo, redo och visar att alla operationer registreras som unika events i strömmen.
Projekt 2 — Persistent kö (fördjupning) Byt ut den in-memory Channel<T> mot en MSSQL-tabell dbo.PendingCommands. En BackgroundService pollar tabellen, kör nästa command per stream, och tar bort raden efter lyckad körning. Resultat: kön överlever processomstart. Visa beteendet genom att skicka 100 commands, döda processen mitt i, starta om och verifiera att resterande commands körs.

← Föregående: Transaktioner & concurrency Nästa: Snapshots & Sagas →