JSON Event Sourcing: Schema Design, Event Store & Projections
Last updated:
Event sourcing stores the full sequence of state-change events rather than the current state of a record. The current state is derived by replaying all events in order, or by loading a snapshot and replaying only the events recorded after it. JSON is the dominant format for events because it is human-readable during incident response, flexible enough for additive schema evolution, and supported by virtually every mainstream language, database, and streaming platform without an extra serialization library. The tradeoff is size: a JSON event is typically 200–500 bytes, versus roughly 50–150 bytes for binary formats like Avro or Protobuf. For most systems, JSON's operational simplicity wins. This guide covers the JSON-specific details that most event sourcing articles skip: event envelope design, append-only storage with PostgreSQL JSONB, schema evolution without breaking historical replays, building projections with the reduce pattern, snapshot optimization, TypeScript discriminated unions for type-safe event handling, and the full CQRS separation between the command and read sides.
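To see where your own events fall in that size range, measure the UTF-8 byte length of the serialized JSON rather than the JavaScript string length, which counts UTF-16 code units. The following is a minimal sketch; `jsonByteSize` and the sample event are illustrative names, not part of any library.

```typescript
// Returns the UTF-8 byte length of a value serialized as JSON.
function jsonByteSize(value: unknown, pretty = false): number {
  const text = pretty ? JSON.stringify(value, null, 2) : JSON.stringify(value);
  return new TextEncoder().encode(text).length;
}

// Illustrative event; in practice, sample real events from your store.
const sampleEvent = {
  type: 'OrderPlaced',
  eventId: 'evt_01HXYZ1234567890',
  aggregateId: 'order_abc123',
  sequence: 1,
  version: 1,
  occurredAt: '2026-01-28T14:32:00.000Z',
  payload: { customerId: 'cust_42', totalCents: 3998, currencyCode: 'USD' },
};

const compactBytes = jsonByteSize(sampleEvent);
const prettyBytes = jsonByteSize(sampleEvent, true);
console.log(`compact: ${compactBytes} bytes, pretty-printed: ${prettyBytes} bytes`);
```

Non-ASCII text counts for more than one byte each (for example, "é" is two bytes in UTF-8), so string length alone underestimates size for international data.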
JSON Event Schema Design
Every event consists of two parts: the envelope (fields common to all events) and the payload (fields specific to this event type). The envelope must be stable, because projection code and infrastructure read these fields without knowing the event type. The payload is event-specific and can evolve independently. Use a type field with a consistent naming convention as the discriminator for routing events to handlers: PascalCase, a noun followed by a past-tense verb, such as OrderPlaced or PaymentFailed. Include a version field in the envelope to enable upcasting when the payload schema changes.
// ── Event envelope: fields common to all events ──────────────────────
interface EventEnvelope {
// Discriminator — used to route to the correct handler
type: string;
// Globally unique event identifier (e.g. a prefixed UUID or ULID)
eventId: string;
// Which aggregate this event belongs to
aggregateId: string;
aggregateType: string; // e.g. "Order", "Account"
// Position within the aggregate stream (starts at 1)
sequence: number;
// Schema version of the payload — increment when payload shape changes
version: number;
// When the event occurred (ISO 8601, always UTC)
occurredAt: string;
// Tracing: correlationId links events from the same user action
correlationId: string;
causationId?: string; // id of the message (command or event) that directly caused this event
}
// ── Example events with typed payloads ───────────────────────────────
interface OrderPlacedEvent extends EventEnvelope {
type: 'OrderPlaced';
version: 1;
payload: {
customerId: string;
items: Array<{
productId: string;
name: string;
quantity: number;
unitPriceCents: number; // integer cents, not float — avoids precision loss
}>;
totalCents: number;
currencyCode: string; // ISO 4217: "USD", "EUR"
placedAt: string; // ISO 8601
};
}
interface OrderShippedEvent extends EventEnvelope {
type: 'OrderShipped';
version: 1;
payload: {
trackingNumber: string;
carrier: string;
shippedAt: string; // ISO 8601
estimatedDelivery: string; // ISO 8601 date: "2026-02-03"
};
}
interface OrderCancelledEvent extends EventEnvelope {
type: 'OrderCancelled';
version: 1;
payload: {
reason: 'customer_request' | 'payment_failed' | 'out_of_stock' | 'fraud';
cancelledAt: string;
refundAmountCents?: number; // optional: not all cancellations trigger refunds
};
}
// ── Naming conventions ────────────────────────────────────────────────
// Good (past tense, business action, noun-verb):
// OrderPlaced, PaymentFailed, InventoryReserved, UserEmailVerified
// Avoid (CRUD-style or imperative names that describe storage operations, not business facts):
// OrderCreated, UpdateOrder, SetStatus, DbWrite
// ── Serialized JSON on the wire / in storage ─────────────────────────
const event: OrderPlacedEvent = {
type: 'OrderPlaced',
eventId: 'evt_01HXYZ1234567890',
aggregateId: 'order_abc123',
aggregateType: 'Order',
sequence: 1,
version: 1,
occurredAt: '2026-01-28T14:32:00.000Z',
correlationId: 'corr_user42_session9',
payload: {
customerId: 'cust_42',
items: [
{ productId: 'prod_widget', name: 'Widget Pro', quantity: 2, unitPriceCents: 1999 },
],
totalCents: 3998,
currencyCode: 'USD',
placedAt: '2026-01-28T14:32:00.000Z',
},
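};
Use integer cents for monetary values. Most decimal fractions have no exact binary floating-point representation, so arithmetic on decimal amounts drifts (0.1 + 0.2 evaluates to 0.30000000000000004, not 0.3), and parsers in other languages may map a JSON number to a different numeric type. Use ISO 8601 strings for all timestamps: they carry the timezone explicitly, and when every timestamp is in UTC with the same format and precision, they sort lexicographically in chronological order. The sequence field is the position of this event within its aggregate stream; it is used for optimistic concurrency checks and ordered replay. For JSON Schema patterns for validating event envelopes, see the linked guide.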
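Both claims above can be checked directly. This sketch assumes a currency with two minor-unit digits (such as USD or EUR); `totalCents` and `formatCents` are hypothetical helper names, and conversion to a decimal string happens only at the display edge.

```typescript
// Binary floating point cannot represent most decimal fractions exactly.
const floatSum = 0.1 + 0.2; // 0.30000000000000004 rather than 0.3
const centsSum = 10 + 20;   // 30: integer arithmetic is exact for integers up to 2^53

interface LineItem { quantity: number; unitPriceCents: number }

// Sum line items in integer cents, rejecting non-integers so a float never reaches an event payload.
function totalCents(items: LineItem[]): number {
  return items.reduce((sum, item) => {
    if (!Number.isInteger(item.quantity) || !Number.isInteger(item.unitPriceCents)) {
      throw new Error('quantity and unitPriceCents must be integers');
    }
    return sum + item.quantity * item.unitPriceCents;
  }, 0);
}

// Convert to a decimal string only for display (assumes 2 minor-unit digits).
function formatCents(cents: number): string {
  const sign = cents < 0 ? '-' : '';
  const abs = Math.abs(cents);
  return `${sign}${Math.floor(abs / 100)}.${String(abs % 100).padStart(2, '0')}`;
}

// UTC timestamps in one fixed ISO 8601 format sort chronologically as plain strings.
// Mixing offsets (e.g. +02:00) or varying fractional-second precision breaks this.
const timestamps = [
  '2026-01-28T14:32:00.000Z',
  '2026-01-27T09:05:00.000Z',
  '2026-01-28T08:00:00.000Z',
];
const sorted = [...timestamps].sort();
```

`totalCents([{ quantity: 2, unitPriceCents: 1999 }])` yields 3998, matching the totalCents in the example event, and `formatCents(3998)` yields "39.98".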
Storing Events: PostgreSQL vs EventStoreDB vs Kafka
Event storage requires an append-only log indexed by aggregate stream. The three main options are PostgreSQL (with JSONB), EventStoreDB (purpose-built), and Kafka (log-based streaming). PostgreSQL is the pragmatic starting point: your team probably already operates it, JSONB stores and indexes the payloads, and a unique constraint on (stream_id, sequence) provides optimistic concurrency. EventStoreDB adds native stream subscriptions and persistent projections. Kafka scales horizontally for high-throughput event streams, but it guarantees ordering only within a partition (so per-aggregate ordering requires keying records by aggregate ID), and it has no built-in expected-version check on append, so optimistic concurrency has to be enforced elsewhere.
-- ── PostgreSQL append-only event store ───────────────────────────────
CREATE TABLE events (
id bigserial PRIMARY KEY, -- global ordering
stream_id text NOT NULL, -- aggregateId
sequence integer NOT NULL, -- position within stream
event_type varchar(100) NOT NULL,
event_version smallint NOT NULL DEFAULT 1,
payload jsonb NOT NULL,
metadata jsonb NOT NULL DEFAULT '{}', -- correlationId, userId, etc.
occurred_at timestamptz NOT NULL DEFAULT now()
);
-- Optimistic concurrency: INSERT fails if (stream_id, sequence) already exists
CREATE UNIQUE INDEX idx_events_stream_seq ON events (stream_id, sequence);
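-- The unique index above also serves per-aggregate replay (WHERE stream_id = ... ORDER BY sequence),
-- so a second index on (stream_id, sequence) would be redundant
-- Global position scan for projection catchup: the bigserial primary key is already indexed.
-- Caveat: ids follow insert order, not commit order, so with concurrent writers a reader can
-- see a higher id before a lower one commits, and "WHERE id > last_id" may skip the lower one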
-- GIN index on payload for cross-stream queries (optional)
CREATE INDEX idx_events_payload_gin ON events USING GIN (payload);
-- ── Append an event (with optimistic concurrency check) ───────────────
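-- Plain INSERT: if another writer already stored this (stream_id, sequence), it fails with
-- unique_violation (SQLSTATE 23505), which the application reports as a concurrency conflict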
INSERT INTO events (stream_id, sequence, event_type, event_version, payload, metadata)
VALUES (
'order_abc123',
1,
'OrderPlaced',
1,
'{"customerId":"cust_42","totalCents":3998,"currencyCode":"USD"}',
'{"correlationId":"corr_user42_session9","userId":"user_42"}'
);
-- If another process wrote sequence=1 first, this fails with unique violation
-- ── Load events for a single aggregate (replay) ───────────────────────
SELECT id, sequence, event_type, event_version, payload, metadata, occurred_at
FROM events
WHERE stream_id = 'order_abc123'
ORDER BY sequence ASC;
-- ── Load events after a global position (projection catchup) ─────────
SELECT id, stream_id, sequence, event_type, payload, occurred_at
FROM events
WHERE id > $1 -- $1 = last processed global id
ORDER BY id ASC
LIMIT 500;
-- ── Cross-stream query: OrderPlaced events paid in EUR (GIN-indexed) ─
SELECT stream_id, payload, occurred_at
FROM events
WHERE event_type = 'OrderPlaced'
AND payload @> '{"currencyCode":"EUR"}';
// ── EventStoreDB: append with optimistic concurrency ─────────────────
// Using @eventstore/db-client (Node.js)
import { EventStoreDBClient, jsonEvent, NO_STREAM } from '@eventstore/db-client';
const client = EventStoreDBClient.connectionString('esdb://localhost:2113?tls=false');
const event = jsonEvent({
type: 'OrderPlaced',
data: { customerId: 'cust_42', totalCents: 3998, currencyCode: 'USD' },
metadata: { correlationId: 'corr_user42_session9' },
});
// EventStoreDB stream revisions are 0-based (the first event is revision 0).
// NO_STREAM: the stream must not exist yet (correct for a first event such as OrderPlaced)
// BigInt(n): the stream's last event must be at revision n (use for later appends)
// STREAM_EXISTS / ANY: weaker checks that do not detect a concurrent append
// Throws WrongExpectedVersionError on conflict: reload the stream and retry the command
await client.appendToStream('order-abc123', [event], {
expectedRevision: NO_STREAM,
});
Start with PostgreSQL unless you need native stream subscriptions or sustained write throughput above roughly 5,000 events/second per node (a rough, workload-dependent guide; benchmark with your own payload sizes and hardware). The unique constraint on (stream_id, sequence) gives you optimistic concurrency without a separate version check query. For Kafka, use the aggregateId as the record key so every event for an aggregate lands in the same partition; without this, events from the same aggregate can be consumed out of order across partitions. For PostgreSQL JSONB indexing in depth, see the linked guide.
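To make the (stream_id, sequence) rule concrete without a database, here is an in-memory sketch of the same optimistic concurrency check, plus a bounded load-decide-append retry loop. `InMemoryEventStore`, `ConcurrencyError`, and `appendWithRetry` are hypothetical names; in PostgreSQL the conflict would surface as a unique_violation from the INSERT instead.

```typescript
interface NewEvent { type: string; payload: unknown }
interface StoredRecord extends NewEvent { streamId: string; sequence: number }

class ConcurrencyError extends Error {
  constructor(streamId: string, expected: number, actual: number) {
    super(`stream ${streamId}: expected last sequence ${expected}, found ${actual}`);
    this.name = 'ConcurrencyError';
    Object.setPrototypeOf(this, ConcurrencyError.prototype); // keeps instanceof working on ES5 targets
  }
}

class InMemoryEventStore {
  private streams = new Map<string, StoredRecord[]>();

  read(streamId: string): StoredRecord[] {
    return [...(this.streams.get(streamId) ?? [])];
  }

  // Appends only if the stream's last sequence equals expectedSequence (0 = stream must be new).
  // Sequences start at 1 with no gaps, mirroring UNIQUE (stream_id, sequence).
  append(streamId: string, expectedSequence: number, events: NewEvent[]): number {
    const stream = this.streams.get(streamId) ?? [];
    const actual = stream.length;
    if (actual !== expectedSequence) throw new ConcurrencyError(streamId, expectedSequence, actual);
    events.forEach((e, i) => {
      stream.push({ ...e, streamId, sequence: actual + i + 1 });
    });
    this.streams.set(streamId, stream);
    return stream.length; // new last sequence
  }
}

// Load the history, decide which events to emit, append; on a conflict, reload and decide again.
function appendWithRetry(
  store: InMemoryEventStore,
  streamId: string,
  decide: (history: StoredRecord[]) => NewEvent[],
  maxAttempts = 3,
): number {
  for (let attempt = 1; ; attempt++) {
    const history = store.read(streamId);
    try {
      return store.append(streamId, history.length, decide(history));
    } catch (err) {
      if (!(err instanceof ConcurrencyError) || attempt >= maxAttempts) throw err;
    }
  }
}
```

Re-running `decide` on the fresh history matters: a command that was valid against the old state (for example, shipping an order) may be invalid after a concurrent OrderCancelled.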
Schema Evolution Without Breaking Replays
The immutability of the event log is both its greatest strength and its most demanding constraint: you cannot modify stored events when the business domain evolves. Schema evolution in event sourcing is most commonly handled with the upcasting pattern, a transformation layer that converts older event shapes to the current shape before projection handlers receive them. The guiding principle is additive-only changes: add optional fields freely; introduce a new event version for any breaking change.
// ── Additive change: safe — no upcaster needed ────────────────────────
// Original payload (v1 events in the store — never modified)
type OrderPlacedPayload_v1 = {
customerId: string;
totalCents: number;
};
// Add optional field: v1 events simply won't have it, so the projection supplies a default
type OrderPlacedPayload_v2 = {
customerId: string;
totalCents: number;
regionCode?: string; // NEW optional field — safe to add
};
// Projection handles missing field with a default:
function applyOrderPlaced(state: OrderState, event: { payload: OrderPlacedPayload_v2 }) {
return {
...state,
customerId: event.payload.customerId,
totalCents: event.payload.totalCents,
regionCode: event.payload.regionCode ?? 'UNKNOWN', // default for v1 events
};
}
// ── Breaking change: requires a new version + upcaster ────────────────
// Breaking change (a separate example from regionCode above): split fullName into firstName + lastName
// v1 stored events (never touched):
type OrderPlacedPayload_v1_broken = {
customerId: string;
fullName: string; // original field
};
// v2 new schema:
type OrderPlacedPayload_v2_current = {
customerId: string;
firstName: string; // split from fullName
lastName: string;
};
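// Stored row shape as loaded from the events table (columns from the schema above)
interface StoredEvent {
id: number;
stream_id: string;
sequence: number;
event_type: string;
event_version: number;
payload: unknown;
metadata?: Record<string, unknown>;
occurred_at: string;
}
// Upcaster: transforms a stored v1 row into the v2 shape on read.
// Envelope columns are preserved; only event_version and payload change.
function upcasterOrderPlaced_v1_to_v2(event: StoredEvent): StoredEvent {
const v1 = event.payload as OrderPlacedPayload_v1_broken;
const [firstName = '', ...rest] = v1.fullName.trim().split(/\s+/);
const v2: OrderPlacedPayload_v2_current = {
customerId: v1.customerId,
firstName,
lastName: rest.join(' '),
};
return { ...event, event_version: 2, payload: v2 };
}
// ── Upcaster pipeline: runs transparently in the event loading layer ──
type UpcasterFn = (event: StoredEvent) => StoredEvent;
// Keyed by the version each upcaster converts FROM
const upcasters: Record<string, Record<number, UpcasterFn>> = {
OrderPlaced: {
1: upcasterOrderPlaced_v1_to_v2, // v1 -> v2
// 2: upcasterOrderPlaced_v2_to_v3, // chain for further versions
},
};
function applyUpcasters(event: StoredEvent): StoredEvent {
const chain = upcasters[event.event_type];
if (!chain) return event;
let current = event;
while (chain[current.event_version]) {
const next = chain[current.event_version](current);
// Guard against an upcaster that forgets to bump the version (would loop forever)
if (next.event_version <= current.event_version) {
throw new Error(`Upcaster for ${event.event_type} v${current.event_version} did not advance the version`);
}
current = next;
}
return current;
}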
// ── Compatibility rules for stored events ─────────────────────────────
// SAFE: Add an optional field (set default in projection)
// SAFE: Add a new event type (consumers must skip types they do not recognize)
// SAFE: Narrow a field's enum values with an upcaster
// UNSAFE: Remove a field (breaks all projections reading it)
// UNSAFE: Rename a field without an upcaster
// UNSAFE: Change a field's type (number -> string) without an upcaster
// UNSAFE: Make an optional field required (breaks old events without it)
Store upcasters as versioned pure functions in a dedicated module. Every time you load events for replay, run them through the upcaster pipeline before passing them to projection handlers. This keeps projection code clean: handlers always receive the current schema regardless of what version is stored. Maintain a schema registry (a JSON Schema document per event type per version) so new team members can understand the evolution history without reading git blame. For JSON Schema versioning strategies, see the linked guide.
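A related safeguard is to check at startup that every event type's upcaster chain has no gaps, so replay can never hand an old version straight to a handler. The sketch below assumes a per-type table of current versions and operates on payloads only, for brevity; `CURRENT_VERSIONS`, `assertUpcasterChains`, `upcast`, and the v2 to v3 rename of totalCents to grandTotalCents are all hypothetical.

```typescript
interface RawEvent { type: string; version: number; payload: Record<string, unknown> }
type Upcaster = (payload: Record<string, unknown>) => Record<string, unknown>;

// Latest payload version each event type must reach before handlers see it.
const CURRENT_VERSIONS: Record<string, number> = { OrderPlaced: 3 };

// upcasters[type][n] converts a version-n payload into version n+1.
const upcasters: Record<string, Record<number, Upcaster>> = {
  OrderPlaced: {
    // v1 -> v2: split fullName into firstName / lastName
    1: (p) => {
      const { fullName, ...rest } = p;
      const [firstName = '', ...others] = String(fullName ?? '').trim().split(/\s+/);
      return { ...rest, firstName, lastName: others.join(' ') };
    },
    // v2 -> v3: hypothetical rename of totalCents to grandTotalCents
    2: (p) => {
      const { totalCents, ...rest } = p;
      return { ...rest, grandTotalCents: totalCents };
    },
  },
};

// Fail fast if any step between v1 and the current version lacks an upcaster.
function assertUpcasterChains(): void {
  for (const [type, current] of Object.entries(CURRENT_VERSIONS)) {
    for (let v = 1; v < current; v++) {
      if (!upcasters[type]?.[v]) throw new Error(`${type}: missing upcaster v${v} -> v${v + 1}`);
    }
  }
}

// Apply upcasters one step at a time until the event reaches its current version.
function upcast(event: RawEvent): RawEvent {
  const target = CURRENT_VERSIONS[event.type] ?? event.version;
  let version = event.version;
  let payload = event.payload;
  while (version < target) {
    const step = upcasters[event.type]?.[version];
    if (!step) throw new Error(`${event.type}: no upcaster from v${version}`);
    payload = step(payload);
    version += 1;
  }
  return { ...event, version, payload };
}

assertUpcasterChains(); // run once at startup, before any replay
```

Event types without an entry in `CURRENT_VERSIONS` pass through unchanged, which keeps the check opt-in per type.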
Building Projections from JSON Events
A projection is a pure reduce function over an event stream that produces a read model optimized for a specific query. The reduce pattern mirrors Array.prototype.reduce: an initial state and a function that applies each event in order. Projection handlers must be pure (no I/O, no side effects) so they can be replayed deterministically from any position. For CQRS read models, a separate writer subscribes to the event stream and persists projection output to a read-optimized database.
// ── In-memory reduce projection ───────────────────────────────────────
interface OrderState {
orderId: string;
status: 'pending' | 'confirmed' | 'shipped' | 'cancelled' | 'refunded';
customerId: string;
items: Array<{ productId: string; name: string; quantity: number; unitPriceCents: number }>;
totalCents: number;
trackingNumber?: string;
cancelReason?: string;
refundCents?: number; // set by OrderRefunded in the discriminated-union section
}
type OrderEvent = OrderPlacedEvent | OrderShippedEvent | OrderCancelledEvent;
function applyEvent(state: OrderState | null, event: OrderEvent): OrderState {
switch (event.type) {
case 'OrderPlaced':
return {
orderId: event.aggregateId,
status: 'pending',
customerId: event.payload.customerId,
items: event.payload.items,
totalCents: event.payload.totalCents,
};
case 'OrderShipped':
return {
...state!,
status: 'shipped',
trackingNumber: event.payload.trackingNumber,
};
case 'OrderCancelled':
return {
...state!,
status: 'cancelled',
cancelReason: event.payload.reason,
};
}
}
// Reduce all events to current state
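async function loadOrderAggregate(orderId: string): Promise<OrderState | null> {
// loadEvents is a placeholder: fetch the stream's rows, upcast them, and parse them into
// typed OrderEvent objects (a raw row has event_type, not type, so a plain cast would not work)
const events: OrderEvent[] = await loadEvents(orderId);
return events.reduce<OrderState | null>((state, event) => applyEvent(state, event), null);
}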
// ── CQRS read model: persist projection to a read table ───────────────
// PostgreSQL read table (denormalized for fast queries)
/*
CREATE TABLE order_summaries (
order_id text PRIMARY KEY,
customer_id text NOT NULL,
status text NOT NULL,
total_cents integer NOT NULL,
item_count integer NOT NULL,
tracking_num text,
last_updated timestamptz NOT NULL
);
*/
// Projection writer: receives typed events (already upcast and parsed) and upserts the read table
async function handleEventForProjection(typed: OrderEvent): Promise<void> {
switch (typed.type) {
case 'OrderPlaced':
await db.query(
`INSERT INTO order_summaries (order_id, customer_id, status, total_cents, item_count, last_updated)
VALUES ($1, $2, 'pending', $3, $4, now())
ON CONFLICT (order_id) DO UPDATE
SET status = 'pending', total_cents = $3, item_count = $4, last_updated = now()`,
[typed.aggregateId, typed.payload.customerId, typed.payload.totalCents, typed.payload.items.length]
);
break;
case 'OrderShipped':
await db.query(
`UPDATE order_summaries SET status = 'shipped', tracking_num = $2, last_updated = now()
WHERE order_id = $1`,
[typed.aggregateId, typed.payload.trackingNumber]
);
break;
case 'OrderCancelled':
await db.query(
`UPDATE order_summaries SET status = 'cancelled', last_updated = now()
WHERE order_id = $1`,
[typed.aggregateId]
);
break;
}
}
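// ── Snapshot every N events ───────────────────────────────────────────
// Assumes one snapshot row per stream, e.g.:
// CREATE TABLE snapshots (stream_id text PRIMARY KEY, sequence integer NOT NULL,
// state jsonb NOT NULL, taken_at timestamptz NOT NULL);
// (ON CONFLICT (stream_id) below needs that primary key or a unique index.)
// Call after each append with the sequence of the last event folded into state; a
// multi-event append can jump past a multiple of N, in which case no snapshot is taken.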
const SNAPSHOT_THRESHOLD = 100;
async function saveSnapshotIfNeeded(
aggregateId: string,
state: OrderState,
sequence: number
): Promise<void> {
if (sequence % SNAPSHOT_THRESHOLD === 0) {
await db.query(
`INSERT INTO snapshots (stream_id, sequence, state, taken_at)
VALUES ($1, $2, $3, now())
ON CONFLICT (stream_id) DO UPDATE
SET sequence = $2, state = $3, taken_at = now()`,
[aggregateId, sequence, JSON.stringify(state)]
);
}
}
The snapshot threshold of 100–500 events is a rule of thumb: measure the actual replay latency for your longest-lived aggregates and set the threshold so replay stays under your latency budget (typically under 100ms for user-facing reads). For JSON caching strategies for projection snapshots, see the linked guide.
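The key property of snapshots is that loading from a snapshot plus the later events must produce exactly the same state as a full replay. This self-contained sketch checks that property on a hypothetical cart aggregate (`CartState`, `ItemEvent`) standing in for OrderState; the JSON round trip imitates writing the snapshot to a jsonb column and reading it back.

```typescript
// Hypothetical cart aggregate standing in for OrderState.
interface ItemEvent { sequence: number; type: 'ItemAdded' | 'ItemRemoved'; quantity: number }
interface CartState { itemCount: number; lastSequence: number }
interface Snapshot { sequence: number; state: CartState }

const SNAPSHOT_EVERY = 100;
const initialCart: CartState = { itemCount: 0, lastSequence: 0 };

// Pure handler: (state, event) -> new state.
function applyCartEvent(state: CartState, e: ItemEvent): CartState {
  const delta = e.type === 'ItemAdded' ? e.quantity : -e.quantity;
  return { itemCount: state.itemCount + delta, lastSequence: e.sequence };
}

// Full replay that also records a snapshot whenever the sequence is a multiple of SNAPSHOT_EVERY.
function replayWithSnapshots(events: ItemEvent[]): { state: CartState; latest?: Snapshot } {
  let state = initialCart;
  let latest: Snapshot | undefined;
  for (const e of events) {
    state = applyCartEvent(state, e);
    if (e.sequence % SNAPSHOT_EVERY === 0) {
      // JSON round trip mimics storing the state in a jsonb column and reading it back
      latest = { sequence: e.sequence, state: JSON.parse(JSON.stringify(state)) as CartState };
    }
  }
  return { state, latest };
}

// Load = latest snapshot (if any) + only the events after its sequence.
function loadFromSnapshot(events: ItemEvent[], snap?: Snapshot): CartState {
  const start = snap?.sequence ?? 0;
  return events
    .filter(e => e.sequence > start)
    .reduce(applyCartEvent, snap?.state ?? initialCart);
}
```

Running this comparison in tests against real event streams is a cheap way to catch snapshot bugs, such as a field that is missing from the serialized state.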
Event Replay and Time Travel
One of the most powerful capabilities of event sourcing is the ability to replay the entire event history to rebuild any read model — or to reconstruct the state of any aggregate at any point in time (time travel). Replay is the mechanism for recovering from projection bugs, adding new read models, and auditing historical states. The event store must support efficient position-based scanning.
// ── Full replay from position 0 (rebuild a projection) ───────────────
async function rebuildProjection(projectionWriter: (event: StoredEvent) => Promise<void>) {
let lastId = 0;
const BATCH_SIZE = 500;
// Truncate the read model before rebuilding (stop the live subscriber first so it cannot write concurrently)
await db.query('TRUNCATE TABLE order_summaries');
while (true) {
const rows = await db.query<StoredEvent>(
`SELECT id, stream_id, sequence, event_type, event_version, payload, metadata, occurred_at
FROM events
WHERE id > $1
ORDER BY id ASC
LIMIT $2`,
[lastId, BATCH_SIZE]
);
if (rows.length === 0) break;
for (const event of rows) {
const upcasted = applyUpcasters(event); // transform old schemas
await projectionWriter(upcasted);
}
lastId = rows[rows.length - 1].id;
console.log(`Replayed up to global position ${lastId}`);
}
}
// ── Replay events for a single aggregate (load current state) ─────────
async function loadAggregateWithSnapshot(orderId: string): Promise<OrderState | null> {
// 1. Try to load the latest snapshot (db.query resolves to an array of rows)
const [snapshot] = await db.query<{ sequence: number; state: OrderState | string }>(
'SELECT sequence, state FROM snapshots WHERE stream_id = $1',
[orderId]
);
let startSequence = 0;
let state: OrderState | null = null;
if (snapshot) {
startSequence = snapshot.sequence + 1;
// A jsonb column usually arrives already parsed; a text column arrives as a string
state = typeof snapshot.state === 'string' ? (JSON.parse(snapshot.state) as OrderState) : snapshot.state;
}
// 2. Load only events after the snapshot sequence
const events = await db.query<StoredEvent>(
`SELECT id, stream_id, sequence, event_type, event_version, payload, occurred_at
FROM events
WHERE stream_id = $1 AND sequence >= $2
ORDER BY sequence ASC`,
[orderId, startSequence]
);
// 3. Reduce snapshot + delta events to current state
// (parseOrderEvent, from the discriminated-union section, maps snake_case rows to typed events)
return events.reduce(
(s: OrderState | null, ev) => applyEvent(s, parseOrderEvent(applyUpcasters(ev))),
state
);
}
// ── Time travel: state at a specific point in time ────────────────────
async function loadOrderAtTime(orderId: string, asOf: Date): Promise<OrderState | null> {
const events = await db.query<StoredEvent>(
`SELECT id, stream_id, sequence, event_type, event_version, payload, occurred_at
FROM events
WHERE stream_id = $1
AND occurred_at <= $2
ORDER BY sequence ASC`,
[orderId, asOf.toISOString()]
);
return events.reduce(
(s: OrderState | null, ev) => applyEvent(s, parseOrderEvent(applyUpcasters(ev))),
null
);
}
// ── Cross-stream query: orders cancelled for payment failure since a date ─
async function loadOrdersCancelledForPaymentFailure(since: Date): Promise<string[]> {
const rows = await db.query<{ stream_id: string }>(
`SELECT DISTINCT stream_id
FROM events
WHERE event_type = 'OrderCancelled'
AND occurred_at >= $1
AND payload @> '{"reason":"payment_failed"}'`, // GIN-indexed
[since.toISOString()]
);
return rows.map(r => r.stream_id);
}
Replay throughput depends on event store I/O and projection write latency. Batch reads in pages of 500–1,000 events and use transactions for atomic projection updates within each batch. For large rebuilds (millions of events), run in a background process and track the high-water mark in a checkpoint table so you can resume after interruption without replaying from scratch.
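The batching and resume behavior can be sketched without a database. Here an in-memory array stands in for the events table and `MemoryCheckpoints` for the checkpoint table; all names are hypothetical, and the `maxBatches` parameter exists only to simulate an interruption.

```typescript
interface LogEvent { id: number; streamId: string; type: string }

interface CheckpointStore {
  load(name: string): number;
  save(name: string, lastId: number): void;
}

class MemoryCheckpoints implements CheckpointStore {
  private marks = new Map<string, number>();
  load(name: string): number { return this.marks.get(name) ?? 0; }
  save(name: string, lastId: number): void { this.marks.set(name, lastId); }
}

// Equivalent of: SELECT ... FROM events WHERE id > $1 ORDER BY id LIMIT $2
function readBatch(log: LogEvent[], afterId: number, limit: number): LogEvent[] {
  return log.filter(e => e.id > afterId).sort((a, b) => a.id - b.id).slice(0, limit);
}

// Process batches until caught up, or until maxBatches have run (simulated interruption).
// The checkpoint is saved after every batch, so a restart resumes from the last completed one.
function runReplay(
  name: string,
  log: LogEvent[],
  checkpoints: CheckpointStore,
  handle: (e: LogEvent) => void,
  batchSize = 500,
  maxBatches = Infinity,
): number {
  let lastId = checkpoints.load(name);
  for (let batches = 0; batches < maxBatches; batches++) {
    const batch = readBatch(log, lastId, batchSize);
    if (batch.length === 0) break;
    for (const e of batch) handle(e);
    lastId = batch[batch.length - 1].id;
    checkpoints.save(name, lastId); // in PostgreSQL, commit this with the batch's projection writes
  }
  return lastId;
}
```

Calling `runReplay` a second time with the same checkpoint store continues from the saved position instead of starting from id 0.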
TypeScript Discriminated Unions for Event Types
TypeScript discriminated unions make event handling exhaustively type-safe: the compiler verifies that every event type is handled in every switch statement, and payload types are narrowed automatically by the type field. Combined with an event registry, this pattern catches missing event handlers and incorrect payload access at compile time rather than at runtime during a production replay.
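// ── Discriminated union: all order events ─────────────────────────────
// Extends the three-event union from the projections section with OrderRefunded.
// Payload types reuse the interfaces defined earlier; OrderRefundedPayload is illustrative.
type OrderPlacedPayload = OrderPlacedEvent['payload'];
type OrderShippedPayload = OrderShippedEvent['payload'];
type OrderCancelledPayload = OrderCancelledEvent['payload'];
type OrderRefundedPayload = { refundAmountCents: number; refundedAt: string };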
type OrderEvent =
| { type: 'OrderPlaced'; aggregateId: string; sequence: number; version: 1; payload: OrderPlacedPayload }
| { type: 'OrderShipped'; aggregateId: string; sequence: number; version: 1; payload: OrderShippedPayload }
| { type: 'OrderCancelled'; aggregateId: string; sequence: number; version: 1; payload: OrderCancelledPayload }
| { type: 'OrderRefunded'; aggregateId: string; sequence: number; version: 1; payload: OrderRefundedPayload };
// ── Exhaustive switch: compiler enforces all cases are handled ─────────
function applyOrderEvent(state: OrderState | null, event: OrderEvent): OrderState {
switch (event.type) {
case 'OrderPlaced':
// TypeScript narrows: event.payload is OrderPlacedPayload here
return {
orderId: event.aggregateId,
status: 'pending',
customerId: event.payload.customerId, // type-safe access
items: event.payload.items,
totalCents: event.payload.totalCents,
};
case 'OrderShipped':
return { ...state!, status: 'shipped', trackingNumber: event.payload.trackingNumber };
case 'OrderCancelled':
return { ...state!, status: 'cancelled', cancelReason: event.payload.reason };
case 'OrderRefunded':
return { ...state!, status: 'refunded', refundCents: event.payload.refundAmountCents };
default: {
// Exhaustiveness check: if a new event type is added to the union
// but not handled here, TypeScript raises a compile-time error
const _exhaustive: never = event;
throw new Error(`Unhandled event type: ${(_exhaustive as { type: string }).type}`);
}
}
}
// ── Event registry pattern: map event types to handler functions ───────
type EventHandler<TEvent> = (state: OrderState | null, event: TEvent) => OrderState;
const eventHandlers: {
[K in OrderEvent['type']]: EventHandler<Extract<OrderEvent, { type: K }>>
} = {
OrderPlaced: (state, event) => ({ orderId: event.aggregateId, status: 'pending', customerId: event.payload.customerId, items: event.payload.items, totalCents: event.payload.totalCents }),
OrderShipped: (state, event) => ({ ...state!, status: 'shipped', trackingNumber: event.payload.trackingNumber }),
OrderCancelled: (state, event) => ({ ...state!, status: 'cancelled', cancelReason: event.payload.reason }),
OrderRefunded: (state, event) => ({ ...state!, status: 'refunded', refundCents: event.payload.refundAmountCents }),
};
function applyEventFromRegistry(state: OrderState | null, event: OrderEvent): OrderState {
const handler = eventHandlers[event.type] as EventHandler<OrderEvent>;
return handler(state, event);
}
// ── Parse raw rows into typed events (payload is cast, not validated; pair with the Zod schema below) ──
function parseOrderEvent(raw: StoredEvent): OrderEvent {
const payload = raw.payload as Record<string, unknown>;
switch (raw.event_type) {
case 'OrderPlaced':
return { type: 'OrderPlaced', aggregateId: raw.stream_id, sequence: raw.sequence, version: 1, payload: payload as OrderPlacedPayload };
case 'OrderShipped':
return { type: 'OrderShipped', aggregateId: raw.stream_id, sequence: raw.sequence, version: 1, payload: payload as OrderShippedPayload };
case 'OrderCancelled':
return { type: 'OrderCancelled', aggregateId: raw.stream_id, sequence: raw.sequence, version: 1, payload: payload as OrderCancelledPayload };
case 'OrderRefunded':
return { type: 'OrderRefunded', aggregateId: raw.stream_id, sequence: raw.sequence, version: 1, payload: payload as OrderRefundedPayload };
default:
throw new Error(`Unknown event type: ${raw.event_type}`);
}
}
// ── Runtime validation with Zod ───────────────────────────────────────
import { z } from 'zod';
const OrderPlacedSchema = z.object({
type: z.literal('OrderPlaced'),
aggregateId: z.string(),
sequence: z.number().int().positive(),
version: z.literal(1),
payload: z.object({
customerId: z.string(),
totalCents: z.number().int().positive(),
currencyCode: z.string().length(3),
items: z.array(z.object({
productId: z.string(),
name: z.string(),
quantity: z.number().int().positive(),
unitPriceCents: z.number().int().nonnegative(),
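})),
placedAt: z.string().datetime(), // by default accepts only UTC timestamps ending in "Z"
}),
});
// Usage: const result = OrderPlacedSchema.safeParse(candidate);
// if (!result.success) reject or dead-letter the event; otherwise use the typed result.data
The never exhaustiveness check is the most valuable pattern: when you add a new event type to the union, every switch statement that ends in a never check and lacks a handler for the new type becomes a compile error rather than a silent runtime bug during replay. For safe JSON parsing in TypeScript with runtime validation, see the linked guide.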
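If you prefer a reusable helper over an inline `never` declaration, a common idiom is an `assertNever` function. Where a validation library is not available, a hand-written user-defined type guard can narrow parsed JSON after checking the fields handlers rely on. The `ShipmentEvent` union and function names below are a hypothetical, reduced example.

```typescript
type ShipmentEvent =
  | { type: 'ShipmentCreated'; payload: { carrier: string } }
  | { type: 'ShipmentDelivered'; payload: { deliveredAt: string } };

// Compile time: only callable with `never`, so an unhandled union member is a type error.
// Runtime: throws if malformed data slips past the types.
function assertNever(value: never): never {
  throw new Error(`Unhandled event: ${JSON.stringify(value)}`);
}

function describeShipment(event: ShipmentEvent): string {
  switch (event.type) {
    case 'ShipmentCreated':
      return `created with ${event.payload.carrier}`;
    case 'ShipmentDelivered':
      return `delivered at ${event.payload.deliveredAt}`;
    default:
      return assertNever(event);
  }
}

// User-defined type guard: narrows unknown JSON only after checking every field handlers read.
function isShipmentEvent(value: unknown): value is ShipmentEvent {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as { type?: unknown; payload?: unknown };
  if (typeof v.payload !== 'object' || v.payload === null) return false;
  const p = v.payload as Record<string, unknown>;
  switch (v.type) {
    case 'ShipmentCreated': return typeof p.carrier === 'string';
    case 'ShipmentDelivered': return typeof p.deliveredAt === 'string';
    default: return false;
  }
}
```

After `if (!isShipmentEvent(raw)) ...` rejects bad input, `raw` is narrowed to `ShipmentEvent` and can be passed to `describeShipment` without a cast.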
JSON Event Sourcing with CQRS Pattern
CQRS (Command Query Responsibility Segregation) separates the write side (commands that produce events) from the read side (queries against projections). In an event sourced system, the write side appends events to the event store; the read side subscribes to the event stream and maintains denormalized read models. The two sides are eventually consistent — there is a lag between when an event is written and when the projection reflects it.
// ── Command side: validate, emit events, append to store ─────────────
import { randomUUID } from 'node:crypto';
// Hypothetical event store port; an implementation would wrap the INSERT shown earlier
interface EventStore {
append(streamId: string, events: EventEnvelope[], opts: { expectedSequence: number }): Promise<void>;
}
interface PlaceOrderCommand {
type: 'PlaceOrder';
customerId: string;
correlationId: string; // propagated from the incoming request (e.g. a request-id header)
items: Array<{ productId: string; quantity: number; unitPriceCents: number; name: string }>;
}
async function handlePlaceOrder(
cmd: PlaceOrderCommand,
eventStore: EventStore
): Promise<string> {
// 1. Validate the command (business rules)
if (cmd.items.length === 0) throw new Error('Order must have at least one item');
const totalCents = cmd.items.reduce((sum, i) => sum + i.quantity * i.unitPriceCents, 0);
// 2. Generate a new aggregate ID
const orderId = `order_${randomUUID()}`;
// 3. Build the event (no I/O at this step); one timestamp keeps occurredAt and placedAt equal
const now = new Date().toISOString();
const event: OrderPlacedEvent = {
type: 'OrderPlaced',
eventId: `evt_${randomUUID()}`,
aggregateId: orderId,
aggregateType: 'Order',
sequence: 1,
version: 1,
occurredAt: now,
correlationId: cmd.correlationId,
payload: { customerId: cmd.customerId, items: cmd.items, totalCents, currencyCode: 'USD', placedAt: now }, // currency fixed to USD for brevity
};
// 4. Append with optimistic concurrency: expectedSequence 0 means "stream must not exist yet"
await eventStore.append(orderId, [event], { expectedSequence: 0 });
return orderId;
}
// ── Read side: projection subscribes to new events and updates read model ──
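// Order event types this projection handles; events from other aggregates are skipped
const ORDER_EVENT_TYPES = new Set(['OrderPlaced', 'OrderShipped', 'OrderCancelled', 'OrderRefunded']);
class OrderProjectionSubscriber {
private checkpointId = 0; // last processed global event id
private running = false; // prevents overlapping catchUp() runs
async start() {
// Load checkpoint from persistent storage (db.query resolves to an array of rows)
const [saved] = await db.query<{ last_id: number }>(
'SELECT last_id FROM projection_checkpoints WHERE name = $1',
['order_summaries']
);
this.checkpointId = saved?.last_id ?? 0;
// Poll for new events (or use LISTEN/NOTIFY for lower latency)
setInterval(() => {
this.catchUp().catch(err => console.error('order_summaries catch-up failed', err));
}, 1000);
}
async catchUp() {
if (this.running) return; // a slow batch must not overlap the next tick
this.running = true;
try {
const events = await db.query<StoredEvent>(
`SELECT id, stream_id, sequence, event_type, event_version, payload, occurred_at
FROM events WHERE id > $1 ORDER BY id ASC LIMIT 500`,
[this.checkpointId]
);
for (const raw of events) {
if (ORDER_EVENT_TYPES.has(raw.event_type)) {
const typed = parseOrderEvent(applyUpcasters(raw));
await handleEventForProjection(typed); // upsert to order_summaries
}
this.checkpointId = raw.id;
}
if (events.length > 0) {
// Persist checkpoint (ideally in the same transaction as the projection writes)
await db.query(
`INSERT INTO projection_checkpoints (name, last_id) VALUES ($1, $2)
ON CONFLICT (name) DO UPDATE SET last_id = $2`,
['order_summaries', this.checkpointId]
);
}
} finally {
this.running = false;
}
}
}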
// ── Query side: read from projection, not from event store ─────────────
// Fast reads — no event replay, no aggregate loading
async function getOrderSummary(orderId: string) {
const [row] = await db.query(
'SELECT order_id, customer_id, status, total_cents, item_count, tracking_num FROM order_summaries WHERE order_id = $1',
[orderId]
);
return row ?? null;
}
async function getOrdersByStatus(status: string, limit = 20) {
return db.query(
'SELECT order_id, customer_id, total_cents, last_updated FROM order_summaries WHERE status = $1 ORDER BY last_updated DESC LIMIT $2',
[status, limit]
);
}
// ── Read model rebuild (projection reset) ─────────────────────────────
async function rebuildOrderSummaries() {
// Stop the running subscriber first: it keeps its checkpoint in memory and would
// otherwise continue from its old position instead of replaying from id=0
await db.query('TRUNCATE TABLE order_summaries');
await db.query("UPDATE projection_checkpoints SET last_id = 0 WHERE name = 'order_summaries'");
// On restart, start() reloads last_id = 0 and catchUp() replays the whole log
}
The checkpoint table is essential for reliable projection catchup: it records the last global event position processed by each projection. Without it, a restart replays from the beginning every time. Store the checkpoint in the same transaction as the projection writes; otherwise a crash between the two leaves events applied without an updated checkpoint, and they are processed again after restart. For event-driven JSON architecture with message queues, see the linked guide.
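A complement to transactional checkpoints is to make each projection write idempotent, so a redelivered event is harmless. This in-memory sketch tracks the last applied sequence per stream and assumes events for one stream arrive in sequence order, as they do in a global-position catchup; the names are hypothetical. In SQL the equivalent guard would be a condition such as `WHERE order_id = $1 AND last_sequence < $2` on an assumed `last_sequence` column in the read table.

```typescript
// Hypothetical event shape: only the fields this projection needs.
interface ProjectedEvent { streamId: string; sequence: number; quantity: number }

// Read-model row; lastSequence records the newest event already folded in.
interface SummaryRow { itemCount: number; lastSequence: number }

class IdempotentItemCountProjection {
  private rows = new Map<string, SummaryRow>();

  // Returns true if the event changed the read model, false if it was already applied.
  apply(e: ProjectedEvent): boolean {
    const row = this.rows.get(e.streamId) ?? { itemCount: 0, lastSequence: 0 };
    if (e.sequence <= row.lastSequence) return false; // redelivery: skip
    this.rows.set(e.streamId, { itemCount: row.itemCount + e.quantity, lastSequence: e.sequence });
    return true;
  }

  get(streamId: string): SummaryRow | undefined {
    return this.rows.get(streamId);
  }
}
```

With this guard, replaying a batch after a crash re-applies nothing that was already written, even if the checkpoint update was lost.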
Key Terms
- Event Sourcing
- An architectural pattern where the authoritative source of truth is an immutable, append-only log of domain events rather than the current state of each record. State is derived by replaying events in order from the beginning of the stream, or from a snapshot plus a delta of newer events. Every state change is captured as an event with a business name (OrderPlaced, PaymentFailed), a timestamp, and a payload describing what changed. The event log enables time travel (reconstruct any historical state), retroactive projections (build new read models from old events), and complete audit trails without additional audit infrastructure.
- Event Store
- An append-only database optimized for storing and retrieving event streams. Fundamental operations are: append one or more events to a stream (with optimistic concurrency), read all events for a stream in order, and scan all events across all streams in global append order. Implementations include purpose-built stores (EventStoreDB), relational databases with an append-only events table and unique constraint on (stream_id, sequence), and log-based streaming platforms (Kafka, Kinesis partitioned by aggregate ID). The defining constraint is that committed events are never updated or deleted — immutability is the foundation of replay correctness.
- Aggregate
- A cluster of domain objects treated as a single unit for the purpose of data consistency. In event sourcing, an aggregate owns one event stream identified by its ID. All events for an aggregate are ordered by a sequence number within that stream. The aggregate enforces business rules (invariants) before producing events — it loads its current state by replaying its event stream, validates the incoming command against the current state, and produces new events if the command is valid. Aggregates should be sized to contain only the state needed to enforce invariants — large aggregates accumulate many events and require snapshot optimization to avoid O(N) replay latency.
- Projection
- A read model built by reducing a stream of events into a data structure optimized for a specific query pattern. Projections are derived views — they can be deleted and rebuilt at any time by replaying the event log. A projection handler is a pure function from (currentState, event) to newState. In a CQRS architecture, projections are maintained by a subscriber that reads from the event stream and writes to a denormalized read table (order_summaries, user_profiles, inventory_counts). Multiple independent projections can be built from the same event log, each optimized for different query patterns, without affecting the write side or each other.
- Snapshot
- A point-in-time serialization of an aggregate's current state stored alongside the event stream to optimize replay performance. Without snapshots, loading an aggregate requires replaying all N events from position 0, an O(N) cost. With a snapshot taken every K events, loading requires fetching the snapshot and replaying only the events appended since it (fewer than K), an O(K) cost. Snapshots are an optimization only: they are never the authoritative source of truth. If a snapshot is corrupted or missing, the aggregate can always be rebuilt from the event stream. Common thresholds are 100–500 events per aggregate. Store the snapshot as a JSON document with the sequence number of the last included event.
- Event Upcasting
- A transformation function that converts an older event schema to the current schema when reading from the event store, without modifying the stored event. Upcasting is the mechanism for schema evolution in event sourcing: when a breaking change is needed (splitting a field, changing a type), the stored events are left untouched and an upcaster function is registered to transform events of the old version into the current shape. Projection handlers always receive the current schema version regardless of what is stored. Upcasters are chained: running the v1→v2 upcaster and then the v2→v3 upcaster brings an event of any historical version up to the current schema.
- CQRS (Command Query Responsibility Segregation)
- An architectural pattern that separates the write side (commands that change state and produce events) from the read side (queries against denormalized read models). In an event sourced system, the write side validates commands, loads aggregates by replaying events, enforces business invariants, and appends new events to the event store. The read side subscribes to the event stream, maintains one or more projection tables optimized for specific query patterns, and serves queries directly from those tables without touching the event store. The two sides are eventually consistent: there is a propagation lag between when an event is written and when the projection reflects it.
- Eventual Consistency
- A consistency model where read models are guaranteed to reflect all committed writes eventually — but not necessarily immediately. In event sourced CQRS systems, a write appends an event to the event store; a subscriber asynchronously reads that event and updates the projection. During the propagation window (typically milliseconds to seconds), a query against the projection may return stale data. Eventual consistency requires explicit design: user interfaces should reflect the command result optimistically rather than relying on a synchronous read-after-write; critical consistency requirements may use the event store directly (synchronous aggregate load) rather than a projection.
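The aggregate's load, validate, emit cycle (the write side of CQRS) can be sketched as two pure functions: `evolve` rebuilds state by replaying the stream, and a decision function checks the command against that state before producing new events. This is a minimal sketch assuming a hypothetical Order aggregate whose only invariant is that an order must be placed, and not yet shipped, before it can ship. `OrderState`, `evolve`, `decideShip`, and `handleShipOrder` are illustrative names, and appending the returned events (with an expected-version check) is assumed to be the event store's job.

```typescript
// Hypothetical Order aggregate events (illustrative shapes only).
type OrderEvent =
  | { type: "OrderPlaced"; sequence: number; payload: { totalCents: number } }
  | { type: "OrderShipped"; sequence: number; payload: { carrier: string } };

interface OrderState {
  status: "none" | "placed" | "shipped";
  version: number; // sequence of the last applied event; 0 = empty stream
}

const initialState: OrderState = { status: "none", version: 0 };

// Pure reducer: (state, event) -> new state.
function evolve(state: OrderState, event: OrderEvent): OrderState {
  switch (event.type) {
    case "OrderPlaced":
      return { status: "placed", version: event.sequence };
    case "OrderShipped":
      return { status: "shipped", version: event.sequence };
  }
}

// Enforces the invariant; returns the events to append, or throws.
function decideShip(state: OrderState, carrier: string): OrderEvent[] {
  if (state.status !== "placed") {
    throw new Error(`Cannot ship an order in status "${state.status}"`);
  }
  return [{ type: "OrderShipped", sequence: state.version + 1, payload: { carrier } }];
}

// Command handler: load by replaying the stream, then validate and decide.
function handleShipOrder(history: OrderEvent[], carrier: string): OrderEvent[] {
  const state = history.reduce(evolve, initialState);
  return decideShip(state, carrier);
}
```

Keeping `decideShip` free of I/O means the invariant can be tested by handing it a replayed state, with no event store involved.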
FAQ
What is event sourcing and why is JSON commonly used for events?
Event sourcing is an architectural pattern where, instead of storing only the current state of a record, you store the full sequence of state-change events that produced that state. The current state is derived by replaying all events from the beginning of the stream, or from a recent snapshot plus the newer events. A traditional CRUD system executes UPDATE orders SET status = 'shipped' and discards what the status was before; event sourcing appends an OrderShipped event with a timestamp, actor, and reason, preserving the complete audit trail. JSON is the dominant format for events because it is human-readable during debugging and incident response; schema-flexible (new optional fields can be added without a migration); natively supported by every language and database; and storable as-is in document stores, event streaming platforms, and relational databases with JSONB columns, without a serialization adapter. The tradeoff is size: a JSON event averages 200–500 bytes compared to 50–150 bytes for binary formats like Avro or Protobuf. For most systems, JSON's operational simplicity wins.
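To make the CRUD contrast concrete, the sketch below keeps order events in an in-memory array (a stand-in for a real event store) and derives both the current status and the full status history from it. The `StoredEvent` shape is a simplified, hypothetical version of the envelope, and `currentStatus` and `statusHistory` are illustrative helpers.

```typescript
// Simplified, hypothetical event shape; a full envelope carries more fields.
interface StoredEvent {
  type: "OrderPlaced" | "OrderShipped" | "OrderCancelled";
  aggregateId: string;
  sequence: number;
  occurredAt: string; // ISO 8601
  payload: Record<string, unknown>;
}

const statusByType: Record<StoredEvent["type"], string> = {
  OrderPlaced: "placed",
  OrderShipped: "shipped",
  OrderCancelled: "cancelled",
};

// Current state: replay the order's events in sequence order, keeping the last status.
function currentStatus(events: StoredEvent[], orderId: string): string | undefined {
  return events
    .filter((e) => e.aggregateId === orderId)
    .sort((a, b) => a.sequence - b.sequence)
    .reduce<string | undefined>((_, e) => statusByType[e.type], undefined);
}

// History: every status the order has had and when it changed.
function statusHistory(events: StoredEvent[], orderId: string): Array<[string, string]> {
  return events
    .filter((e) => e.aggregateId === orderId)
    .sort((a, b) => a.sequence - b.sequence)
    .map((e): [string, string] => [statusByType[e.type], e.occurredAt]);
}

const log: StoredEvent[] = [
  { type: "OrderPlaced", aggregateId: "o-1", sequence: 1, occurredAt: "2024-05-01T10:00:00Z", payload: { totalCents: 4999 } },
  { type: "OrderShipped", aggregateId: "o-1", sequence: 2, occurredAt: "2024-05-02T08:30:00Z", payload: { carrier: "UPS", actor: "warehouse-7", reason: "picked and packed" } },
];

console.log(currentStatus(log, "o-1")); // shipped
console.log(statusHistory(log, "o-1").length); // 2: "placed", then "shipped"
```

Where the CRUD UPDATE would have overwritten "placed", the log still holds it, together with when and why the status changed.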
How do I design a JSON schema for events that supports backward compatibility?
The fundamental rule is additive-only changes: within a given event version, you may add new optional fields to the payload, but you must never remove a field, rename a field, or change the type of an existing field. The rule is permanent because historical events must always remain replayable. The event envelope (type, aggregateId, sequence, version, occurredAt) must be especially stable. For breaking changes, introduce a new version of the event, either by incrementing the envelope's version field or by defining a new type such as OrderPlaced_v2 alongside the original OrderPlaced. Stored events remain unchanged; new events are written in the new shape. An upcaster transforms old shapes to the current shape before projection handlers receive them. Use integer cents for monetary values, ISO 8601 strings for dates, and literal string enums for status fields. Define event schemas as TypeScript discriminated unions: payload types are narrowed automatically in each case of a switch on type, and with an exhaustiveness check (a never-typed default branch, or a declared return type on the handler) the compiler flags any event type you forgot to handle.
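A minimal sketch of these conventions as a discriminated union, assuming two hypothetical event types: amounts are integer cents, dates are ISO 8601 strings, and statuses and reasons are literal string unions. The `assertNever` helper in the default branch is what turns a forgotten case into a compile-time error; `describeEvent` is an illustrative handler, not a library function.

```typescript
// Hypothetical payloads: integer cents, ISO 8601 strings, literal enums.
type PaymentMethod = "card" | "bank_transfer";

type OrderPlaced = {
  type: "OrderPlaced";
  version: 1;
  payload: { totalCents: number; currency: string; placedAt: string; method: PaymentMethod };
};

type PaymentFailed = {
  type: "PaymentFailed";
  version: 1;
  payload: { amountCents: number; reason: "insufficient_funds" | "card_declined"; failedAt: string };
};

type DomainEvent = OrderPlaced | PaymentFailed;

// Only accepts `never`: if a new event type is added to DomainEvent and
// not handled below, the call in the default branch fails to compile.
function assertNever(x: never): never {
  throw new Error(`Unhandled event: ${JSON.stringify(x)}`);
}

function describeEvent(event: DomainEvent): string {
  switch (event.type) {
    case "OrderPlaced":
      // event.payload is narrowed to OrderPlaced["payload"] here
      return `Order placed for ${(event.payload.totalCents / 100).toFixed(2)} ${event.payload.currency}`;
    case "PaymentFailed":
      return `Payment of ${event.payload.amountCents} cents failed: ${event.payload.reason}`;
    default:
      return assertNever(event);
  }
}
```

Converting cents to a decimal string only at the display edge keeps the stored values exact integers.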
What is the difference between event sourcing and traditional CRUD with an audit log?
In traditional CRUD with an audit log, the primary table is the authoritative source of truth and the audit log is a side effect. In event sourcing, the event log is the authoritative source of truth and the current state is a derived view. This distinction has deep practical consequences. With CRUD plus an audit log, current state is always immediately available; the audit log can fall out of sync if writes bypass the application layer; and the audit schema often captures only field-level changes, not business intent. With event sourcing, there is no authoritative current-state table (projections are derived by replaying events and can be rebuilt at any time); the event log is append-only and immutable; events capture business intent (OrderPlaced, PaymentFailed), not just field changes; and you can retroactively build any read model from the same event log, including models not anticipated when the events were first written. The cost is higher initial complexity: event store infrastructure, projection rebuilding, and snapshot management that CRUD systems do not require.
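The last point, building read models nobody anticipated, is something a CRUD table cannot offer retroactively. As a hedged sketch, assuming a hypothetical log in which `PaymentFailed` events carry a `customerId`, the projection below could be written long after those events were stored and would still cover the full history, because it is just another fold over the same log. `failedPaymentsProjection` and the event shapes are illustrative.

```typescript
// Hypothetical event shapes; a real envelope carries more fields.
type LoggedEvent =
  | { type: "OrderPlaced"; globalPosition: number; payload: { customerId: string; totalCents: number } }
  | { type: "PaymentFailed"; globalPosition: number; payload: { customerId: string; reason: string } };

// A read model added later: failed payment counts per customer, by reason.
type FailuresByCustomer = Map<string, Map<string, number>>;

function failedPaymentsProjection(log: LoggedEvent[]): FailuresByCustomer {
  const model: FailuresByCustomer = new Map();
  // Replay in global order without mutating the caller's array.
  for (const event of [...log].sort((a, b) => a.globalPosition - b.globalPosition)) {
    if (event.type !== "PaymentFailed") continue; // irrelevant to this read model
    const { customerId, reason } = event.payload;
    const byReason = model.get(customerId) ?? new Map<string, number>();
    byReason.set(reason, (byReason.get(reason) ?? 0) + 1);
    model.set(customerId, byReason);
  }
  return model;
}
```

Neither the write side nor any existing projection changes; the new model is computed from events that were already there.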
How do I build a projection (read model) from a stream of JSON events?
A projection is a pure reduce function over an event stream that produces a read model optimized for a specific query. Start with an initial empty state and apply each event in order using a switch on the type field; each case returns an updated state object built with spread syntax, so earlier states are never mutated. For a CQRS read model, a subscriber reads from the event stream and upserts a denormalized read table on each event. To rebuild from scratch, truncate the read table and replay all events from global position 0. Snapshot optimization: every 100–500 events, serialize the current aggregate state to a snapshot table; on subsequent loads, fetch the latest snapshot and replay only the events after the snapshot's sequence number, reducing O(N) replay to O(snapshot_delta). The reducer itself must be pure (no I/O, no side effects) so it can be replayed deterministically from any position; the subscriber, not the reducer, performs the table writes.
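The split between a pure reducer and an impure subscriber can be sketched in memory. This assumes a `Map` stands in for the denormalized `order_summaries` table, events arrive in global-position order, and the subscriber stores the last processed global position as a checkpoint so it can skip redelivered events and resume (or reset to 0 for a full rebuild). `applyEvent`, `OrderSummary`, and `OrderSummaryProjection` are hypothetical names.

```typescript
// Hypothetical event shapes; globalPosition mirrors the store's global id.
type OrderEvent =
  | { type: "OrderPlaced"; globalPosition: number; aggregateId: string; payload: { customerId: string } }
  | { type: "ItemAdded"; globalPosition: number; aggregateId: string; payload: { priceCents: number } }
  | { type: "OrderShipped"; globalPosition: number; aggregateId: string; payload: { carrier: string } };

interface OrderSummary {
  orderId: string;
  customerId: string;
  status: "placed" | "shipped";
  totalCents: number;
  itemCount: number;
}

// Pure reducer: (currentState, event) -> newState. No I/O, no mutation.
function applyEvent(state: OrderSummary | undefined, event: OrderEvent): OrderSummary {
  switch (event.type) {
    case "OrderPlaced":
      return { orderId: event.aggregateId, customerId: event.payload.customerId, status: "placed", totalCents: 0, itemCount: 0 };
    case "ItemAdded":
      if (!state) throw new Error(`ItemAdded before OrderPlaced for ${event.aggregateId}`);
      return { ...state, totalCents: state.totalCents + event.payload.priceCents, itemCount: state.itemCount + 1 };
    case "OrderShipped":
      if (!state) throw new Error(`OrderShipped before OrderPlaced for ${event.aggregateId}`);
      return { ...state, status: "shipped" };
  }
}

// Impure side: the subscriber owns the writes. A Map stands in for order_summaries.
class OrderSummaryProjection {
  readonly table = new Map<string, OrderSummary>();
  checkpoint = 0; // last processed globalPosition; 0 = nothing processed yet

  // Assumes in-order, at-least-once delivery.
  handle(event: OrderEvent): void {
    if (event.globalPosition <= this.checkpoint) return; // redelivered: already applied
    const next = applyEvent(this.table.get(event.aggregateId), event);
    this.table.set(event.aggregateId, next); // upsert
    this.checkpoint = event.globalPosition;
  }

  // Full rebuild: truncate the table and replay from global position 0.
  rebuild(allEvents: OrderEvent[]): void {
    this.table.clear();
    this.checkpoint = 0;
    for (const event of allEvents) this.handle(event);
  }
}
```

In a database-backed version, the upsert and the checkpoint update would typically be written in the same transaction, so a crash cannot leave one without the other.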
What is a snapshot in event sourcing and when should I use one?
A snapshot is a point-in-time serialization of an aggregate's current state stored alongside the event stream. When loading an aggregate, instead of replaying all events from position 0 (O(N) cost), you fetch the latest snapshot and replay only the events after the snapshot's sequence number. You should introduce snapshots when aggregate load time becomes a measurable bottleneck. A common threshold is 100–500 events per aggregate: below 100 events, replay is typically fast enough; above 500 events, replay latency may impact user-facing request handling. The snapshot is a JSON document containing the full aggregate state plus the last included sequence number. Snapshotting strategies include after-write (snapshot after every N writes), lazy (snapshot if event count exceeds threshold on a read), and background (a separate process periodically snapshots large aggregates). Snapshots are an optimization — they can always be deleted and rebuilt by replaying the event log. Never treat a snapshot as authoritative if the event stream contradicts it.
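Loading from a snapshot and the after-write strategy can be sketched together. The sketch assumes a hypothetical `Account` aggregate whose state is a running balance, an in-memory stream sorted by sequence, and an illustrative threshold of 100 events; `loadAggregate` and `maybeSnapshot` are hypothetical helpers. A real store would fetch only the events after the snapshot's sequence with a range query instead of filtering in memory.

```typescript
// Hypothetical Account aggregate used for illustration.
type AccountEvent =
  | { type: "Deposited"; sequence: number; payload: { amountCents: number } }
  | { type: "Withdrawn"; sequence: number; payload: { amountCents: number } };

interface AccountState {
  balanceCents: number;
}

// Snapshot document: full aggregate state plus the last included sequence number.
interface Snapshot {
  state: AccountState;
  sequence: number;
}

const SNAPSHOT_EVERY = 100; // illustrative threshold within the 100–500 range

function evolve(state: AccountState, event: AccountEvent): AccountState {
  switch (event.type) {
    case "Deposited":
      return { balanceCents: state.balanceCents + event.payload.amountCents };
    case "Withdrawn":
      return { balanceCents: state.balanceCents - event.payload.amountCents };
  }
}

// Start from the latest snapshot (if any) and replay only newer events.
// `stream` is assumed sorted by sequence.
function loadAggregate(stream: AccountEvent[], snapshot?: Snapshot): Snapshot {
  const start: Snapshot = snapshot ?? { state: { balanceCents: 0 }, sequence: 0 };
  const delta = stream.filter((e) => e.sequence > start.sequence);
  return {
    state: delta.reduce(evolve, start.state),
    sequence: delta.length > 0 ? delta[delta.length - 1].sequence : start.sequence,
  };
}

// After-write strategy: take a new snapshot once SNAPSHOT_EVERY events
// have accumulated since the previous one.
function maybeSnapshot(loaded: Snapshot, previous?: Snapshot): Snapshot | undefined {
  const since = loaded.sequence - (previous?.sequence ?? 0);
  return since >= SNAPSHOT_EVERY ? { state: { ...loaded.state }, sequence: loaded.sequence } : undefined;
}
```

Because the snapshot is only a starting point for `reduce`, deleting it changes load time but never the resulting state.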
How do I handle event schema evolution without breaking historical replays?
Use the upcasting pattern: a transformation function that converts an older event shape to the current shape when reading from the event store, without modifying stored events. When you need to change an event (splitting fullName into firstName and lastName), you: (1) increment the event version; (2) write new events using the new schema; (3) register an upcaster that transforms old events to the new shape on read; (4) update projection handlers to work with the new shape. The upcaster runs transparently in the event loading layer — projections always receive the current schema regardless of what is stored. Additive changes (adding an optional field) need no upcaster — projection handlers use a default value if the field is absent. The three safe operations: add an optional field, split a field via upcasting, merge fields via upcasting. The three forbidden operations: remove a field, rename a field without an upcaster, change a field's type without an upcaster.
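A minimal sketch of an upcaster chain for the fullName example, assuming events carry `type` and `version` in the envelope. Version 2 splits `fullName` on the first space (a simplification that real names will not always satisfy), and a hypothetical version 3 adds a required `displayName` derived from the split fields, so the chain has two steps. The registry key format and all names are illustrative.

```typescript
// Event as read from the store, before upcasting.
interface RawEvent {
  type: string;
  version: number;
  payload: Record<string, unknown>;
}

// Each upcaster moves a payload up exactly one version.
type Upcaster = (payload: Record<string, unknown>) => Record<string, unknown>;

// Registry keyed by "<type>@<fromVersion>" (hypothetical key format).
const upcasters: Record<string, Upcaster> = {
  // v1 -> v2: split fullName into firstName/lastName (naive: first space)
  "CustomerRegistered@1": ({ fullName, ...rest }) => {
    const name = String(fullName ?? "");
    const space = name.indexOf(" ");
    return space < 0
      ? { ...rest, firstName: name, lastName: "" }
      : { ...rest, firstName: name.slice(0, space), lastName: name.slice(space + 1) };
  },
  // v2 -> v3: hypothetical required displayName derived from the split fields
  "CustomerRegistered@2": (payload) => ({
    ...payload,
    displayName: `${payload.firstName ?? ""} ${payload.lastName ?? ""}`.trim(),
  }),
};

// Runs in the loading layer: applies upcasters until none matches.
// Builds new objects at every step; the stored event is never modified.
function upcast(event: RawEvent): RawEvent {
  let current = event;
  for (;;) {
    const step = upcasters[`${current.type}@${current.version}`];
    if (!step) return current;
    current = { ...current, version: current.version + 1, payload: step(current.payload) };
  }
}
```

Events already at the current version, and types with no registered upcasters, pass through unchanged, so projection handlers can call `upcast` on every event they load.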
Can I use PostgreSQL as an event store with JSON columns?
Yes. PostgreSQL with JSONB is a practical event store for systems that do not need the throughput of a dedicated event streaming platform. The core table design: events with columns id (bigserial, global ordering), stream_id (aggregate ID), sequence (position within stream), event_type, event_version, payload (jsonb), metadata (jsonb), occurred_at (timestamptz). Create a unique constraint on (stream_id, sequence) for optimistic concurrency: an INSERT that violates it means another process appended first. PostgreSQL backs that constraint with a B-tree index on (stream_id, sequence), which also serves per-aggregate replay, so no separate index is needed. Use id for global position scanning, keeping in mind that bigserial values are assigned at insert time rather than commit time, so a poller reading by id can miss rows from transactions that commit out of order. A GIN index on payload speeds up cross-stream containment queries (for example, payload @> '{"customerId": "c-1"}'). The limitation: PostgreSQL does not natively support stream subscriptions, so use polling or LISTEN/NOTIFY for projection catch-up. EventStoreDB provides native subscriptions and expectedVersion concurrency checks out of the box, making it a better fit for high-throughput or subscription-heavy workloads.
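To make the table semantics concrete without a database driver, the sketch below models them in memory: a global counter plays the role of bigserial, a `(streamId, sequence)` key set plays the role of the unique constraint, and `readAll(afterId)` is the polling query a projection would run. The DDL in the comment is an assumed rendering of the columns listed above, and `InMemoryEventStore` and `ConcurrencyError` are hypothetical names, not part of any PostgreSQL client.

```typescript
// Assumed DDL for the table described above (column types are a sketch):
//   CREATE TABLE events (
//     id            bigserial   PRIMARY KEY,
//     stream_id     text        NOT NULL,
//     sequence      integer     NOT NULL,
//     event_type    text        NOT NULL,
//     event_version integer     NOT NULL,
//     payload       jsonb       NOT NULL,
//     metadata      jsonb       NOT NULL DEFAULT '{}',
//     occurred_at   timestamptz NOT NULL DEFAULT now(),
//     UNIQUE (stream_id, sequence)
//   );
//   CREATE INDEX events_payload_gin ON events USING gin (payload);

interface EventRow {
  id: number;
  streamId: string;
  sequence: number;
  eventType: string;
  eventVersion: number;
  payload: unknown;
  occurredAt: string;
}

class ConcurrencyError extends Error {
  name = "ConcurrencyError";
}

class InMemoryEventStore {
  private rows: EventRow[] = [];
  private keys = new Set<string>(); // emulates UNIQUE (stream_id, sequence)

  // The new event's sequence is expectedVersion + 1; a duplicate key means
  // another writer appended first (the unique-violation case in PostgreSQL).
  append(streamId: string, expectedVersion: number, eventType: string, payload: unknown, eventVersion = 1): EventRow {
    const sequence = expectedVersion + 1;
    const key = `${streamId}:${sequence}`;
    if (this.keys.has(key)) {
      throw new ConcurrencyError(`${streamId} already has sequence ${sequence}`);
    }
    // bigserial stand-in (PostgreSQL sequences may leave gaps; this model does not)
    const row: EventRow = { id: this.rows.length + 1, streamId, sequence, eventType, eventVersion, payload, occurredAt: new Date().toISOString() };
    this.keys.add(key);
    this.rows.push(row);
    return row;
  }

  // Per-aggregate replay: WHERE stream_id = $1 ORDER BY sequence
  readStream(streamId: string): EventRow[] {
    return this.rows.filter((r) => r.streamId === streamId).sort((a, b) => a.sequence - b.sequence);
  }

  // Polling catch-up: WHERE id > $1 ORDER BY id LIMIT $2. Here ids follow
  // commit order; in PostgreSQL concurrent commits can make them diverge.
  readAll(afterId: number, limit = 100): EventRow[] {
    return this.rows.filter((r) => r.id > afterId).slice(0, limit);
  }
}
```

A projection would call `readAll` with its stored checkpoint, apply the returned rows, and save the last row's id as the new checkpoint.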