Почему просто kafka.send() после repo.save() — это бомба замедленного действия
Каждый раз, когда я смотрю на чужой код Spring Boot сервиса, который пишет в БД и потом шлёт событие в Kafka, я ставлю мысленный счётчик: «сколько событий уже потерял этот сервис?»
Типичный код, который выглядит «как должно»:
@Transactional
public void createOrder(OrderRequest req) {
Order order = orderRepo.save(new Order(req));
kafkaTemplate.send("orders.created", order.getId(), order.toEvent());
}
Что здесь не так? Два сценария потери событий:
Сценарий 1 — БД зафиксировала, Kafka упала. save() уже коммитнулся, мы возвращаем 200 OK клиенту, заказ есть в orders. А kafkaTemplate.send бросил TimeoutException, потому что broker недоступен. Транзакция БД уже закрыта, откатить нельзя. Событие потеряно — склад не получит, ERP не получит, аналитика не получит.
Сценарий 2 — Kafka приняла, БД откатилась. save() уже бросил данные в connection pool, kafkaTemplate.send отработал (Kafka получила), потом на коммите БД сработал constraint violation или connection drop. Событие в Kafka есть, заказа в БД нет. На той стороне ERP видит «заказ #42», а у нас в БД его нет. Бизнес жалуется через неделю.
В банковских системах это превращается в потерянные платежи. На одном проекте мы нашли в DLQ 14 000 сообщений — реальные транзакции, накопленные за полгода, о которых никто не знал.
Хорошие новости: эта проблема решается. Решение называется Transactional Outbox — и оно проще, чем кажется.
Зачем нужен Outbox: гарантия at-least-once
Идея: вместо того, чтобы писать в БД и потом отдельно в Kafka, мы пишем только в БД — но в две таблицы за одну транзакцию: бизнес-сущность + outbox-запись с payload-ом события. Дальше отдельный процесс (relay) читает outbox и шлёт в Kafka.
Бизнес-логика
│
▼ @Transactional
┌──────────────────┐
│ orders │ INSERT order
│ outbox │ INSERT event_id, topic, payload
└──────────────────┘
│
▼ COMMIT (атомарно)
▼
┌──────────────────┐
│ Outbox Relay │ читает unpublished, шлёт в Kafka
│ (scheduled) │ помечает published_at
└──────────────────┘
│
▼
Kafka topic
Гарантии:
- БД-коммит атомарен — либо есть и заказ, и outbox-запись, либо ничего.
- Relay идемпотентен — может ретраить отправку в Kafka сколько угодно раз. Если упал между
send()иmarkPublished()— на следующей итерации перепошлёт. Это даёт at-least-once: событие гарантированно дойдёт, но возможны дубли. - Consumer на той стороне должен быть идемпотентным по
event_id. Это превращает at-least-once в эффективный exactly-once.
Реализация шаг за шагом
1. Schema outbox-таблицы
CREATE TABLE outbox (
id BIGSERIAL PRIMARY KEY,
aggregate_type VARCHAR(64) NOT NULL, -- 'Order', 'Payment', etc.
aggregate_id VARCHAR(64) NOT NULL, -- partition key для Kafka
event_id UUID NOT NULL, -- для idempotency на consumer
event_type VARCHAR(128) NOT NULL, -- 'order.created', 'payment.confirmed'
payload JSONB NOT NULL,
headers JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
published_at TIMESTAMPTZ
);
CREATE INDEX idx_outbox_unpublished
ON outbox (created_at)
WHERE published_at IS NULL;
Несколько нетривиальных моментов:
- Частичный индекс
WHERE published_at IS NULL— только по непрошедшим записям. Не раздувается по мере роста таблицы. aggregate_idстановится partition key в Kafka — гарантия ordering для одной сущности.event_idUUID — sender генерирует один раз, consumer использует для dedup. Это центральная штука для idempotency.payload JSONB— Postgres валидирует JSON; читать удобно; индексировать по полям внутри можно при необходимости.
2. JPA-entity
@Entity
@Table(name = "outbox")
class OutboxRecord(
@Id @GeneratedValue(strategy = GenerationType.IDENTITY)
val id: Long? = null,
@Column(name = "aggregate_type", nullable = false)
val aggregateType: String,
@Column(name = "aggregate_id", nullable = false)
val aggregateId: String,
@Column(name = "event_id", nullable = false, columnDefinition = "uuid")
val eventId: UUID = UUID.randomUUID(),
@Column(name = "event_type", nullable = false)
val eventType: String,
@Type(JsonBinaryType::class)
@Column(name = "payload", nullable = false, columnDefinition = "jsonb")
val payload: Map<String, Any?>,
@Column(name = "created_at", nullable = false)
val createdAt: Instant = Instant.now(),
@Column(name = "published_at")
var publishedAt: Instant? = null,
)
3. Бизнес-логика — пишет в outbox внутри @Transactional
@Service
class OrderService(
private val orderRepo: OrderRepository,
private val outboxRepo: OutboxRepository,
) {
@Transactional
fun createOrder(req: OrderRequest): Order {
val order = orderRepo.save(Order.from(req))
outboxRepo.save(
OutboxRecord(
aggregateType = "Order",
aggregateId = order.id.toString(),
eventType = "order.created",
payload = mapOf(
"orderId" to order.id,
"customerId" to order.customerId,
"amount" to order.amount,
"currency" to order.currency,
"createdAt" to order.createdAt,
),
)
)
return order
}
}
Главное — оба save-а внутри одной транзакции. Если хоть один упадёт — оба откатятся. Kafka здесь вообще не упомянута: мы ей не доверяем доставку синхронно.
4. Outbox Relay — отправка в Kafka
@Component
class OutboxRelay(
private val outboxRepo: OutboxRepository,
private val kafka: KafkaTemplate<String, ByteArray>,
private val objectMapper: ObjectMapper,
@Value("\${outbox.batch-size:100}")
private val batchSize: Int,
) {
private val log = LoggerFactory.getLogger(OutboxRelay::class.java)
@Scheduled(fixedDelayString = "\${outbox.poll-interval-ms:200}")
@Transactional
fun relay() {
val batch = outboxRepo.findUnpublishedBatch(batchSize)
if (batch.isEmpty()) return
for (record in batch) {
try {
val topic = topicFor(record.eventType)
val key = record.aggregateId
val payload = objectMapper.writeValueAsBytes(record.payload)
val headers = listOf(
RecordHeader("event-id", record.eventId.toString().toByteArray()),
RecordHeader("event-type", record.eventType.toByteArray()),
RecordHeader("created-at", record.createdAt.toString().toByteArray()),
)
val producerRecord = ProducerRecord(
topic, null, record.createdAt.toEpochMilli(),
key, payload, headers,
)
kafka.send(producerRecord).get(5, TimeUnit.SECONDS)
record.publishedAt = Instant.now()
outboxRepo.save(record)
} catch (e: Exception) {
log.error("Failed to publish outbox#${record.id}: ${e.message}")
// не выходим — пусть следующий заберёт
}
}
}
private fun topicFor(eventType: String): String =
eventType.substringBefore('.') + "s" // order.created -> orders
}
5. Repository с SELECT ... FOR UPDATE SKIP LOCKED
Если у вас несколько инстансов сервиса — несколько relay-ов будут пытаться читать одни и те же записи. Решение: SELECT FOR UPDATE SKIP LOCKED — Postgres скипает залоченные строки.
@Repository
interface OutboxRepository : JpaRepository<OutboxRecord, Long> {
@Query(
value = """
SELECT * FROM outbox
WHERE published_at IS NULL
ORDER BY created_at
LIMIT :limit
FOR UPDATE SKIP LOCKED
""",
nativeQuery = true,
)
fun findUnpublishedBatch(@Param("limit") limit: Int): List<OutboxRecord>
}
Без SKIP LOCKED второй relay будет ждать первого, пропускная способность не вырастет.
Идемпотентный consumer на принимающей стороне
Outbox даёт at-least-once. Чтобы это стало эффективным exactly-once — consumer должен помнить, какие event_id уже обработал:
@Component
class OrderEventHandler(
private val processedEventsRepo: ProcessedEventsRepository,
private val warehouseService: WarehouseService,
) {
@KafkaListener(topics = ["orders"])
@Transactional
fun handle(
record: ConsumerRecord<String, ByteArray>,
ack: Acknowledgment,
) {
val eventId = record.headers().lastHeader("event-id")
?.value()?.toString(Charsets.UTF_8)
?.let(UUID::fromString)
?: throw IllegalStateException("missing event-id header")
if (processedEventsRepo.existsByEventId(eventId)) {
// дубль — просто ack, не повторяем побочные действия
ack.acknowledge()
return
}
val event = objectMapper.readValue(record.value(), OrderCreatedEvent::class.java)
warehouseService.dispatch(event)
processedEventsRepo.save(ProcessedEvent(eventId = eventId, processedAt = Instant.now()))
ack.acknowledge()
}
}
processed_events — простая таблица (event_id UUID PRIMARY KEY, processed_at TIMESTAMPTZ). Размер ограничивается ретеншеном (можно чистить старше N дней).
💡 *Если ваш consumer записывает в БД —
processedEventsRepo.saveдолжен идти в той же транзакции, что и бизнес-write, иначе при падении посередине будете дублировать._
Когда мы это начали внедрять у клиентов
Самый частый кейс — payment processing. У одного банковского клиента до внедрения outbox:
| Метрика | До | После outbox |
|---|---|---|
| Lost events в DLQ за полгода | ~14 000 | 0 |
Дубль платежей из-за retry в kafkaTemplate.send | да, изредка | нет (consumer dedup) |
| RTO при падении Kafka на 5 минут | каждое событие = потеря | 0 потерь (relay перезапустится после восстановления) |
| Throughput | ~30 msg/s sustained | 40 msg/s + |
После замены kafkaTemplate.send внутри @Transactional на outbox — incident-postmortem-ы про «потерянные сообщения» прекратились полностью.
Микро-CTA в середине
Если вы читаете и думаете «у нас наверняка часть событий теряется так же» — скорее всего, да. Это типовая проблема, мы её находим примерно на каждом втором аудите. Если хотите проверить свой backend без долгих переговоров — у нас есть Backend Health-Check за 3 дня, Senior-инженер найдёт топ-10 проблем и отдаст отчёт.
Ловушки, на которые мы наступили (и о них нигде не пишут)
Ловушка 1: ordering ломается при параллельном relay
Если у вас 5 инстансов сервиса и 5 relay-ов читают outbox через SKIP LOCKED — порядок отправки в Kafka не гарантирован даже для одной aggregate_id.
Решение: партиционируйте outbox-таблицу по aggregate_id (через hash modulo) + каждый инстанс работает только со своими партициями. Либо используйте Kafka partition key = aggregate_id — Kafka гарантирует ordering внутри одной партиции (но не между партициями).
// при отправке
val producerRecord = ProducerRecord(
topic,
partitionForAggregate(record.aggregateId), // явный partition
key, payload, headers,
)
Ловушка 2: retry storm после восстановления Kafka
Если Kafka была недоступна 30 минут, в outbox накопилось 100 000 сообщений. После восстановления relay начинает молотить с максимальной скоростью — забивает broker, ломает остальной трафик.
Решение: ограничьте throughput relay-а через rate limiter или просто настройте batchSize так, чтобы он не выгребал больше, чем broker может переварить.
outbox:
batch-size: 100
poll-interval-ms: 200 # 100 msg / 200 ms = 500 msg/s ceiling
Ловушка 3: outbox-таблица растёт бесконечно
Через год у нас было 50 GB в outbox — все опубликованные записи никто не удалял.
Решение: scheduled cleanup:
DELETE FROM outbox
WHERE published_at < NOW() - INTERVAL '7 days';
7 дней — компромисс между «безопасно отдебажить» и «не раздувать».
Ловушка 4: Spring Data JPA flush в неожиданный момент
Если у вас в @Transactional методе сначала outboxRepo.save(), потом RuntimeException — Hibernate в дефолте может flush-нуть outbox-запись до прихода исключения. Транзакция откатится, но если у вас другой connection видит uncommitted данные (read-uncommitted на стороне relay) — relay подхватит и отправит.
Решение 1: убедитесь, что у вас isolation level = READ_COMMITTED (дефолт в Postgres — но проверьте JPA конфиг).
Решение 2: ставьте outboxRepo.save() в самом конце метода, после всех проверок.
Ловушка 5: payload растёт и забивает БД
JSONB-payload в 2 MB на запись × 100 событий/сек = 200 MB/sec в БД. Постгрес такую нагрузку выдерживает, но репликация на slave может отстать.
Решение: в outbox кладите только { id, key, version }, а полный объект восстанавливайте в relay из бизнес-таблицы перед отправкой. Это «outbox с ленивой загрузкой».
Альтернативы и когда они уместны
Outbox — не единственный паттерн. Что ещё бывает:
| Подход | Когда |
|---|---|
| CDC (Debezium) | Если уже есть Debezium-инфраструктура и не хочется отдельной таблицы — Debezium читает WAL Postgres и стримит в Kafka сам. Минус: tight coupling БД и Kafka, нельзя гибко формировать payload |
| Listen/Notify Postgres | Для in-process нотификаций между инстансами одного сервиса. Не подходит для cross-service event-driven |
| Eventuate Tram | Open-source библиотека, реализует outbox + saga из коробки. Минус: меньше контроля, vendor lock на их API |
kafka.send() без транзакции | Только если потеря 0.01% событий допустима (метрики, telemetry, fire-and-forget) |
Для финтеха и e-commerce, где deadline на event delivery меряется в часах, а не миллисекундах, outbox — стандарт де-факто. Мы делаем его на каждом втором проекте.
Финальный чек-лист
Если вы реализуете outbox у себя — пройдитесь по нему:
- Outbox-таблица с частичным индексом
WHERE published_at IS NULL -
event_id UUIDпишется на стороне sender, читается на consumer для dedup - Relay использует
SELECT FOR UPDATE SKIP LOCKEDдля horizontal scale - Consumer хранит processed
event_idв той же транзакции, что бизнес-write - Cleanup published records через
DELETE WHERE published_at < NOW() - INTERVAL '7 days' - Метрика «глубина outbox» в Grafana (
COUNT(*) WHERE published_at IS NULL) с алертом - Партиционирование Kafka по
aggregate_idдля ordering гарантий - Rate limit на relay throughput (защита от retry storm)
Этого достаточно, чтобы выдержать продакшен на 50+ msg/s. Дальше — оптимизации по конкретным паттернам нагрузки.
Если у вас уже есть Spring Boot + Kafka в проде и не уверены в том, что события не теряются — это типичная боль, мы её диагностируем за 3 дня в рамках Backend Health-Check. Без давления продаж — отчёт остаётся у вас в любом случае.