Appending events
The recipe for writing one or more events to an aggregate.
One event
using System;
using System.Text;
using Celeriant.Client;

await using var pool = new CeleriantPool(new CeleriantPoolOptions
{
    Address = "localhost:10000",
});

// orgId, ordersType, and orderId are values supplied by your application.
var key = new AggregateKey(orgId, ordersType, orderId);

await pool.WriteAsync(
    key,
    events: [new AggregateEvent
    {
        ClientSeq = 1,
        EventTypeMajor = 1,
        EventTypeMinor = 0,
        EventTimestamp = DateTimeOffset.UtcNow,
        EventValue = Encoding.UTF8.GetBytes("""{ "sku": "A-1", "qty": 2 }"""),
    }],
    allowCreate: true);
You set the client-side fields: ClientSeq (your per-writer sequence, used for idempotency), the event type, the timestamp, and the payload bytes. EventValue is opaque to the server; you choose the encoding. The server assigns the server-side index on write.
In real code you usually build events from typed payloads with the serializer helper instead of hand-packing bytes:
var evt = AggregateEventExtensions.Create(
    1,                            // event type major
    new OrderPlaced("A-1", 2),    // typed payload, serialized for you
    JsonEventSerializer.Default,  // an IEventSerializer
    clientSeq: 1);
The explicit object-initializer form is shown above so the fields are visible; the bytes are opaque to the server either way.
allowCreate: true creates the aggregate on its first event. Leave it false to require that the aggregate already exists.
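Because the payload is opaque to the server, any encoding that round-trips on the client will do. The following is a minimal sketch of one option, using System.Text.Json from the .NET base library. The OrderPlaced record and the PayloadCodec helper are hypothetical names introduced here for illustration; they are not part of Celeriant.Client, and this is not how the serializer helper is implemented.

```csharp
using System;
using System.Text.Json;

// Hypothetical payload type for illustration; not part of Celeriant.Client.
public record OrderPlaced(string Sku, int Qty);

// Hypothetical helper: turns a typed payload into bytes for EventValue and back.
public static class PayloadCodec
{
    // Web defaults give camelCase property names ("sku", "qty")
    // and case-insensitive matching when reading.
    private static readonly JsonSerializerOptions Options =
        new(JsonSerializerDefaults.Web);

    // Serialize the payload to UTF-8 JSON bytes.
    public static byte[] Encode(OrderPlaced payload) =>
        JsonSerializer.SerializeToUtf8Bytes(payload, Options);

    // Decode bytes read back from an event into the typed payload.
    public static OrderPlaced Decode(byte[] bytes) =>
        JsonSerializer.Deserialize<OrderPlaced>(bytes, Options)
        ?? throw new InvalidOperationException("Payload decoded to null.");
}
```

With a helper like this, EventValue = PayloadCodec.Encode(new OrderPlaced("A-1", 2)) would replace the hand-written JSON string, and the reader side would call PayloadCodec.Decode on the same bytes.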
Several events at once
Pass more than one event; they land together as one batch, in order, and the batch is one durable unit:
var now = DateTimeOffset.UtcNow;

await pool.WriteAsync(key,
    events:
    [
        new AggregateEvent { ClientSeq = 1, EventTypeMajor = 1, EventTimestamp = now, EventValue = Encoding.UTF8.GetBytes("""{ "event": "created" }""") },
        new AggregateEvent { ClientSeq = 2, EventTypeMajor = 2, EventTimestamp = now, EventValue = Encoding.UTF8.GetBytes("""{ "event": "lineAdded" }""") },
    ],
    allowCreate: true);
Make it conditional, make it idempotent
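The WriteAsync convenience overload takes the parameters that matter for correctness: expectedVersion for optimistic concurrency control (OCC), and clientId together with enforceClientIdempotency for idempotency: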
await pool.WriteAsync(key, events,
    clientId: writerId,              // stable per writer; the idempotency key
    expectedVersion: 4,              // OCC: commit only if the aggregate is still at version 4
    enforceClientIdempotency: true);
For why and how, see Handling concurrency conflicts and Implementing idempotent writes.
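To build intuition for how the two checks interact, here is a toy in-memory model. It is a sketch under stated assumptions, not Celeriant's implementation: ToyAggregate and WriteOutcome are hypothetical names, the version is assumed to equal the number of committed events, and a repeated ClientSeq from the same writer is assumed to be reported as a duplicate rather than applied again. The linked pages describe the real server behavior.

```csharp
using System;
using System.Collections.Generic;

// Toy model only; these types are hypothetical and not part of Celeriant.Client.
public enum WriteOutcome { Committed, Duplicate, VersionConflict }

public sealed class ToyAggregate
{
    private readonly List<string> _events = new();

    // Highest ClientSeq accepted so far, per writer (clientId).
    private readonly Dictionary<Guid, long> _lastSeqByClient = new();

    // Assumption: version is modeled as the count of committed events.
    public long Version => _events.Count;

    public WriteOutcome Write(Guid clientId, long firstClientSeq,
                              IReadOnlyList<string> batch, long? expectedVersion)
    {
        // Idempotency check: a sequence this writer already committed is not applied twice.
        if (_lastSeqByClient.TryGetValue(clientId, out var last) && firstClientSeq <= last)
            return WriteOutcome.Duplicate;

        // Optimistic concurrency check: refuse if the aggregate has moved on.
        if (expectedVersion is long v && v != Version)
            return WriteOutcome.VersionConflict;

        // The batch is applied as a whole, in order.
        _events.AddRange(batch);
        _lastSeqByClient[clientId] = firstClientSeq + batch.Count - 1;
        return WriteOutcome.Committed;
    }
}
```

In this model the idempotency check runs first, so a retry of a write that already committed comes back as Duplicate instead of VersionConflict, even though the version has advanced past the retry's expectedVersion. A different writer using a stale expectedVersion gets VersionConflict and the aggregate is left unchanged.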