Teu progresso
0 / 83 módulos0%
Estágio 04 · 04-03
BloqueadoTimes adotam "event-driven architecture" e descobrem que cada feature vira investigação forense em N consumers. Eventos sem schema, sem versioning, sem ordering claro, sem dead letter. Saga sem compensation. CQRS aplicado em CRUD trivial. Event sourcing onde simples table seria melhor.
Este módulo separa conceito de cargo cult. Quando cada padrão vence, quando custa mais que entrega, e como combinar: events as integration, event sourcing, CQRS, outbox, sagas (choreography vs orchestration), idempotency design, projection rebuilds, schema evolution.
Andreas Tobias et al. distinguem:
Cada nível adiciona complexidade. Use o mínimo necessário.
OrderPaid {orderId, ts}).GET /orders/{id} pra detalhe.Pros: simple, freshest data. Cons: callback chatter; A precisa estar disponível pra B agir.
OrderPaid {orderId, customerId, amount, items: [...]}.Pros: serviços desacoplados de A's availability. Cons: events maiores; schema crescente; data dup entre services.
State é derivado de log de eventos. Não há "tabela de orders" canonical; há OrderEvents log. Current state = fold dos eventos.
Pros:
Cons:
Quando vence: domínios com alto valor de auditoria/regulação (financeiro, médico), domínios com lógica complexa que beneficia retro-thinking.
Command Query Responsibility Segregation. Models separados pra write (commands) e read (queries).
Write side: aggregate, normalização, integridade. Read side: projetions otimizadas pra queries (denormalized, search-friendly).
Pros:
Cons:
CQRS sem ES é viável: escreva no Postgres normalized, projete pra ElasticSearch via worker.
CQRS + ES é o stack puro.
Conceito DDD: cluster de objetos consistentes. Aggregate root é entry point. Transações alteram 1 aggregate por vez.
Em event sourcing: aggregate's state = fold de seus events. Commands geram events.
Pra Logística: Order é aggregate. Eventos OrderCreated, OrderAssigned, OrderDelivered modificam. Customer outro aggregate.
Cross-aggregate transactions = saga (não txn ACID).
Choreography vs orchestration:
Engines: Temporal, Camunda, Orkes Conductor, AWS Step Functions. Code workflows com retries, timeouts, persistence.
Em greenfield com flows complexos (5+ steps), orchestration via Temporal é altamente produtivo.
Padrão central que vimos. Detalhes operacionais:
id, aggregate_type, aggregate_id, payload, created_at, processed_at, attempts.pg_notify pode reduzir polling latency.id como producer key/dedup.CDC alternativa via Debezium: source of truth é Postgres directly. Sem outbox; events derivados de WAL.
Eventos são contratos, evolução exige cuidado.
Padrões:
OrderCreatedV1, OrderCreatedV2. Consumers escolhem.Nunca:
Em ES, eventos antigos no log são imutáveis. Você convive com 5 versões eternamente. Disciplina vira crítica.
Read models construídos consumindo events. Cada projection é um consumer (group, position).
Operations:
Snapshots:
OrderCreated, não CreateOrder.Anti-padrões:
UI precisa lidar com:
Pattern: "read your writes", após write, cliente caches imediato; backend confirma async; UI consistent enough.
Quando integrando com sistema externo (legacy, third-party API), ACL traduz model externo pra model interno. Eventos podem ser veículo: ACL consome events externos e emite events internos limpos.
Mantém domain core imune a model alheio.
Você não precisa K microservices pra adotar EDA. Modular monolith com in-process events é poderoso:
Vince Knight, Vaughn Vernon e outros pregam isso. Em projetos médios, é o sweet spot.
Producer outbox + idempotent consumer = "exactly-once processing":
Step list:
ReserveCourier → reserve em DB de courier; compensation ReleaseCourier.ChargeFee → invoke billing API; compensation RefundFee.NotifyCourier → push to mobile; compensation: nothing or apologetic notification.Estados: started, compensating, succeeded, failed.
Workflow engine torna isso código declarativo. Sem engine, você implementa via state machine + events; mais trabalho, mais ad-hoc.
Além de "consumer reagindo": agregações em janela.
Use case: dashboard real-time de orders/min por tenant; alerta de courier offline > 10 min.
Subseção §2.4 introduziu ES conceitualmente. Aqui está o operacional: schema do event store, 5 estratégias de versioning, snapshot pattern, rebuild de projection sem downtime, optimistic concurrency, e anti-patterns que corrompem audit log.
Event store schema (Postgres canonical):
CREATE TABLE events (
stream_id UUID NOT NULL,
version INT NOT NULL,
type TEXT NOT NULL,
payload JSONB NOT NULL,
metadata JSONB NOT NULL DEFAULT '{}'::jsonb,
ts TIMESTAMPTZ NOT NULL DEFAULT now(),
global_seq BIGSERIAL,
PRIMARY KEY (stream_id, version)
);
CREATE INDEX events_global_seq_idx ON events (global_seq);
CREATE INDEX events_type_ts_idx ON events (type, ts);
PRIMARY KEY (stream_id, version) garante optimistic concurrency: append duplicado falha. global_seq BIGSERIAL dá ordem global monotônica para projection consumers. Index (type, ts) serve audit ad-hoc; nada além disso — event store é append-only log, não tabela query-driven.
Append idempotente:
INSERT INTO events (stream_id, version, type, payload, metadata)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (stream_id, version) DO NOTHING
RETURNING global_seq;
0 rows retornadas = optimistic conflict. Reload events, recompute aggregate, retry command. Sempre attach command_id em metadata para dedup; retry sem idempotency key vira double-issue se command processou parcialmente. Falha após X tentativas → 409 ao client.
Versioning — 5 estratégias:
OrderConfirmedV1, emite OrderConfirmedV2 para writes novos. Consumers handlam ambos.schema_version invalida snapshots; hydration força reload from events com upcasting.Decision tree: event store < 1M events e migration cheap → lazy migration. Event store grande, breaking change → upcast on read. Hot path com muito throughput → multiple events.
Snapshot pattern:
CREATE TABLE snapshots (
stream_id UUID PRIMARY KEY,
version INT NOT NULL,
schema_version INT NOT NULL,
payload JSONB NOT NULL,
ts TIMESTAMPTZ NOT NULL DEFAULT now()
);
Hydration de aggregate com 10k events é proibitivo. Snapshot a cada N=50–100 events:
async function loadOrder(streamId: string): Promise<Order> {
const snap = await db.oneOrNone(
`SELECT version, schema_version, payload FROM snapshots WHERE stream_id = $1`,
[streamId]
);
const fromVersion = snap && snap.schema_version === Order.SCHEMA_VERSION ? snap.version : 0;
const events = await db.manyOrNone(
`SELECT version, type, payload FROM events
WHERE stream_id = $1 AND version > $2 ORDER BY version ASC`,
[streamId, fromVersion]
);
const base = fromVersion > 0 ? Order.fromSnapshot(snap.payload) : Order.empty(streamId);
return events.reduce((agg, e) => agg.apply(upcast(e)), base);
}
Snapshot é cache derivado, NUNCA source of truth. Consumer que lê snapshot está bug. Schema migration de aggregate sem bumping schema_version aplica V2 events em V1 state — silent corruption. Logística: Order com 200+ status events ao longo de delivery; snapshot a cada 50 reduz hydration de 200ms → 5ms.
Projection rebuild sem downtime:
async function runProjection(name: string, handler: (e: Event) => Promise<void>) {
let cursor = await getCursor(name); // 0 para greenfield
while (true) {
const batch = await db.manyOrNone(
`SELECT * FROM events WHERE global_seq > $1 ORDER BY global_seq ASC LIMIT 1000`,
[cursor]
);
if (batch.length === 0) { await sleep(100); continue; }
for (const e of batch) await handler(e); // idempotent UPSERT
cursor = batch[batch.length - 1].global_seq;
await setCursor(name, cursor);
}
}
Catch-up: consumer parte de global_seq=0, processa histórico até alcançar tip, segue realtime. Numbers reais: 10M events × ~1ms cada = ~3h single-threaded. Paraleliza shardando por hash(stream_id) % N. Cutover safe: nova projection roda em paralelo, compara reads com antiga, switch read traffic, drop antiga.
Lag monitoring (cruza com 04-02, outbox + idempotent consumer):
SELECT name, (SELECT MAX(global_seq) FROM events) - last_processed_seq AS lag
FROM projection_cursors;
SLO: lag p99 < 500ms healthy. Aggregate persist + outbox = TX atômica; consumer idempotente via UPSERT no projection table.
Event design — 5 regras:
OrderShipped, nunca ShipOrder.OrderCancellationRequested).Sales.OrderPlaced ≠ Fulfillment.OrderPlaced em microservices.Logística end-to-end: Order aggregate emite OrderPlaced, CourierAssigned, PickedUp, OutForDelivery, Delivered, CancellationRequested, Cancelled. Snapshot a cada 50. Três projections: orders_view (lista por tenant), dashboard_aggregate (counts por status), delivery_eta (window de 24h para ML). Versioning real: CourierAssignedV1 (courier_id) → CourierAssignedV2 (adiciona vehicle_type); aggregate upcasta V1 inferindo vehicle_type='unknown'.
Anti-patterns observados:
schema_version (load aplica V2 events em V1 state).ShipOrder em vez de OrderShipped).ON CONFLICT DO NOTHING sem retry loop (silent loss em concurrent write).Cruza com: 04-02 (outbox + idempotent consumer), 04-01 (logical clocks, ordering em multi-shard), 04-06 (DDD, aggregate + invariants), 02-09 (Postgres, event store schema + indexing), 04-13 (streaming, projection compute via Kafka Streams/Flink).
Atomic write to DB e publish to broker é o problema distribuído canônico. Sem 2PC (lento, frágil, indisponível em maioria dos brokers modernos), a única resposta production-grade é transactional outbox: business write e event row na mesma transação Postgres; relay process separado publica para o broker. Saga estende: processo multi-step sem 2PC, cada step com compensating action explícita. 2026 trouxe maturidade: Debezium 2.7+ estável para CDC outbox, Temporal 1.25+ TS SDK production, Restate 1.x como alternativa Rust mais leve, Inngest 3.x para times serverless-first.
Schema mínimo:
CREATE TABLE outbox (
id BIGSERIAL PRIMARY KEY,
aggregate_id TEXT NOT NULL,
event_type TEXT NOT NULL,
payload JSONB NOT NULL,
headers JSONB NOT NULL DEFAULT '{}'::jsonb,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
published_at TIMESTAMPTZ
);
CREATE INDEX outbox_unpublished_idx ON outbox (id) WHERE published_at IS NULL;
CREATE INDEX outbox_partition_idx ON outbox (aggregate_id, id);
Business write + outbox no mesmo BEGIN:
BEGIN;
UPDATE orders SET status = 'paid' WHERE id = $1;
INSERT INTO outbox (aggregate_id, event_type, payload, headers)
VALUES ($1, 'OrderPaid',
jsonb_build_object('orderId', $1, 'amount', $2, 'paidAt', now()),
jsonb_build_object('message_id', gen_random_uuid()::text,
'trace_id', $3));
COMMIT;
Atomicity garantida: se transação rollback, evento não existe; se commit, evento existe e será publicado. Nunca insira outbox em conexão/transação separada da business write — anula a invariante.
Debezium connector lê WAL via logical replication slot, transforma cada INSERT em outbox em evento Kafka. Latência ~100ms p99. Config (Debezium 2.7+):
{
"name": "logistica-outbox",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"slot.name": "logistica_outbox_slot",
"publication.name": "logistica_outbox_pub",
"table.include.list": "public.outbox",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.route.by.field": "event_type",
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.table.fields.additional.placement": "headers:header"
}
}
Trade-offs CDC: latência baixa, sem worker para operar; mas replication slot precisa monitoring (pg_replication_slots.confirmed_flush_lsn lag); slot abandonado retém WAL → disco enche → DB trava. Set wal_keep_size = 1GB mínimo (5-10GB para resiliência), alerte em slot lag > 500MB.
Worker simples, latência 500ms-5s, scale horizontal por partition:
-- Worker loop
BEGIN;
SELECT id, aggregate_id, event_type, payload, headers
FROM outbox
WHERE published_at IS NULL
ORDER BY id
LIMIT 100
FOR UPDATE SKIP LOCKED;
-- publish each row to Kafka with key = aggregate_id (preserves ordering per aggregate)
UPDATE outbox SET published_at = now() WHERE id = ANY($1::bigint[]);
COMMIT;
SKIP LOCKED permite N workers sem contenção. LISTEN/NOTIFY no commit reduz latência: worker bloqueia em LISTEN outbox_new, trigger after insert faz NOTIFY outbox_new. Sem LISTEN/NOTIFY, polling interval 200-500ms. Polling vence CDC quando: time não tem capacidade ops Debezium/Connect, throughput < 1k events/s, ou DB não pode habilitar logical replication.
Broker entrega at-least-once. Consumer precisa dedup. Tabela inbox no consumer:
CREATE TABLE inbox (
message_id TEXT PRIMARY KEY,
topic TEXT NOT NULL,
processed_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- TTL via partition drop ou vacuum job
CREATE INDEX inbox_processed_at_idx ON inbox (processed_at);
Handler:
async function handle(msg: KafkaMessage) {
const messageId = msg.headers.message_id;
await pg.transaction(async (tx) => {
const ins = await tx.query(
`INSERT INTO inbox (message_id, topic) VALUES ($1, $2) ON CONFLICT DO NOTHING RETURNING message_id`,
[messageId, msg.topic]
);
if (ins.rowCount === 0) return; // already processed
await applyBusinessEffect(tx, msg.payload);
});
}
UNIQUE constraint em message_id é a invariante. TTL: 24-72h cobre janela de retry típica; jobs de vacuum diário evitam crescimento infinito.
| Orchestrator | Stack / Maturidade | Forte em | Fraco em |
|---|---|---|---|
| Temporal 1.25+ | Java/Go core, TS/Python/.NET SDKs, self-host ou Cloud | Workflows complexos longos (dias-meses), signals + queries, history replay | Setup pesado (Cassandra/Postgres + frontend + worker), curva |
| Restate 1.x | Rust core, JS/TS/Java/Kotlin SDKs (Q4 2024 estável) | RPC-style durable handlers, ops simples, latência baixa, virtual objects | Ecosistema novo, menos battle-tested em > 1M execuções/dia |
| Inngest 3.x | Hosted-first (self-host Q4 2024), JS/TS/Python/Go | DX altíssimo, step functions, flow control + concurrency limits, retries declarativos | Hosted = vendor; debug de step graph complexo |
| AWS Step Functions | JSON ASL, Standard + Express | Serverless puro, integração nativa AWS, observability | AWS lock-in, cost explosion em high-frequency (use Express, < 5min) |
| Camunda 8 / Zeebe 8.7 | BPMN visual, Java-first | Workflows com stakeholder não-dev, auditoria regulatória | Heavy ops, BPMN learning curve |
// activities.ts
export async function chargePayment(orderId: string, amount: number): Promise<string> { /* ... */ }
export async function refundPayment(chargeId: string): Promise<void> { /* idempotent */ }
export async function reserveCourier(orderId: string): Promise<string> { /* ... */ }
export async function releaseCourier(reservationId: string): Promise<void> { /* idempotent */ }
export async function notifyCustomer(orderId: string, status: string): Promise<void> { /* ... */ }
// workflow.ts
import { proxyActivities, ActivityFailure } from '@temporalio/workflow';
const acts = proxyActivities<typeof activities>({
startToCloseTimeout: '30s',
retry: { initialInterval: '1s', backoffCoefficient: 2, maximumAttempts: 5 },
});
export async function fulfillOrderWorkflow(orderId: string, amount: number) {
const compensations: Array<() => Promise<void>> = [];
try {
const chargeId = await acts.chargePayment(orderId, amount);
compensations.unshift(() => acts.refundPayment(chargeId));
const reservationId = await acts.reserveCourier(orderId);
compensations.unshift(() => acts.releaseCourier(reservationId));
await acts.notifyCustomer(orderId, 'fulfilled');
} catch (err) {
for (const comp of compensations) {
await comp().catch((e) => { /* log; comp idempotente reentra */ });
}
await acts.notifyCustomer(orderId, 'failed');
throw err;
}
}
Workflow start latency Temporal ~50-100ms p99. Crítico: side effects (HTTP, DB write) só dentro de activity — workflow code é replayed deterministicamente; activity tem at-least-once semantics, então toda activity deve ser idempotente.
import { service, handlers } from '@restatedev/restate-sdk';
export const fulfillment = service({
name: 'fulfillment',
handlers: {
fulfill: async (ctx, req: { orderId: string; amount: number }) => {
const chargeId = await ctx.run('charge', () => chargePayment(req.orderId, req.amount));
try {
const resId = await ctx.run('reserve', () => reserveCourier(req.orderId));
await ctx.run('notify', () => notifyCustomer(req.orderId, 'fulfilled'));
return { chargeId, resId };
} catch (e) {
await ctx.run('refund', () => refundPayment(chargeId));
await ctx.run('notify-fail', () => notifyCustomer(req.orderId, 'failed'));
throw e;
}
},
},
});
ctx.run persiste resultado; replay pula side effects já executados. Modelo mais próximo de "código normal + durabilidade", menos cerimônia que Temporal, mas ecosistema 2026 ainda maturando.
import { Inngest } from 'inngest';
const inngest = new Inngest({ id: 'logistica' });
export const fulfillOrder = inngest.createFunction(
{
id: 'fulfill-order',
retries: 5,
concurrency: { limit: 50, key: 'event.data.region' },
rateLimit: { limit: 100, period: '1m' },
},
{ event: 'order/paid' },
async ({ event, step }) => {
const charge = await step.run('charge', () => chargePayment(event.data.orderId, event.data.amount));
try {
const res = await step.run('reserve', () => reserveCourier(event.data.orderId));
await step.sendEvent('notify', { name: 'order/fulfilled', data: { orderId: event.data.orderId } });
return { charge, res };
} catch (e) {
await step.run('refund', () => refundPayment(charge));
throw e;
}
},
);
Regra: começou com choreography e tem 5+ serviços envolvidos no mesmo processo de negócio? Migre para orchestration.
[charge OK, reserve OK, notify FAIL] → retry notify; [charge OK, reserve FAIL] → refund charge + notify failure.outbox table no orders-service; INSERT na mesma transação do UPDATE orders SET status='paid'.logistica_outbox_slot; publica em Kafka topic orders.events. Latência commit → Kafka ~150ms p99.fulfillment-service consome orders.events; inbox dedup table com TTL 24h via partition drop diário.fulfillOrderWorkflow: activities chargePayment → reserveCourier → notifyCustomer; compensations refundPayment, releaseCourier, notifyFailure. Cada activity verifica idempotency_key na business table antes de side effect externo.orders.events.dlq; alerta SRE; manual replay via tool interno.FOR UPDATE SKIP LOCKED — workers contendem na mesma row; throughput colapsa.message_id gerado no consumer em vez de no producer — dedup vira no-op (cada consumer gera id distinto para mesma mensagem reentregue).Cruza com: 04-03 §2.4 (event sourcing), §2.7 (saga intro), §2.8 (outbox revisited), §2.13 (anti-corruption layer), §2.15 (outbox + idempotency consumer), §2.16 (saga design), §2.18 (event sourcing operacional), 04-02 §2.18 (idempotent consumer + dedup), §2.20 (Kafka 4.0 + share groups), 02-09 §2.13 (Postgres logical replication), 04-08 §2.21 (saga patterns deep — Temporal/Cadence), 04-13 §2.12 (CDC), 04-04 §2.30 (compensations + DR), 04-01 §2.21 (logical clocks for ordering).
Você precisa, sem consultar:
Aplicar CQRS + Event Sourcing parcial em um subdomain do Logística.
Order aggregate.
order_events(stream_id, version, type, payload, ts).version (optimistic).Order.create, Order.assign, Order.markPickedUp, Order.markDelivered. Cada um valida invariants e emite event.Order.fromEvents(events).orders_view (tabela Postgres), atualizada por consumer de events.dashboard_aggregate (Postgres ou Redis), counts por tenant.OrderAssigned V2.Threshold de Maestria
Acerte todas as 5 pra marcar o módulo como concluído. Sem pressa, sem timer. Tudo fica salvo no teu navegador.
Q1Qual é a distinção fundamental entre event sourcing e CQRS?
Q2Em saga choreography vs orchestration, qual o principal trade-off em sistemas com 5+ serviços?
Q3Por que snapshots em event sourcing devem incluir um campo schema_version?
Q4Por que ON CONFLICT DO NOTHING em append de event store exige retry loop?
Q5Qual evidência indica que outbox CDC com replication slot está mal monitorado?
Destrava
04-03 é prereq dos seguintes módulos: