Del 5 av 10

Event Sourcing — append-only sanning

Vi lagrar inte tillstånd — vi lagrar historiken. I denna lektion bygger vi en minimal in-memory event store från scratch och rehydrerar aggregat ur strömmar.

Lektion 5 — V3 mån. Här lär vi mönstret rent — riktig persistens (SqlStreamStore på MSSQL) följer i lektion 6.

Idén i en mening

Lagra varje förändring som en oföränderlig händelse i en append-only logg. Tillståndet räknas fram genom att spela upp händelserna i ordning.

State-stored vs event-sourced I en state-stored modell sparar du resultatet (saldo = 4 200 kr). I en event-sourced modell sparar du förloppet (insättning 5 000, uttag 800). Saldot blir en funktion av historiken.

Command vs Event — ordbok

AspektCommandEvent
TempusImperativ (DepositMoney)Perfekt (MoneyDeposited)
Kan avvisas?Ja — bryter den invariant?Nej — har redan hänt
MottagareEtt aggregatNoll till många lyssnare
Genererar0..N events
Persisteras?Eventuellt (idempotency-store)Alltid (event store)

Ström, position, version

Eventet — design

Eventet är ett faktum. Det ska vara självförklarande över tid (10 år från nu ska någon kunna läsa det och förstå vad som hände).

public interface IDomainEvent
{
    Guid EventId { get; }
    DateTimeOffset OccurredAt { get; }
}

public sealed record MoneyDeposited(
    Guid EventId,
    DateTimeOffset OccurredAt,
    AccountId Account,
    decimal Amount,
    string Currency,
    string Reference) : IDomainEvent;
Designregler för events

En minimal event store — från scratch

Innan vi tar in SqlStreamStore i nästa lektion bygger vi mönstret i ren C# så att alla delar är genomskinliga.

public sealed record StoredEvent(
    string StreamId,
    int Version,
    string EventType,
    string PayloadJson,
    string MetadataJson,
    long GlobalPosition,
    DateTimeOffset CreatedAt);

public sealed class WrongExpectedVersionException : Exception
{
    public WrongExpectedVersionException(string streamId, int expected, int actual)
        : base($"Stream '{streamId}' expected version {expected} but was {actual}.") { }
}

public interface IEventStore
{
    Task AppendAsync(string streamId, int expectedVersion,
                     IReadOnlyList<IDomainEvent> events, CancellationToken ct = default);

    Task<IReadOnlyList<StoredEvent>> LoadAsync(string streamId, CancellationToken ct = default);
}

InMemoryEventStore

public sealed class InMemoryEventStore : IEventStore
{
    private readonly object _gate = new();
    private readonly Dictionary<string, List<StoredEvent>> _streams = new();
    private long _globalPosition;

    public Task AppendAsync(string streamId, int expectedVersion,
                            IReadOnlyList<IDomainEvent> events, CancellationToken ct = default)
    {
        lock (_gate)
        {
            if (!_streams.TryGetValue(streamId, out var stream))
                stream = _streams[streamId] = new List<StoredEvent>();

            var currentVersion = stream.Count - 1; // -1 if empty
            if (currentVersion != expectedVersion)
                throw new WrongExpectedVersionException(streamId, expectedVersion, currentVersion);

            var nextVersion = currentVersion + 1;
            foreach (var e in events)
            {
                stream.Add(new StoredEvent(
                    StreamId: streamId,
                    Version: nextVersion++,
                    EventType: e.GetType().FullName!,
                    PayloadJson: JsonSerializer.Serialize(e, e.GetType()),
                    MetadataJson: "{}",
                    GlobalPosition: Interlocked.Increment(ref _globalPosition),
                    CreatedAt: DateTimeOffset.UtcNow));
            }
        }
        return Task.CompletedTask;
    }

    public Task<IReadOnlyList<StoredEvent>> LoadAsync(string streamId, CancellationToken ct = default)
    {
        lock (_gate)
        {
            if (!_streams.TryGetValue(streamId, out var stream))
                return Task.FromResult<IReadOnlyList<StoredEvent>>(Array.Empty<StoredEvent>());
            return Task.FromResult<IReadOnlyList<StoredEvent>>(stream.ToArray());
        }
    }
}
Inte produktionsklar InMemoryEventStore är bara för att förstå mönstret och för enhetstester. Den förlorar all data vid omstart, hanterar inte konkurrent skrivning över processer, och har ingen prenumerationsmekanism. I lektion 6 byts den ut mot SqlStreamStore.

Rehydrera ett aggregat

Ett event-sourced aggregat exponerar två saker till repository:t — möjligheten att tillämpa ett historiskt event utan att producera nya, och en lista över obekräftade events.

public abstract class AggregateRoot<TId>
{
    private readonly List<IDomainEvent> _uncommitted = new();

    public TId Id { get; protected set; } = default!;
    public int Version { get; private set; } = -1;   // -1 = ny ström

    public IReadOnlyList<IDomainEvent> UncommittedEvents => _uncommitted;
    public void ClearUncommitted() => _uncommitted.Clear();

    protected void Raise(IDomainEvent @event)
    {
        Apply(@event);
        _uncommitted.Add(@event);
    }

    public void LoadFromHistory(IEnumerable<IDomainEvent> history)
    {
        foreach (var e in history)
        {
            Apply(e);
            Version++;
        }
    }

    protected abstract void Apply(IDomainEvent @event);
}

Repository som kopplar ihop aggregat + event store

