Teu progresso
0 / 83 módulos0%
Estágio 04 · 04-13
BloqueadoLogística produz toneladas de eventos: pings de localização, mudanças de status, métricas operacionais, transações financeiras. Postgres OLTP guarda o agora; ClickHouse/Timescale (03-13) guarda agregados. Mas quando você precisa de transformações declarativas em pipeline (limpar, deduplicar, joinar com dimensões, alimentar ML, gerar relatórios contábeis), entra a categoria de data engineering com ferramentas próprias: Spark, Flink, dbt, Airflow, Dagster, Beam, Materialize.
Diferença chave: batch vs streaming. Batch processa snapshots periódicos (diário, hourly). Streaming processa eventos contínuos com windows (5 min sliding, sessions, etc). Real-time analytics, fraud detection ao vivo, dashboards user-facing exigem streaming. Reportes financeiros mensais são batch.
Este módulo é data processing por dentro: lambda vs kappa architectures, streaming engines (Flink, Spark Structured Streaming), batch tools (Spark, dbt, Airflow), consistency em streaming (exactly-once, watermarks, late events), e como integrar tudo via lakehouse (Iceberg/Delta/Hudi) sem virar mil-pipelines spaghetti.
Lambda (Marz): dois caminhos paralelos, batch layer (correto, lento) + speed layer (rápido, aproximado), unidos em serving layer. Complexidade alta (dois codebases).
Kappa (Kreps): só streaming. Re-process via re-read do log. Reprocessing batch-style sobre stream histórico.
Kappa simplifica e tem ganho com Kafka log retention longa. Lambda persiste em algumas orgs.
Flink é "default sério"; ksqlDB e Materialize ganham em DX.
Categoria que cresceu 2023-2025: você escreve SQL como em Postgres, engine mantém materialized view incrementalmente atualizada conforme dados chegam. Não é micro-batch, é update real-time conforme cada change.
Modelo conceitual:
Exemplo em Materialize:
CREATE SOURCE orders FROM KAFKA BROKER 'kafka:9092' TOPIC 'orders'
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://sr:8081';
CREATE MATERIALIZED VIEW revenue_per_user AS
SELECT user_id, SUM(amount) AS total
FROM orders
WHERE status = 'paid'
GROUP BY user_id;
-- agora SELECT retorna em ms, sempre fresco
SELECT * FROM revenue_per_user WHERE user_id = 42;
Materialize:
RisingWave:
Quando vale streaming SQL incremental sobre Flink:
psql no engine, descobrir queries iterativamente.Quando NÃO vale:
Padrões de uso emergentes:
Late events (chegam minutos depois): comum (mobile offline, network). Engine precisa lidar com out-of-order.
Watermark: heurística "vi todos os eventos com event_time ≤ T". Permite fechar windows. Sempre tradeoff entre latência de fechar e completude.
Aggregations (count, sum, percentiles) por window. Late events disparam side outputs ou updates.
Difícil de ser estritamente exactly-once em sistema distribuído. Effectively-once via:
Spark Structured uses similar approach (offset tracking + idempotent writes).
State pode ser enorme (terabytes). Backends:
Operações: keyed state (por key), operator state, broadcast.
State migration entre versões é desafio (savepoints permitem upgrade).
Producer mais rápido que consumer. Engines lidam:
Sem backpressure, OOM ou lag explosivo.
Apache Spark: distributed compute. Core abstraction RDD (legado), DataFrame/Dataset API (preferido), SQL.
Execução: query → optimized plan (Catalyst) → physical plan → tasks. Distribuído via cluster manager (YARN, K8s, standalone).
Forças: scale (TB+), expressivo (SQL + UDF + ML), Delta Lake integration. Fraquezas: latency (segundos+), JVM overhead.
dbt (data build tool): SQL templated com Jinja, dependências entre modelos, tests, documentação.
Use case: data warehouse transformations. SQL → SQL (não move dados; gera tabelas/views no warehouse).
-- models/orders_daily.sql
{{ config(materialized='table') }}
SELECT date_trunc('day', created_at) as d, count(*) as orders
FROM {{ ref('orders') }}
GROUP BY 1
Dependências (ref) montam DAG. dbt run executa. dbt test valida (not_null, unique, custom).
dbt = padrão modern de transformations em warehouse (Snowflake, BigQuery, Redshift, ClickHouse).
dbt incremental models são onde times param de "rodar tudo full-refresh" e começam a ter pipelines escaláveis. Mas escolha errada de strategy = registros duplicados, deletes silenciosos, ou full table rewrite mascarado. Quatro strategies oficiais: append, merge, delete+insert, insert_overwrite. Cada uma resolve cenário diferente e quebra de jeito específico se aplicada errado.
| Strategy | Como funciona | Quando usa | DB suportado | Custo write | Race conditions |
|---|---|---|---|---|---|
append | INSERT só novas rows | Append-only event stream sem updates | Todos | Mínimo | Nenhum (no conflicts) |
merge | MERGE ON unique_key (UPDATE if exists, INSERT if not) | SCD type 1, dimensões mutáveis | Snowflake, BigQuery, Databricks, Postgres 15+ | Médio | Resolve via merge atomic |
delete+insert | DELETE rows com unique_key IN (...) + INSERT | Postgres < 15, Redshift, fact reload por janela | Todos | Alto (delete cost) | Window de inconsistência entre DELETE e INSERT |
insert_overwrite | DROP+REPLACE partition específica | Particionado por dia/hora; reload janela completa | BigQuery, Spark, Databricks | Baixo (atomic per partition) | Atomic em partition level |
append deep — o mais simples e mais traiçoeiro. Bom: page_views, clickstream, raw events. Mau: qualquer source com retries no upstream → duplica. Sempre adicione dbt_utils.deduplicate downstream se source tem retry semantics.
{{ config(materialized='incremental', incremental_strategy='append') }}
select * from {{ source('events', 'page_views') }}
{% if is_incremental() %}
where event_at > (select max(event_at) from {{ this }})
{% endif %}
Pegadinha: event_at > max(event_at) perde events com = exato. Use >= + dedupe downstream OU watermark com cushion (event_at > max(event_at) - interval '1 hour' + dedupe). Trade-off: cushion captura late-arriving data ao custo de reprocessar window e exigir dedupe explícito.
merge deep — escolha default em modern warehouses. Snowflake/BigQuery/Databricks: MERGE é atomic, performante, não tem janela de inconsistência. Postgres 15+: também tem MERGE nativo (antes era CTE-com-UPDATE-ou-INSERT trick). unique_key é obrigatório e tem que ter unique constraint OU índice unique pra evitar full table scan no MERGE source side.
{{ config(
materialized='incremental',
incremental_strategy='merge',
unique_key='order_id',
on_schema_change='sync_all_columns'
) }}
select
order_id,
customer_id,
status,
total,
updated_at,
_airbyte_emitted_at as ingested_at
from {{ source('app_db', 'orders') }}
{% if is_incremental() %}
where _airbyte_emitted_at > (select max(ingested_at) from {{ this }})
{% endif %}
Sub-pegadinha: SCD type 2 (history) pede pattern diferente — dbt_snapshot ou snapshot table custom; merge sozinho perde history.
delete+insert deep — quando precisa em Postgres < 15 ou window reload. Cenário: reprocessar últimos 7 dias de fact_sales porque corrigiu source data.
{{ config(
materialized='incremental',
incremental_strategy='delete+insert',
unique_key='sale_id'
) }}
select * from {{ ref('stg_sales') }}
{% if is_incremental() %}
where sale_at >= dateadd('day', -7, current_date)
{% endif %}
Pegadinha crítica: a window entre DELETE e INSERT (mesmo em transação) tem outras queries lendo zero rows em isolation levels mais frouxos. Read-replica downstream pode renderizar dashboard com 0 rows. Mitigação: rodar em horário de baixa leitura OU usar materialized view sobre snapshot. Custo: 7 dias de DELETE em fact com 100M rows = scan + lock. unique_key precisa estar indexado.
insert_overwrite deep — partição particionada vence. BigQuery / Databricks / Spark com partitioned tables. dbt detecta partições afetadas, dropa, recria.
{{ config(
materialized='incremental',
incremental_strategy='insert_overwrite',
partition_by={'field': 'sale_date', 'data_type': 'date', 'granularity': 'day'}
) }}
select * from {{ ref('stg_sales') }}
{% if is_incremental() %}
where sale_date in ({{ partitions_to_replace() | join(', ') }})
{% endif %}
Macro partitions_to_replace(): define quais partições reprocessar (últimos N dias, ou var('reload_dates')). Vantagem: atomic per partition, custo proporcional à window, não scan tabela inteira. Pegadinha: schema change em coluna não-partition pode falhar — força full-refresh.
Decision tree (operacional):
append.merge.delete+insert.insert_overwrite.Operational hardening:
lookback_window (var('lookback_hours', 24)) — sempre reprocesse last N hours pra cobrir events atrasados. Não confie só em max(event_at).dbt run --full-refresh -s model_name recria tudo. Pra backfill por janela, use --vars '{start_date: 2026-01-01, end_date: 2026-01-31}' + lógica condicional no model.dbt run --vars '{lookback_hours: 48}' rodado 2x — assertion: row count idêntico.dbt source freshness + Elementary Data ou re_data pra dashboard de incremental health (last_run_rows_added, model_lag).Anti-patterns observados:
materialized='incremental' sem is_incremental() no SQL → silenciosamente roda full-refresh todo dia mascarado.unique_key sem unique constraint na warehouse → MERGE faz full scan; perf degrada com tabela.append com source que retry duplica → registros duplos descobertos 3 meses depois em audit financeiro.delete+insert em horário de pico → dashboards mostram 0 momentaneamente; suporte enche de tickets.Cruza com 04-13 §2.12 (CDC alimenta merge incremental), 04-13 §2.14 (data quality tests devem cobrir uniqueness e completeness pós-incremental), 03-13 §2.15 (decisão lakehouse vs warehouse afeta strategies disponíveis), 04-13 §2.16 (exactly-once semantics na ingestão é pré-requisito pra append).
Cenário concreto: CDC do Postgres OLTP (02-09 §2.13.1) escreve raw events em S3 (formato Parquet, table format Iceberg). dbt transforma raw → staging → marts em ClickHouse. Stack 2026-canônica.
Estrutura de projeto:
logistics-analytics/
├── dbt_project.yml
├── models/
│ ├── sources.yml # raw tables em Iceberg via ClickHouse external table
│ ├── staging/
│ │ ├── stg_orders.sql
│ │ ├── stg_payments.sql
│ │ └── stg_tracking_pings.sql
│ ├── intermediate/
│ │ └── int_order_with_payments.sql
│ └── marts/
│ ├── fct_daily_revenue.sql
│ ├── fct_courier_utilization.sql
│ └── dim_lojista.sql
├── tests/
│ └── assert_revenue_positive.sql
├── snapshots/
│ └── snap_lojista_tier.sql
└── seeds/
└── tier_pricing.csv
Sources com Iceberg:
# models/sources.yml
version: 2
sources:
- name: raw_logistics
schema: iceberg.lakehouse.logistics
description: "CDC events from Postgres OLTP via Debezium → S3 Iceberg"
tables:
- name: orders_cdc
identifier: orders_cdc
description: "Append-only stream of order changes"
loaded_at_field: cdc_ts
freshness:
warn_after: { count: 30, period: minute }
error_after: { count: 2, period: hour }
- name: payments_cdc
identifier: payments_cdc
- name: tracking_pings
identifier: tracking_pings
meta:
retention_days: 90 # custom meta pra lifecycle policy
Modelo staging (parse + dedup CDC):
-- models/staging/stg_orders.sql
{{ config(
materialized='incremental',
unique_key='order_id',
on_schema_change='append_new_columns',
incremental_strategy='delete+insert'
) }}
WITH ranked AS (
SELECT
after.id AS order_id,
after.tenant_id AS tenant_id,
after.status AS status,
after.total::numeric(12,2) AS total,
after.created_at AS created_at,
cdc_ts AS updated_at,
op,
ROW_NUMBER() OVER (PARTITION BY after.id ORDER BY cdc_ts DESC) AS rn
FROM {{ source('raw_logistics', 'orders_cdc') }}
{% if is_incremental() %}
WHERE cdc_ts > (SELECT MAX(updated_at) FROM {{ this }})
{% endif %}
)
SELECT order_id, tenant_id, status, total, created_at, updated_at
FROM ranked
WHERE rn = 1 AND op != 'd' -- mantém só última versão; descarta deletes do mart
Mart com facto agregado:
-- models/marts/fct_daily_revenue.sql
{{ config(
materialized='table',
engine='SummingMergeTree(revenue)',
order_by='(tenant_id, day)',
partition_by='toYYYYMM(day)'
) }}
SELECT
date_trunc('day', o.created_at)::date AS day,
o.tenant_id,
COUNT(*) AS orders,
SUM(o.total) AS revenue,
AVG(o.total) AS avg_ticket
FROM {{ ref('stg_orders') }} o
WHERE o.status IN ('delivered', 'in_transit')
GROUP BY 1, 2
Tests + freshness:
# models/marts/schema.yml
version: 2
models:
- name: fct_daily_revenue
description: "Daily revenue per tenant for finance dashboards"
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns: [day, tenant_id]
columns:
- name: tenant_id
tests: [not_null]
- name: revenue
tests:
- not_null
- dbt_utils.expression_is_true:
expression: ">= 0"
Snapshot pra dimensão SCD Type 2:
-- snapshots/snap_lojista_tier.sql
{% snapshot snap_lojista_tier %}
{{ config(target_schema='snapshots', unique_key='tenant_id',
strategy='check', check_cols=['tier', 'plan']) }}
SELECT tenant_id, tier, plan, updated_at FROM {{ source('raw_logistics', 'tenants') }}
{% endsnapshot %}
SELECT ... FROM table FOR VERSION AS OF '<snapshot_id>'. Útil pra auditoria + replay.OPTIMIZE table periodicamente (ClickHouse) ou Iceberg rewrite_data_files action.dbt jobs em Dagster (recomendado 2026 sobre Airflow pra novos projetos): cada model é asset; dependências automáticas; freshness checks built-in.
# dagster definitions
from dagster_dbt import DbtCliResource, dbt_assets
@dbt_assets(manifest='target/manifest.json')
def logistics_dbt_assets(context, dbt: DbtCliResource):
yield from dbt.cli(['build'], context=context).stream()
Schedule: hourly pra staging, daily pra marts. Failure alert pra Slack via Dagster sensors.
Cruza com 02-09 §2.13.1 (CDC fonte do pipeline), 03-13 §2.15 (decision tree de OLAP onde dbt + Iceberg cabe), 02-09 §2.13 (Materialize alternativa real-time pra alguns marts).
Pipelines têm dependências, schedules, retries, alerts.
Cada DAG: tasks, dependencies, schedule, retry policy, sensors (espera condição).
Lakehouse = data lake (Parquet em 04-03) + metadados ACID. Tabelas com snapshots, time travel, schema evolution, ACID concurrent writes.
Operações: MERGE INTO, UPDATE, DELETE em data lake. Time travel: query estado de 30 dias atrás.
Compaction periódica funde arquivos pequenos. Vacuum remove versões antigas.
Pegar mudanças do OLTP em real-time:
updated_at. Não captura deletes.Pipeline: Postgres → Debezium → Kafka topic per table → consumers (data warehouse, search index, cache invalidation).
CDC é cola entre OLTP e analytics em arquiteturas event-driven (04-03).
Schema breakages em produção custam horas. Discipline schema reviews.
Data sem qualidade leva a decisões erradas. Dashboards com NaN minam confidence.
Stream-stream join: dois streams em window (last 5min de orders × last 5min de payments). Stream-table join: enriquece stream com lookup (events × user dimension). Temporal join: junta com state válido at event time (versioned dimension).
Implementation cuida de state (stream join precisa state ambos lados durante window).
Um evento processado só uma vez. Impossível em puro distribuído sem cooperação. Soluções:
Maioria production aceita "at-least-once + idempotent".
Stream sempre rodando custa. Otimizações:
BigQuery cobra por bytes scanned; particionamento e clustering são economicamente mandatory.
Status 2026. Apache Iceberg dominou enterprise adoption 2024-2026: Snowflake, Databricks, AWS, Google, Microsoft, Cloudflare R2 todos suportam nativamente. Delta Lake mantém dominância em Databricks ecosystems; OSS Delta UniForm (2024+) entrega compatibility com Iceberg readers. Apache Hudi mantém niche em CDC-heavy workloads + indexing nativo. AWS S3 Tables (re:Invent 2024) entrega Iceberg-native storage AWS-managed, ~3x faster query vs raw S3. Cloudflare R2 Iceberg (2025+): zero egress fees + Iceberg native = compelling pra multi-cloud.
Decision matrix:
| Feature | Iceberg | Delta Lake | Hudi |
|---|---|---|---|
| Engine support | Spark/Flink/Trino/DuckDB/Snowflake/Databricks/Athena/BigQuery/ClickHouse | Spark/Databricks/Trino/DuckDB | Spark/Flink/Presto |
| Schema evolution | Robust (column add/drop/rename/reorder) | Robust (Delta 3+) | Limited (rewrite) |
| Time travel | Snapshot-based + tag/branch | Version-based | Commit-based |
| Branching | Tag + branch (Iceberg 1.4+) | Limited (Delta 4+) | No |
| Streaming | Flink-friendly | Spark Structured Streaming native | Best CDC |
| Compaction | Manual (write_audit_publish) ou auto-managed | Auto-optimize (Databricks) | Built-in |
| OSS health | Apache Foundation, vendor-neutral | Linux Foundation, Databricks-driven | Apache, Uber-driven |
Iceberg architecture deep. Catalog (metadata): REST catalog (recommended 2026), Glue, Hive Metastore, Nessie (git-like). Metadata file (JSON): tracks schema, partitioning, snapshots. Manifest list: list of manifest files per snapshot. Manifest file: list of data files (Parquet) com partition stats. Data files (Parquet/ORC/Avro): actual data. Pattern: writer atomic commit by writing new metadata pointing ao new manifest list; readers see consistent snapshot.
CDC pipeline (Postgres → Iceberg). Stack 2026: Postgres logical replication → Debezium → Kafka → Flink/Spark Structured Streaming → Iceberg table. Alternativa: PeerDB (Postgres → Iceberg/ClickHouse direct, sem Kafka), Estuary Flow (managed CDC). MERGE INTO em Iceberg pra upserts:
MERGE INTO orders_iceberg t
USING orders_cdc_stream s
ON t.order_id = s.order_id
WHEN MATCHED AND s.op = 'd' THEN DELETE
WHEN MATCHED AND s.op = 'u' THEN UPDATE SET * = s.*
WHEN NOT MATCHED AND s.op IN ('c', 'r') THEN INSERT *;
Latency real: Debezium → Iceberg via Flink minibatch 1-5min; via Spark microbatch 30s-2min; PeerDB direct 10-30s.
Time travel + branching (Iceberg 1.4+). Snapshot-based time travel:
SELECT * FROM orders FOR TIMESTAMP AS OF '2026-05-01 10:00:00';
SELECT * FROM orders FOR VERSION AS OF 12345;
Tags: stable references pra snapshots (audit/compliance):
ALTER TABLE orders CREATE TAG `release-v1.5` AS OF VERSION 12345;
Branches: experimental writes sem affecting main:
ALTER TABLE orders CREATE BRANCH `experiment` AS OF VERSION 12345;
-- writes em main_branch são isolated;
ALTER TABLE orders FAST FORWARD `main` `experiment`; -- merge
Use case Logística: backfill historical tracking data em branch; validate row counts + schema; FAST FORWARD para main atomically.
WAP (Write-Audit-Publish) pattern. Write to staging branch. Audit queries verify quality (count, schema, business rules: ex. assert sum(amount) > 0). Publish via FAST FORWARD branch to main. Stack: Apache Airflow + Iceberg branches + dbt-iceberg adapter.
Schema evolution patterns. Add column: instant em Iceberg/Delta; readers see NULL pra old data. Rename column: Iceberg supports nativamente (column ID-based); Delta 3+ via columnMapping. Drop column: soft drop em Iceberg (column hidden but data preserved); hard drop requires file rewrite. Reorder columns: trivial em Iceberg/Delta (logical names independent of file order). Type promotion: int → bigint OK; bigint → int requires rewrite.
Compaction + maintenance. Streaming writes geram 1000s of small files; query performance degrada. Iceberg procedures:
CALL system.rewrite_data_files('db.orders');
CALL system.expire_snapshots('db.orders', TIMESTAMP '2026-04-01');
CALL system.remove_orphan_files('db.orders');
Schedule (cron via Airflow/Dagster): compaction nightly; expiration weekly; orphan cleanup monthly.
Logística applied stack. Source: Postgres OLTP (Orders, Couriers, Tracking). CDC: PeerDB → Iceberg em S3 (latency ~30s). Tables: orders_iceberg, couriers_iceberg, tracking_pings_iceberg (high volume, partitioned by date). Catalog: REST catalog self-hosted (Tabular before Databricks acquisition; ou Apache Polaris OSS 2024+). Query: Trino pra ad-hoc; ClickHouse pra dashboards (Iceberg engine 24.x+). Maintenance: Airflow nightly compaction + weekly expiration + monthly orphan cleanup. Cost real: 100GB Iceberg em S3 = $2.30/mo storage; query cost depende engine. Em Trino self-host ~$300/mo total stack.
Anti-patterns observados:
Cruza com: 02-09 (Postgres, source para CDC); 02-12 (Mongo, alternative source); 03-13 (analytical DBs, query layer); 04-02 (messaging, Kafka pra CDC); 04-09 (scaling, lakehouse storage cost economics); 03-05 (AWS, S3 Tables managed).
Status 2026. Flink 1.20 (LTS, lançado 08/2024) consolidou-se como runtime padrão pra low-latency stateful streaming + CEP em workloads onde Kafka Streams não escala (cross-key state, complex joins) e Spark Structured Streaming sofre com micro-batch latency. Flink Kubernetes Operator 1.10+ entrega lifecycle declarativo (CRDs FlinkDeployment, FlinkSessionJob); savepoints automáticos antes de upgrade. Stack maduro: Flink 1.20 + Kafka 3.7+ + RocksDB state backend + S3 checkpoint storage.
Flink vs Kafka Streams vs Spark Structured Streaming.
| Dimensão | Flink 1.20 | Kafka Streams 3.7 | Spark Struct. Streaming 3.5 |
|---|---|---|---|
| Modelo | Dedicated streaming runtime | Library embedded em app JVM | Micro-batch sobre Spark engine |
| Latency | Sub-100ms (sub-ms tunável) | 50-500ms | 100ms-2s (continuous mode experimental) |
| State | ValueState/ListState/MapState; RocksDB; petabyte-scale | RocksDB local; bounded by app heap | Stateful ops via state store (HDFSBackedStateStore/RocksDB) |
| CEP | Native CEP library | Manual (no library) | Manual |
| Languages | Java/Scala/Python (PyFlink) | Java/Scala only | Polyglot (Scala/Java/Python/R/SQL) |
| Sources/sinks | Kafka, Kinesis, Pulsar, JDBC, Iceberg, S3 | Kafka-only | Kafka, Kinesis, files, JDBC, Delta/Iceberg |
| Decision | Low-latency + complex state + CEP | Kafka-only + simplicity (cobre 04-02 §2.19) | Batch+stream unified + ML pipelines |
Flink core concepts 2026. DataStream API: typed stream operators (Java/Scala/Python). Table API + SQL: declarative; planner converts em DataStream. State: ValueState (single value per key), ListState (append-only list), MapState (keyed map); todos keyed; backed by RocksDB. Watermarks: track event-time progress; trigger window closures. Checkpoints: periodic state snapshots to S3/HDFS; recovery on failure (interval típico 30-60s). Savepoints: manual checkpoints pra upgrades; preservam state across job versions.
Watermark fundamentals. Event time = quando evento ocorreu (no device/source); processing time = quando sistema vê. Watermark W(t) = assertion "todos eventos com event_time < t já foram observados". Late events = chegam após watermark passar; dropped ou roteados pra side output. Allowed lateness: window permanece aberto após watermark por grace period. Logística: courier ping event_time from device; watermark = max(event_time) - 30s; tolera 30s de GPS lag em túneis/áreas sem cobertura.
Watermark generation:
DataStream<TrackingPing> pings = env.fromSource(
KafkaSource.<TrackingPing>builder()
.setBootstrapServers("kafka:9092")
.setTopics("tracking.pings")
.setGroupId("flink-tracking")
.setStartingOffsets(OffsetsInitializer.committedOffsets())
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(TrackingPingDeserializer.class))
.build(),
WatermarkStrategy.<TrackingPing>forBoundedOutOfOrderness(Duration.ofSeconds(30))
.withTimestampAssigner((ping, ts) -> ping.eventTimeMillis())
.withIdleness(Duration.ofMinutes(1)),
"Tracking Pings"
);
forBoundedOutOfOrderness: most common; tolera late events até N segundos. forMonotonousTimestamps: assume strict ordering; faster mas error-prone em streams reais. withIdleness: marca partition idle quando sem dados, evita stalled watermark global.
Stateful operator com keyed state + event-time timer:
public class CourierIdleDetector extends KeyedProcessFunction<String, TrackingPing, IdleAlert> {
private transient ValueState<Long> lastPingTime;
@Override
public void open(Configuration parameters) {
lastPingTime = getRuntimeContext().getState(
new ValueStateDescriptor<>("lastPingTime", Long.class)
);
}
@Override
public void processElement(TrackingPing ping, Context ctx, Collector<IdleAlert> out) throws Exception {
lastPingTime.update(ping.eventTimeMillis());
ctx.timerService().registerEventTimeTimer(ping.eventTimeMillis() + 10 * 60 * 1000L);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<IdleAlert> out) throws Exception {
Long last = lastPingTime.value();
if (last != null && timestamp - last >= 10 * 60 * 1000L) {
out.collect(new IdleAlert(ctx.getCurrentKey(), timestamp));
}
}
}
pings
.keyBy(p -> p.courierId)
.process(new CourierIdleDetector()).uid("courier-idle-detector")
.sinkTo(alertsSink);
CEP (Complex Event Processing). Use case: detectar sequências (fraud, multi-step user journey, anomalies). Logística — fraud detection: 3 cancellations dentro de 5min:
Pattern<OrderEvent, ?> fraudPattern = Pattern.<OrderEvent>begin("first")
.where(SimpleCondition.of(e -> e.type.equals("OrderCancelled")))
.followedBy("second")
.where(SimpleCondition.of(e -> e.type.equals("OrderCancelled")))
.followedBy("third")
.where(SimpleCondition.of(e -> e.type.equals("OrderCancelled")))
.within(Time.minutes(5));
PatternStream<OrderEvent> patternStream = CEP.pattern(
events.keyBy(e -> e.tenantId),
fraudPattern
);
DataStream<FraudAlert> alerts = patternStream.select(matches -> {
OrderEvent first = matches.get("first").get(0);
return new FraudAlert(first.tenantId, first.eventTime, "3 cancellations em 5 min");
});
keyBy antes de CEP.pattern é mandatório; sem isso, matches cruzam tenants (alerts sem sentido).
Exactly-once (Flink + Kafka via 2PC). Two-phase commit: Flink JobManager coordena com Kafka producer transacional. Pre-commit: producer escreve batch mas não commita. Commit: após checkpoint barrier completar, Flink chama producer commit. Recovery: from last checkpoint; uncommitted batch é replayed. Config crítico: transaction.timeout.ms no producer < transaction.max.timeout.ms no broker (default 15min); checkpoint interval < transaction timeout. Sink config:
KafkaSink<AlertEvent> sink = KafkaSink.<AlertEvent>builder()
.setBootstrapServers("kafka:9092")
.setRecordSerializer(...)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("flink-alerts-")
.setProperty("transaction.timeout.ms", "600000")
.build();
Savepoints — upgrade preservando state:
# Trigger savepoint (job continua rodando)
flink savepoint <jobId> s3://logistica-savepoints/sp-2026-05-06
# Cancel job
flink cancel <jobId>
# Deploy nova versão, restore from savepoint
flink run -s s3://logistica-savepoints/sp-2026-05-06 \
-c com.logistica.flink.OrdersJob \
orders-job-v2.jar
Operator UID é mandatório pra state migration: myOperator.uid("courier-idle-detector"). Sem UID, Flink gera hash from operator chain; qualquer mudança no DAG quebra restore. Schema evolution: ValueState com Avro/JSON Schema-typed state sobrevive schema add (defaults aplicados); rename/drop requer custom migration.
Logística applied stack. Sources: Kafka topics (orders.events, tracking.pings, payments.events). Flink jobs: CourierIdleDetector (alert se courier offline > 10min); FraudPatternDetector (CEP 3-cancellation pattern); OrderJoinEnrichment (stream-stream join order + courier profile, windowed 1h); RealtimeMetrics (windowed counts → ClickHouse sink). State backend: RocksDB; checkpoints to S3 a cada 60s; savepoints nightly + antes de cada deploy. Cluster: 3-node TaskManager (4 vCPU, 16GB cada) ~$300/mês em K8s; Flink Kubernetes Operator 1.10+ gerencia lifecycle (HA via K8s ConfigMap, sem ZooKeeper).
Anti-patterns observados:
forMonotonousTimestamps em real-world stream (bursts causam infinite waits; use forBoundedOutOfOrderness)..update(null) cleanup quando key terminal (state cresce unbounded; use TTL ou explicit clear)..uid("nome-estavel")).within sem keyBy (matches cruzam keys; alerts nonsense).forBoundedOutOfOrderness(Duration.ofMinutes(5)) arbitrário (mede actual lateness P99 antes; tune accordingly).Cruza com: 04-02 §2.19 (Kafka Streams alternative); §2.18 acima (lakehouse Iceberg sink); 03-13 (analytical DBs como sink, ClickHouse); 04-09 (parallelism, Kafka partition alignment); 04-04 (resilience, exactly-once via 2PC).
§2.18 cobriu o trade-off entre Iceberg, Delta e Hudi. Em 2024-2026 a disputa terminou: Iceberg ganhou tração como table format dominante (Snowflake adotou nativo, Databricks comprou Tabular por US$2B em 2024 e abriu Delta UniForm para Iceberg, AWS lançou Glue Iceberg REST endpoint em GA, Polaris graduou como TLP no Apache em early 2026). A fronteira agora é o catalog layer — quem federa namespaces, controla credenciais e expõe REST API padronizada. Format estável; engines (Spark, Trino, DuckDB, ClickHouse, Flink, StarRocks) consomem pelo mesmo spec; catalog é onde Polaris, Unity, Lakekeeper, Nessie e Glue brigam. Engineering de dados em 2026 é desenhar partition specs corretos, configurar compaction policy, entender hidden partitioning e escolher catalog sem lock-in.
Architecture (4 camadas):
catalog (REST/Hive/Glue) → metadata.json (current snapshot pointer, schema, partition spec)
↓
manifest list (snapshot manifest = lista de manifests)
↓
manifest files (lista de data files + stats min/max por column)
↓
data files (Parquet/ORC/Avro)
Cada commit é atomic swap do metadata.json no catalog. Manifest list aponta para snapshot atual; snapshots antigos retidos para time travel. Stats em manifest enable file pruning sem abrir Parquet (planning rápido em milhares de files).
Hidden partitioning + transforms:
Iceberg aplica partition transform na ingest, mas query não precisa conhecer a partition column. Diferente de Hive (onde WHERE dt='2026-05-07' é mandatório), Iceberg deduz da timestamp original.
CREATE TABLE prod.orders (
order_id BIGINT,
tenant_id STRING,
ts TIMESTAMP,
amount DECIMAL(18,2),
status STRING
) USING iceberg
PARTITIONED BY (days(ts), bucket(16, tenant_id))
TBLPROPERTIES (
'write.target-file-size-bytes' = '536870912',
'write.parquet.compression-codec' = 'zstd',
'commit.retry.num-retries' = '4',
'history.expire.max-snapshot-age-ms' = '604800000'
);
-- Query usa ts diretamente; Iceberg aplica days(ts) e prune partitions
SELECT SUM(amount) FROM prod.orders
WHERE ts >= TIMESTAMP '2026-05-01' AND ts < TIMESTAMP '2026-05-07'
AND tenant_id = 'acme';
Transforms suportados: identity, bucket(N, col), truncate(W, col), year, month, day, hour, void. Mudança de partition spec é metadata-only — old data permanece na spec antiga, new data na nova; query planner lida com ambas.
Schema evolution (ID-based, safe):
Cada coluna tem field ID interno; nome é alias. ADD/DROP/RENAME/REORDER/PROMOTE (int → long, float → double, decimal precision up) são metadata-only. Nunca reescreve data.
ALTER TABLE prod.orders ADD COLUMN region STRING AFTER status;
ALTER TABLE prod.orders RENAME COLUMN status TO order_status; -- safe; field ID preservado
ALTER TABLE prod.orders ALTER COLUMN amount TYPE DECIMAL(20,2); -- promote OK
-- DROP + ADD com mesmo nome cria field ID novo → data antigo invisível (data loss lógico).
REST catalog ecosystem 2026:
| Catalog | Stewardship | Cred vending | Multi-engine | Federation | Notes 2026 |
|---|---|---|---|---|---|
| Polaris | Apache TLP (Snowflake) | Sim (vended) | Sim | Roadmap | Graduated TLP Q1 2026; spec-compliant |
| Unity Catalog | Linux Found (Databricks) | Sim | Sim (OSS GA 2024) | Sim | Governance + lineage built-in; pesado |
| Lakekeeper | OSS (Rust) | Sim | Sim | Sim | 0.5+ leve, alta perf; choice para self-host |
| Nessie | Dremio | Parcial | Sim | Sim | 0.95+; branching nativo Git-like |
| Glue Iceberg REST | AWS | Sim (IAM) | Sim | Não | GA 2024; lock-in AWS mas zero ops |
| HMS (legacy) | Apache Hive | Não | Limited | Não | Evitar greenfield; migrar para REST |
Polaris config (Spark REST endpoint):
{
"spark.sql.catalog.prod": "org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.prod.type": "rest",
"spark.sql.catalog.prod.uri": "https://polaris.internal.acme.com/api/catalog",
"spark.sql.catalog.prod.credential": "${POLARIS_CLIENT_ID}:${POLARIS_CLIENT_SECRET}",
"spark.sql.catalog.prod.warehouse": "s3://acme-lake/prod",
"spark.sql.catalog.prod.scope": "PRINCIPAL_ROLE:data_engineer",
"spark.sql.catalog.prod.header.X-Iceberg-Access-Delegation": "vended-credentials"
}
Vended credentials: catalog gera STS temporary creds escopadas ao path da tabela; engine não tem acesso direto ao bucket. Padrão 2026 para zero-trust em data lake.
Branching + tagging (WAP — Write-Audit-Publish):
CREATE BRANCH dev IN prod.orders;
INSERT INTO prod.orders.branch_dev SELECT * FROM staging.orders_today;
-- Audit em dev sem afetar consumers de main
SELECT COUNT(*), SUM(amount) FROM prod.orders.branch_dev WHERE ts >= current_date;
-- Validações passaram → fast-forward
CALL system.fast_forward('prod.orders', 'main', 'dev');
-- Tag imutável de release (compliance, rollback point)
CREATE TAG release_2026_05_07 IN prod.orders RETAIN 365 DAYS;
-- Rollback se algo quebrar downstream
CALL system.set_current_snapshot(table => 'prod.orders', ref => 'release_2026_05_06');
CI for data: pipeline DBT/Spark roda em branch, executa testes (row count delta, null ratio, distribution shift), só faz fast-forward se passar. Equivalente a PR + CI para data.
Time travel:
SELECT * FROM prod.orders TIMESTAMP AS OF '2026-05-06 14:00:00';
SELECT * FROM prod.orders VERSION AS OF 8127364521;
-- Diff entre snapshots (debugging "o que mudou?")
SELECT * FROM prod.orders.changes
WHERE snapshot_id BETWEEN 8127364521 AND 8127364999;
Retenção controlada por history.expire.max-snapshot-age-ms e min-snapshots-to-keep. Sem expiração → metadata bloat (planning lento, listing custoso em S3).
Compaction strategies:
Streaming sinks (Flink, Spark Structured Streaming) escrevem files pequenos (10-50MB) por micro-batch. Sem compaction, query latency degrada 5-10x e S3 LIST é gargalo.
-- Rewrite data files: target 512MB, sort por columns frequentes em filter
CALL system.rewrite_data_files(
table => 'prod.orders',
strategy => 'sort',
sort_order => 'ts ASC, tenant_id ASC',
options => map(
'target-file-size-bytes', '536870912',
'min-input-files', '5',
'max-concurrent-file-group-rewrites', '8',
'partial-progress.enabled', 'true'
)
);
-- Rewrite manifests (consolida manifest files após muitos commits)
CALL system.rewrite_manifests('prod.orders');
-- Expire snapshots antigos (libera storage)
CALL system.expire_snapshots(
table => 'prod.orders',
older_than => TIMESTAMP '2026-04-30 00:00:00',
retain_last => 10
);
-- Remove orphan files (data files não referenciados por nenhum snapshot)
CALL system.remove_orphan_files(
table => 'prod.orders',
older_than => TIMESTAMP '2026-04-23 00:00:00' -- buffer 7d para in-flight commits
);
Sort durante rewrite é equivalente funcional ao Z-order do Delta (data clustering por múltiplas columns; melhora pruning em queries multi-dimensionais). Schedule diário em jobs idempotentes; rodar fora de janela de write-heavy se possível.
v2 vs v3 spec preview:
v2 (stable): row-level deletes via position deletes (file + row offset) e equality deletes (predicate). MERGE/UPDATE/DELETE eficientes. v3 (em flight Q2 2026): variant types (semi-structured nativo, sem JSON cast), deletion vectors (bitmap por file, mais compacto que position deletes), default values em ADD COLUMN, geometry types. Adoção engine-side: Spark e Trino lideram; Flink e DuckDB seguindo.
Stack Logística:
s3://logistica-lake/prod/ warehouse, Polaris REST catalog em ECS Fargate (2 instâncias), Postgres metastore para Polaris.orders, events, shipments partitioned by days(ts), bucket(32, tenant_id). File size target 512MB, zstd.IcebergSink exactly-once (cruza com §2.16); commit interval 60s (~10MB files, OK pré-compaction).rewrite_data_files com sort (ts, tenant_id) em tabelas hot, weekly em cold.03-13, dashboard ops). Vended creds; nenhum engine tem IAM direto ao bucket.dev, dbt tests, fast-forward gated por PR review.10 anti-patterns:
expire_snapshots.commit.retry.num-retries baixo (default 4) em high-write tables → conflict failures cascateiam para pipelines. Subir para 8-12 + jitter.WHERE day(ts) = '...' em vez de WHERE ts >= ...) → Iceberg não consegue aplicar transform reverso, perde pruning.bucket() (ex: PARTITIONED BY (user_id) com milhões de users) → milhões de partitions, metadata explode. Use bucket(N, user_id).remove_orphan_files sem buffer de tempo (older_than < retention de write in-flight) → apaga files de commits em progresso, corrupção. Buffer mínimo 3-7 dias.sort_by ts mas queries filtram por tenant_id) → pruning inefetivo, custo de rewrite sem benefício. Análise de query patterns antes.Cruza com: §2.18 acima (lakehouse intro, comparativo Iceberg/Delta/Hudi); §2.11 (lakehouse compare); §2.12 (CDC source para Iceberg sink); §2.16 (exactly-once em sinks streaming → Iceberg commit); 03-13 (analytical DBs ClickHouse/DuckDB lendo Iceberg direto); 04-09 (partition design, bucket cardinality, parallelism em compaction); 03-05 (S3 storage costs, lifecycle policies para snapshots expirados).
Você precisa, sem consultar:
Adicionar pipeline de processamento full à Logística: streaming real-time + batch nightly + lakehouse + dashboards.
orders, events, pings).orders e users Postgres → Kafka.events_history particionada por dia.fact_deliveries, dim_courier, dim_tenant, revenue_daily).revenue_daily valida tests.SELECT ... TIMESTAMP AS OF '...').Threshold de Maestria
Acerte todas as 5 pra marcar o módulo como concluído. Sem pressa, sem timer. Tudo fica salvo no teu navegador.
Q1Qual é a diferença essencial entre arquiteturas Lambda e Kappa para data processing?
Q2Por que watermark é necessário em streaming com event-time semantics?
Q3Em dbt incremental model, por que 'append' strategy é traiçoeira?
Q4Qual é o trade-off de incremental_strategy='delete+insert' em produção?
Q5Quando vale streaming SQL incremental (Materialize/RisingWave) em vez de Flink?
Destrava
04-13 é prereq dos seguintes módulos: