Vi sätter commands i en kö per aggregat så att de utförs i rätt ordning, och vi implementerar "ångra" på det enda sätt som är korrekt i en event-sourced värld: nya, kompenserande events.
| Mönster | Definition |
|---|---|
| GoF Command (1994) | Inkapsla ett anrop som ett objekt. Har Execute() och oftast Undo(). Används bl.a. för redo/undo-stackar i UI:n. |
| CQRS Command (Greg Young, 2010) | Ett oföränderligt meddelande som beskriver en avsikt att ändra tillstånd. Ingen Undo-metod — för ändring krävs ett nytt command. |
Vi använder CQRS-betydelsen på serversidan. På klientsidan i Blazor (lektion 10) bygger vi en GoF-liknande undo-stack som genererar compensating commands.
Även om optimistic concurrency gör att vi aldrig tappar data, så är det dyrt att rulla tillbaka och försöka igen. Bättre att i de fall vi vet hur — undvika konflikten helt: serialisera skrivningarna per aggregat. En kö per StreamId garanterar att två commands mot samma aggregat aldrig körs parallellt.
.NET 10 har System.Threading.Channels som är gjort för exakt detta — en lock-fri, asynkron producer/consumer-kö.
public interface ICommandEnvelope
{
string AggregateStreamId { get; } // partitioneringsnyckel
Guid MessageId { get; }
}
public sealed record EnqueuedCommand(
ICommandEnvelope Command,
TaskCompletionSource<object?> Completion);
public sealed class AggregateCommandQueue
{
private readonly Dictionary<string, Channel<EnqueuedCommand>> _channels = new();
private readonly object _gate = new();
private readonly IServiceScopeFactory _scopes;
private readonly ILogger<AggregateCommandQueue> _log;
public AggregateCommandQueue(IServiceScopeFactory scopes, ILogger<AggregateCommandQueue> log)
=> (_scopes, _log) = (scopes, log);
public Task<object?> EnqueueAsync(ICommandEnvelope cmd, CancellationToken ct)
{
var ch = GetOrCreate(cmd.AggregateStreamId);
var completion = new TaskCompletionSource<object?>(TaskCreationOptions.RunContinuationsAsynchronously);
ch.Writer.TryWrite(new EnqueuedCommand(cmd, completion));
return completion.Task.WaitAsync(ct);
}
private Channel<EnqueuedCommand> GetOrCreate(string streamId)
{
lock (_gate)
{
if (_channels.TryGetValue(streamId, out var existing)) return existing;
var ch = Channel.CreateUnbounded<EnqueuedCommand>(
new UnboundedChannelOptions { SingleReader = true });
_channels[streamId] = ch;
_ = Task.Run(() => PumpAsync(streamId, ch.Reader));
return ch;
}
}
private async Task PumpAsync(string streamId, ChannelReader<EnqueuedCommand> reader)
{
await foreach (var item in reader.ReadAllAsync())
{
try
{
using var scope = _scopes.CreateScope();
var mediator = scope.ServiceProvider.GetRequiredService<IMediator>();
var result = await mediator.Send(item.Command);
item.Completion.TrySetResult(result);
}
catch (Exception ex)
{
_log.LogError(ex, "Command failed on stream {Stream}", streamId);
item.Completion.TrySetException(ex);
}
}
}
}
builder.Services.AddSingleton<AggregateCommandQueue>();
app.MapPost("/accounts/{id:guid}/deposit", async (
Guid id, DepositRequest body,
AggregateCommandQueue queue,
[FromHeader(Name = "Idempotency-Key")] Guid? idempotencyKey,
CancellationToken ct) =>
{
var cmd = new DepositMoneyCommand(
MessageId: idempotencyKey ?? Guid.NewGuid(),
Account: new AccountId(id),
Amount: body.Amount,
Currency: body.Currency,
Reference: body.Reference);
var result = await queue.EnqueueAsync(cmd, ct);
return Results.Ok(result);
});
TaskCompletionSource.StreamId).Den kardinalsynd man vill begå när man kommer från CRUD-tänk: "jag tar bara bort eventet". Detta får man aldrig göra — andra projektioner, granskningsrapporter och historiska analyser har redan sett eventet.
Den korrekta vägen: emittera ett nytt event som kompenserar det förra. Eventet beskriver vad som hände — "insättningen återställdes", inte "insättningen togs bort".
public sealed record MoneyDepositReverted(
Guid EventId,
DateTimeOffset OccurredAt,
AccountId Account,
Guid OriginalEventId,
decimal Amount,
string Currency,
string Reason) : IDomainEvent;
public partial class Account
{
public void RevertDeposit(Guid originalEventId, string reason)
{
if (_state == AccountState.Closed)
throw new InvalidOperationException("Cannot revert on a closed account.");
var original = _committedDeposits.FirstOrDefault(d => d.EventId == originalEventId)
?? throw new InvalidOperationException("Original deposit not found in this stream.");
Raise(new MoneyDepositReverted(
EventId: Guid.NewGuid(),
OccurredAt: DateTimeOffset.UtcNow,
Account: Id,
OriginalEventId: originalEventId,
Amount: original.Amount,
Currency: original.Currency,
Reason: reason));
}
private void Apply(MoneyDepositReverted e) =>
Balance = Balance.Subtract(new Money(e.Amount, e.Currency));
}
public sealed record RevertDepositCommand(
Guid MessageId,
AccountId Account,
Guid OriginalEventId,
string Reason) : ICommand<Unit>, ICommandEnvelope
{
public string AggregateStreamId => $"account-{Account.Value}";
}
public sealed class RevertDepositHandler : IRequestHandler<RevertDepositCommand, Unit>
{
private readonly IAccountRepository _repo;
public RevertDepositHandler(IAccountRepository repo) => _repo = repo;
public async Task<Unit> Handle(RevertDepositCommand cmd, CancellationToken ct)
{
var account = await _repo.LoadAsync(cmd.Account, ct)
?? throw new NotFoundException(cmd.Account);
account.RevertDeposit(cmd.OriginalEventId, cmd.Reason);
await _repo.SaveAsync(account, ct);
return Unit.Value;
}
}
EmailRetractionRequested men brevet har redan flugit. När du designar aggregat: tänk igenom vilka events som har biverkningar utåt och vilka som bara påverkar internt tillstånd.
Server-sidan vet inget om "senaste handlingen". Klienten håller en stack med commands den har skickat — och när användaren trycker Ctrl+Z genereras motsvarande compensating command.
public sealed class UndoStack
{
private readonly Stack<ICommandEnvelope> _history = new();
private readonly Stack<ICommandEnvelope> _redo = new();
public void Record(ICommandEnvelope executed, ICommandEnvelope compensation)
{
_history.Push(compensation);
_redo.Clear();
}
public ICommandEnvelope? PopUndo() => _history.Count > 0 ? _history.Pop() : null;
}
Detaljerna kring Blazor-integrationen finns i lektion 10.
| Scenario | Hantering |
|---|---|
Domänundantag (InsufficientFundsException) | Skickas tillbaka via TaskCompletionSource. API returnerar 422. |
ConcurrencyConflictException mellan processer | Polly-retry inom handlern. Misslyckas det helt → 409. |
| Tekniskt fel (DB nere) | Logga, retry, kasta vidare. Köiteratorn fortsätter med nästa item. |
| Process omstart | Kön förloras (den är in-memory). Klienter får 503 / timeout. För full hållbarhet: outbox-tabell + persistent kö. |
Channel<T> per StreamId serialiserar commands mot ett aggregat — single-writer-principen.MoneyDepositReverted), inte radering.Lös övningarna självständigt. Det finns inget facit — lärandet sker i processen.
MoneyWithdrawalReverted + RevertWithdrawal-metod + RevertWithdrawalCommand. Skriv ett test som gör Withdraw följt av revert och verifierar att saldot återgår till ursprungligt värde.TaskCompletionSource:er får tillbaka exceptionen och att kön fortsätter processa nya commands därefter.UnboundedChannel med BoundedChannel med kapacitet 32 och full-mode Wait. Vad händer när kön fylls? När passar bounded? När passar unbounded?Deposit, Withdraw, RevertDeposit, RevertWithdrawal. Lägg en kö-pipeline framför API:t. Skriv ett konsol-CLI som tar kommandon deposit 100, undo, redo och visar att alla operationer registreras som unika events i strömmen.
Channel<T> mot en MSSQL-tabell dbo.PendingCommands. En BackgroundService pollar tabellen, kör nästa command per stream, och tar bort raden efter lyckad körning. Resultat: kön överlever processomstart. Visa beteendet genom att skicka 100 commands, döda processen mitt i, starta om och verifiera att resterande commands körs.
← Föregående: Transaktioner & concurrency Nästa: Snapshots & Sagas →