public sealed class EventSourcedRepository<TAggregate, TId>
    where TAggregate : AggregateRoot<TId>, new()
{
    private readonly IEventStore _store;
    private readonly IEventTypeMap _typeMap;     // mappar EventType -> CLR-typ
    public EventSourcedRepository(IEventStore store, IEventTypeMap typeMap)
        => (_store, _typeMap) = (store, typeMap);

    public async Task<TAggregate?> LoadAsync(string streamId, CancellationToken ct = default)
    {
        var stored = await _store.LoadAsync(streamId, ct);
        if (stored.Count == 0) return null;

        var events = stored.Select(s =>
        {
            var clrType = _typeMap.Resolve(s.EventType);
            return (IDomainEvent)JsonSerializer.Deserialize(s.PayloadJson, clrType)!;
        });

        var agg = new TAggregate();
        agg.LoadFromHistory(events);
        return agg;
    }

    public async Task SaveAsync(string streamId, TAggregate agg, CancellationToken ct = default)
    {
        var expected = agg.Version - agg.UncommittedEvents.Count; // version före nya events
        await _store.AppendAsync(streamId, expected, agg.UncommittedEvents, ct);
        agg.ClearUncommitted();
    }
}

Tester som dokumentation

Event-sourced aggregat testas med ett Given–When–Then-mönster som läser som en specifikation.

[Fact]
public void Withdraw_more_than_balance_throws()
{
    // Given
    var account = new Account();
    account.LoadFromHistory(new IDomainEvent[]
    {
        new AccountOpened(Guid.NewGuid(), DateTimeOffset.UtcNow,
                          new AccountId(Guid.NewGuid()), "SEK"),
        new MoneyDeposited(Guid.NewGuid(), DateTimeOffset.UtcNow,
                           account.Id, 100m, "SEK", "init")
    });

    // When + Then
    Assert.Throws<InsufficientFundsException>(() =>
        account.Withdraw(new Money(500m, "SEK"), "rent"));
}

Eventversionering — upcasting

Events lever för alltid. När fältet Reference läggs till på MoneyDeposited efter ett år finns redan miljontals gamla events utan fältet. Lösningen är upcasting: vid läsning konverteras äldre versioner till den senaste formen.

public interface IUpcaster
{
    string FromType { get; }
    int FromVersion { get; }
    (string Type, int Version, string Json) Upcast(string json);
}

public sealed class MoneyDepositedV1ToV2 : IUpcaster
{
    public string FromType => "MoneyDepositedV1";
    public int FromVersion => 1;
    public (string, int, string) Upcast(string json)
    {
        using var doc = JsonDocument.Parse(json);
        var root = doc.RootElement;
        var v2 = new
        {
            EventId = root.GetProperty("EventId").GetGuid(),
            OccurredAt = root.GetProperty("OccurredAt").GetDateTimeOffset(),
            Account = root.GetProperty("Account"),
            Amount = root.GetProperty("Amount").GetDecimal(),
            Currency = root.GetProperty("Currency").GetString(),
            Reference = "(legacy)"     // nytt fält, default
        };
        return ("MoneyDeposited", 2, JsonSerializer.Serialize(v2));
    }
}
Ta aldrig bort eller redigera ett event När ett event är committat är det historia. Behöver du "ta tillbaka" en händelse — emittera ett compensating event (t.ex. MoneyDepositReverted). Detta är grunden för "ångra" i lektion 8.

Fördelar och kostnader

FördelarKostnader
Komplett audit log gratisEventversionering över tid kräver disciplin
Tidsresor (state vid valfri tidpunkt)Read models måste byggas separat
Nya projektioner från gammal historikEventual consistency mellan skriv och läs
Lätt att felsöka — varje förändring är en händelseSnapshots behövs för långa strömmar

Sammanfattning

Referenser

Elektroniska resurser

Böcker

Övningar

Lös övningarna självständigt. Det finns inget facit — lärandet sker i processen.

  1. Implementera InMemoryEventStore Skriv klassen från scratch utan att kopiera. Verifiera med tre tester: (1) append + load returnerar samma events, (2) fel expectedVersion kastar WrongExpectedVersionException, (3) global position är monotont stigande över flera strömmar.
  2. Rehydrering Bygg ett Order-aggregat (events: OrderPlaced, OrderLineAdded, OrderShipped). Spara fem events, ladda en ny instans från strömmen, verifiera att totalsumman stämmer.
  3. Optimistic concurrency Ladda samma aggregat två gånger i två trådar, mutera båda, spara båda. Verifiera att den andra sparningen kastar WrongExpectedVersionException.
  4. Upcaster Skapa OrderPlacedV1 utan Currency och OrderPlacedV2 med Currency. Skriv en upcaster och bevisa med ett test att en gammal payload kan laddas som V2.

Soloprojektor

Projekt 1 — Mini-event-store med fil-persistens Utöka InMemoryEventStore så att den även persisterar till en append-only fil (en JSON per rad). Vid uppstart läser den in filen. Skriv en CLI som tar kommandon append, load, tail. 10+ tester.
Projekt 2 — Tidsresa (fördjupning) Implementera LoadAsync(streamId, asOf: DateTimeOffset) som returnerar aggregatet som det såg ut vid en viss tidpunkt. Bygg ett UI (console eller Blazor) där användaren skjuter en tidslinje-slider och ser tillståndet ändras.

← Föregående: CQRS & Commands Nästa: SqlStreamStore & projektioner →