Vi lagrar inte tillstånd — vi lagrar historiken. I denna lektion bygger vi en minimal in-memory event store från scratch och rehydrerar aggregat ur strömmar.
Lagra varje förändring som en oföränderlig händelse i en append-only logg. Tillståndet räknas fram genom att spela upp händelserna i ordning.
| Aspekt | Command | Event |
|---|---|---|
| Tempus | Imperativ (DepositMoney) | Perfekt (MoneyDeposited) |
| Kan avvisas? | Ja — bryter den invariant? | Nej — har redan hänt |
| Mottagare | Ett aggregat | Noll till många lyssnare |
| Genererar | 0..N events | — |
| Persisteras? | Eventuellt (idempotency-store) | Alltid (event store) |
account-1f2c….Eventet är ett faktum. Det ska vara självförklarande över tid (10 år från nu ska någon kunna läsa det och förstå vad som hände).
public interface IDomainEvent
{
Guid EventId { get; }
DateTimeOffset OccurredAt { get; }
}
public sealed record MoneyDeposited(
Guid EventId,
DateTimeOffset OccurredAt,
AccountId Account,
decimal Amount,
string Currency,
string Reference) : IDomainEvent;
record).SchemaVersion som metadata.Innan vi tar in SqlStreamStore i nästa lektion bygger vi mönstret i ren C# så att alla delar är genomskinliga.
public sealed record StoredEvent(
string StreamId,
int Version,
string EventType,
string PayloadJson,
string MetadataJson,
long GlobalPosition,
DateTimeOffset CreatedAt);
public sealed class WrongExpectedVersionException : Exception
{
public WrongExpectedVersionException(string streamId, int expected, int actual)
: base($"Stream '{streamId}' expected version {expected} but was {actual}.") { }
}
public interface IEventStore
{
Task AppendAsync(string streamId, int expectedVersion,
IReadOnlyList<IDomainEvent> events, CancellationToken ct = default);
Task<IReadOnlyList<StoredEvent>> LoadAsync(string streamId, CancellationToken ct = default);
}
public sealed class InMemoryEventStore : IEventStore
{
private readonly object _gate = new();
private readonly Dictionary<string, List<StoredEvent>> _streams = new();
private long _globalPosition;
public Task AppendAsync(string streamId, int expectedVersion,
IReadOnlyList<IDomainEvent> events, CancellationToken ct = default)
{
lock (_gate)
{
if (!_streams.TryGetValue(streamId, out var stream))
stream = _streams[streamId] = new List<StoredEvent>();
var currentVersion = stream.Count - 1; // -1 if empty
if (currentVersion != expectedVersion)
throw new WrongExpectedVersionException(streamId, expectedVersion, currentVersion);
var nextVersion = currentVersion + 1;
foreach (var e in events)
{
stream.Add(new StoredEvent(
StreamId: streamId,
Version: nextVersion++,
EventType: e.GetType().FullName!,
PayloadJson: JsonSerializer.Serialize(e, e.GetType()),
MetadataJson: "{}",
GlobalPosition: Interlocked.Increment(ref _globalPosition),
CreatedAt: DateTimeOffset.UtcNow));
}
}
return Task.CompletedTask;
}
public Task<IReadOnlyList<StoredEvent>> LoadAsync(string streamId, CancellationToken ct = default)
{
lock (_gate)
{
if (!_streams.TryGetValue(streamId, out var stream))
return Task.FromResult<IReadOnlyList<StoredEvent>>(Array.Empty<StoredEvent>());
return Task.FromResult<IReadOnlyList<StoredEvent>>(stream.ToArray());
}
}
}
InMemoryEventStore är bara för att förstå mönstret och för enhetstester. Den förlorar all data vid omstart, hanterar inte konkurrent skrivning över processer, och har ingen prenumerationsmekanism. I lektion 6 byts den ut mot SqlStreamStore.
Ett event-sourced aggregat exponerar två saker till repository:t — möjligheten att tillämpa ett historiskt event utan att producera nya, och en lista över obekräftade events.
public abstract class AggregateRoot<TId>
{
private readonly List<IDomainEvent> _uncommitted = new();
public TId Id { get; protected set; } = default!;
public int Version { get; private set; } = -1; // -1 = ny ström
public IReadOnlyList<IDomainEvent> UncommittedEvents => _uncommitted;
public void ClearUncommitted() => _uncommitted.Clear();
protected void Raise(IDomainEvent @event)
{
Apply(@event);
_uncommitted.Add(@event);
}
public void LoadFromHistory(IEnumerable<IDomainEvent> history)
{
foreach (var e in history)
{
Apply(e);
Version++;
}
}
protected abstract void Apply(IDomainEvent @event);
}
public sealed class EventSourcedRepository<TAggregate, TId>
where TAggregate : AggregateRoot<TId>, new()
{
private readonly IEventStore _store;
private readonly IEventTypeMap _typeMap; // mappar EventType -> CLR-typ
public EventSourcedRepository(IEventStore store, IEventTypeMap typeMap)
=> (_store, _typeMap) = (store, typeMap);
public async Task<TAggregate?> LoadAsync(string streamId, CancellationToken ct = default)
{
var stored = await _store.LoadAsync(streamId, ct);
if (stored.Count == 0) return null;
var events = stored.Select(s =>
{
var clrType = _typeMap.Resolve(s.EventType);
return (IDomainEvent)JsonSerializer.Deserialize(s.PayloadJson, clrType)!;
});
var agg = new TAggregate();
agg.LoadFromHistory(events);
return agg;
}
public async Task SaveAsync(string streamId, TAggregate agg, CancellationToken ct = default)
{
var expected = agg.Version - agg.UncommittedEvents.Count; // version före nya events
await _store.AppendAsync(streamId, expected, agg.UncommittedEvents, ct);
agg.ClearUncommitted();
}
}
Event-sourced aggregat testas med ett Given–When–Then-mönster som läser som en specifikation.
[Fact]
public void Withdraw_more_than_balance_throws()
{
// Given
var account = new Account();
account.LoadFromHistory(new IDomainEvent[]
{
new AccountOpened(Guid.NewGuid(), DateTimeOffset.UtcNow,
new AccountId(Guid.NewGuid()), "SEK"),
new MoneyDeposited(Guid.NewGuid(), DateTimeOffset.UtcNow,
account.Id, 100m, "SEK", "init")
});
// When + Then
Assert.Throws<InsufficientFundsException>(() =>
account.Withdraw(new Money(500m, "SEK"), "rent"));
}
Events lever för alltid. När fältet Reference läggs till på MoneyDeposited efter ett år finns redan miljontals gamla events utan fältet. Lösningen är upcasting: vid läsning konverteras äldre versioner till den senaste formen.
public interface IUpcaster
{
string FromType { get; }
int FromVersion { get; }
(string Type, int Version, string Json) Upcast(string json);
}
public sealed class MoneyDepositedV1ToV2 : IUpcaster
{
public string FromType => "MoneyDepositedV1";
public int FromVersion => 1;
public (string, int, string) Upcast(string json)
{
using var doc = JsonDocument.Parse(json);
var root = doc.RootElement;
var v2 = new
{
EventId = root.GetProperty("EventId").GetGuid(),
OccurredAt = root.GetProperty("OccurredAt").GetDateTimeOffset(),
Account = root.GetProperty("Account"),
Amount = root.GetProperty("Amount").GetDecimal(),
Currency = root.GetProperty("Currency").GetString(),
Reference = "(legacy)" // nytt fält, default
};
return ("MoneyDeposited", 2, JsonSerializer.Serialize(v2));
}
}
MoneyDepositReverted). Detta är grunden för "ångra" i lektion 8.
| Fördelar | Kostnader |
|---|---|
| Komplett audit log gratis | Eventversionering över tid kräver disciplin |
| Tidsresor (state vid valfri tidpunkt) | Read models måste byggas separat |
| Nya projektioner från gammal historik | Eventual consistency mellan skriv och läs |
| Lätt att felsöka — varje förändring är en händelse | Snapshots behövs för långa strömmar |
Apply över historiken.InMemoryEventStore är ett perfekt pedagogiskt verktyg och testdouble.Lös övningarna självständigt. Det finns inget facit — lärandet sker i processen.
expectedVersion kastar WrongExpectedVersionException, (3) global position är monotont stigande över flera strömmar.Order-aggregat (events: OrderPlaced, OrderLineAdded, OrderShipped). Spara fem events, ladda en ny instans från strömmen, verifiera att totalsumman stämmer.WrongExpectedVersionException.OrderPlacedV1 utan Currency och OrderPlacedV2 med Currency. Skriv en upcaster och bevisa med ett test att en gammal payload kan laddas som V2.InMemoryEventStore så att den även persisterar till en append-only fil (en JSON per rad). Vid uppstart läser den in filen. Skriv en CLI som tar kommandon append, load, tail. 10+ tester.
LoadAsync(streamId, asOf: DateTimeOffset) som returnerar aggregatet som det såg ut vid en viss tidpunkt. Bygg ett UI (console eller Blazor) där användaren skjuter en tidslinje-slider och ser tillståndet ändras.
← Föregående: CQRS & Commands Nästa: SqlStreamStore & projektioner →