Новаком
SPRING-BOOT

Transactional Outbox в Spring Boot + Kafka — production-туториал на банковских кейсах

Как реализовать паттерн Outbox для гарантированной доставки событий в Kafka из Spring Boot. Реальные грабли: ordering, idempotency, retry storms. Код, миграции, метрики из проектов на 40 msg/s.

Н
Новаком
2026-05-23 · 12 минут чтения

Почему просто kafka.send() после repo.save() — это бомба замедленного действия

Каждый раз, когда я смотрю на чужой код Spring Boot сервиса, который пишет в БД и потом шлёт событие в Kafka, я ставлю мысленный счётчик: «сколько событий уже потерял этот сервис?»

Типичный код, который выглядит «как должно»:

@Transactional
public void createOrder(OrderRequest req) {
    Order order = orderRepo.save(new Order(req));
    kafkaTemplate.send("orders.created", order.getId(), order.toEvent());
}

Что здесь не так? Два сценария потери событий:

Сценарий 1 — БД зафиксировала, Kafka упала. save() уже коммитнулся, мы возвращаем 200 OK клиенту, заказ есть в orders. А kafkaTemplate.send бросил TimeoutException, потому что broker недоступен. Транзакция БД уже закрыта, откатить нельзя. Событие потеряно — склад не получит, ERP не получит, аналитика не получит.

Сценарий 2 — Kafka приняла, БД откатилась. save() уже бросил данные в connection pool, kafkaTemplate.send отработал (Kafka получила), потом на коммите БД сработал constraint violation или connection drop. Событие в Kafka есть, заказа в БД нет. На той стороне ERP видит «заказ #42», а у нас в БД его нет. Бизнес жалуется через неделю.

В банковских системах это превращается в потерянные платежи. На одном проекте мы нашли в DLQ 14 000 сообщений — реальные транзакции, накопленные за полгода, о которых никто не знал.

Хорошие новости: эта проблема решается. Решение называется Transactional Outbox — и оно проще, чем кажется.


Зачем нужен Outbox: гарантия at-least-once

Идея: вместо того, чтобы писать в БД и потом отдельно в Kafka, мы пишем только в БД — но в две таблицы за одну транзакцию: бизнес-сущность + outbox-запись с payload-ом события. Дальше отдельный процесс (relay) читает outbox и шлёт в Kafka.

Бизнес-логика
   │
   ▼ @Transactional
┌──────────────────┐
│  orders          │ INSERT order
│  outbox          │ INSERT event_id, topic, payload
└──────────────────┘
   │
   ▼ COMMIT (атомарно)
   ▼
┌──────────────────┐
│  Outbox Relay    │ читает unpublished, шлёт в Kafka
│  (scheduled)     │ помечает published_at
└──────────────────┘
   │
   ▼
   Kafka topic

Гарантии:

  • БД-коммит атомарен — либо есть и заказ, и outbox-запись, либо ничего.
  • Relay идемпотентен — может ретраить отправку в Kafka сколько угодно раз. Если упал между send() и markPublished() — на следующей итерации перепошлёт. Это даёт at-least-once: событие гарантированно дойдёт, но возможны дубли.
  • Consumer на той стороне должен быть идемпотентным по event_id. Это превращает at-least-once в эффективный exactly-once.

Реализация шаг за шагом

1. Schema outbox-таблицы

CREATE TABLE outbox (
    id              BIGSERIAL PRIMARY KEY,
    aggregate_type  VARCHAR(64) NOT NULL,   -- 'Order', 'Payment', etc.
    aggregate_id    VARCHAR(64) NOT NULL,   -- partition key для Kafka
    event_id        UUID NOT NULL,           -- для idempotency на consumer
    event_type      VARCHAR(128) NOT NULL,  -- 'order.created', 'payment.confirmed'
    payload         JSONB NOT NULL,
    headers         JSONB,
    created_at      TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    published_at    TIMESTAMPTZ
);

CREATE INDEX idx_outbox_unpublished
    ON outbox (created_at)
    WHERE published_at IS NULL;

Несколько нетривиальных моментов:

  • Частичный индекс WHERE published_at IS NULL — только по непрошедшим записям. Не раздувается по мере роста таблицы.
  • aggregate_id становится partition key в Kafka — гарантия ordering для одной сущности.
  • event_id UUID — sender генерирует один раз, consumer использует для dedup. Это центральная штука для idempotency.
  • payload JSONB — Postgres валидирует JSON; читать удобно; индексировать по полям внутри можно при необходимости.

2. JPA-entity

@Entity
@Table(name = "outbox")
class OutboxRecord(
    @Id @GeneratedValue(strategy = GenerationType.IDENTITY)
    val id: Long? = null,

    @Column(name = "aggregate_type", nullable = false)
    val aggregateType: String,

    @Column(name = "aggregate_id", nullable = false)
    val aggregateId: String,

    @Column(name = "event_id", nullable = false, columnDefinition = "uuid")
    val eventId: UUID = UUID.randomUUID(),

    @Column(name = "event_type", nullable = false)
    val eventType: String,

    @Type(JsonBinaryType::class)
    @Column(name = "payload", nullable = false, columnDefinition = "jsonb")
    val payload: Map<String, Any?>,

    @Column(name = "created_at", nullable = false)
    val createdAt: Instant = Instant.now(),

    @Column(name = "published_at")
    var publishedAt: Instant? = null,
)

3. Бизнес-логика — пишет в outbox внутри @Transactional

@Service
class OrderService(
    private val orderRepo: OrderRepository,
    private val outboxRepo: OutboxRepository,
) {
    @Transactional
    fun createOrder(req: OrderRequest): Order {
        val order = orderRepo.save(Order.from(req))

        outboxRepo.save(
            OutboxRecord(
                aggregateType = "Order",
                aggregateId = order.id.toString(),
                eventType = "order.created",
                payload = mapOf(
                    "orderId" to order.id,
                    "customerId" to order.customerId,
                    "amount" to order.amount,
                    "currency" to order.currency,
                    "createdAt" to order.createdAt,
                ),
            )
        )

        return order
    }
}

Главное — оба save-а внутри одной транзакции. Если хоть один упадёт — оба откатятся. Kafka здесь вообще не упомянута: мы ей не доверяем доставку синхронно.

4. Outbox Relay — отправка в Kafka

@Component
class OutboxRelay(
    private val outboxRepo: OutboxRepository,
    private val kafka: KafkaTemplate<String, ByteArray>,
    private val objectMapper: ObjectMapper,
    @Value("\${outbox.batch-size:100}")
    private val batchSize: Int,
) {
    private val log = LoggerFactory.getLogger(OutboxRelay::class.java)

    @Scheduled(fixedDelayString = "\${outbox.poll-interval-ms:200}")
    @Transactional
    fun relay() {
        val batch = outboxRepo.findUnpublishedBatch(batchSize)
        if (batch.isEmpty()) return

        for (record in batch) {
            try {
                val topic = topicFor(record.eventType)
                val key = record.aggregateId
                val payload = objectMapper.writeValueAsBytes(record.payload)

                val headers = listOf(
                    RecordHeader("event-id", record.eventId.toString().toByteArray()),
                    RecordHeader("event-type", record.eventType.toByteArray()),
                    RecordHeader("created-at", record.createdAt.toString().toByteArray()),
                )

                val producerRecord = ProducerRecord(
                    topic, null, record.createdAt.toEpochMilli(),
                    key, payload, headers,
                )

                kafka.send(producerRecord).get(5, TimeUnit.SECONDS)

                record.publishedAt = Instant.now()
                outboxRepo.save(record)
            } catch (e: Exception) {
                log.error("Failed to publish outbox#${record.id}: ${e.message}")
                // не выходим — пусть следующий заберёт
            }
        }
    }

    private fun topicFor(eventType: String): String =
        eventType.substringBefore('.') + "s"  // order.created -> orders
}

5. Repository с SELECT ... FOR UPDATE SKIP LOCKED

Если у вас несколько инстансов сервиса — несколько relay-ов будут пытаться читать одни и те же записи. Решение: SELECT FOR UPDATE SKIP LOCKED — Postgres скипает залоченные строки.

@Repository
interface OutboxRepository : JpaRepository<OutboxRecord, Long> {

    @Query(
        value = """
            SELECT * FROM outbox
            WHERE published_at IS NULL
            ORDER BY created_at
            LIMIT :limit
            FOR UPDATE SKIP LOCKED
        """,
        nativeQuery = true,
    )
    fun findUnpublishedBatch(@Param("limit") limit: Int): List<OutboxRecord>
}

Без SKIP LOCKED второй relay будет ждать первого, пропускная способность не вырастет.


Идемпотентный consumer на принимающей стороне

Outbox даёт at-least-once. Чтобы это стало эффективным exactly-once — consumer должен помнить, какие event_id уже обработал:

@Component
class OrderEventHandler(
    private val processedEventsRepo: ProcessedEventsRepository,
    private val warehouseService: WarehouseService,
) {
    @KafkaListener(topics = ["orders"])
    @Transactional
    fun handle(
        record: ConsumerRecord<String, ByteArray>,
        ack: Acknowledgment,
    ) {
        val eventId = record.headers().lastHeader("event-id")
            ?.value()?.toString(Charsets.UTF_8)
            ?.let(UUID::fromString)
            ?: throw IllegalStateException("missing event-id header")

        if (processedEventsRepo.existsByEventId(eventId)) {
            // дубль — просто ack, не повторяем побочные действия
            ack.acknowledge()
            return
        }

        val event = objectMapper.readValue(record.value(), OrderCreatedEvent::class.java)

        warehouseService.dispatch(event)
        processedEventsRepo.save(ProcessedEvent(eventId = eventId, processedAt = Instant.now()))

        ack.acknowledge()
    }
}

processed_events — простая таблица (event_id UUID PRIMARY KEY, processed_at TIMESTAMPTZ). Размер ограничивается ретеншеном (можно чистить старше N дней).

💡 *Если ваш consumer записывает в БД — processedEventsRepo.save должен идти в той же транзакции, что и бизнес-write, иначе при падении посередине будете дублировать._


Когда мы это начали внедрять у клиентов

Самый частый кейс — payment processing. У одного банковского клиента до внедрения outbox:

МетрикаДоПосле outbox
Lost events в DLQ за полгода~14 0000
Дубль платежей из-за retry в kafkaTemplate.sendда, изредканет (consumer dedup)
RTO при падении Kafka на 5 минуткаждое событие = потеря0 потерь (relay перезапустится после восстановления)
Throughput~30 msg/s sustained40 msg/s +

После замены kafkaTemplate.send внутри @Transactional на outbox — incident-postmortem-ы про «потерянные сообщения» прекратились полностью.


Микро-CTA в середине

Если вы читаете и думаете «у нас наверняка часть событий теряется так же» — скорее всего, да. Это типовая проблема, мы её находим примерно на каждом втором аудите. Если хотите проверить свой backend без долгих переговоров — у нас есть Backend Health-Check за 3 дня, Senior-инженер найдёт топ-10 проблем и отдаст отчёт.


Ловушки, на которые мы наступили (и о них нигде не пишут)

Ловушка 1: ordering ломается при параллельном relay

Если у вас 5 инстансов сервиса и 5 relay-ов читают outbox через SKIP LOCKED — порядок отправки в Kafka не гарантирован даже для одной aggregate_id.

Решение: партиционируйте outbox-таблицу по aggregate_id (через hash modulo) + каждый инстанс работает только со своими партициями. Либо используйте Kafka partition key = aggregate_id — Kafka гарантирует ordering внутри одной партиции (но не между партициями).

// при отправке
val producerRecord = ProducerRecord(
    topic,
    partitionForAggregate(record.aggregateId),  // явный partition
    key, payload, headers,
)

Ловушка 2: retry storm после восстановления Kafka

Если Kafka была недоступна 30 минут, в outbox накопилось 100 000 сообщений. После восстановления relay начинает молотить с максимальной скоростью — забивает broker, ломает остальной трафик.

Решение: ограничьте throughput relay-а через rate limiter или просто настройте batchSize так, чтобы он не выгребал больше, чем broker может переварить.

outbox:
  batch-size: 100
  poll-interval-ms: 200   # 100 msg / 200 ms = 500 msg/s ceiling

Ловушка 3: outbox-таблица растёт бесконечно

Через год у нас было 50 GB в outbox — все опубликованные записи никто не удалял.

Решение: scheduled cleanup:

DELETE FROM outbox
WHERE published_at < NOW() - INTERVAL '7 days';

7 дней — компромисс между «безопасно отдебажить» и «не раздувать».

Ловушка 4: Spring Data JPA flush в неожиданный момент

Если у вас в @Transactional методе сначала outboxRepo.save(), потом RuntimeException — Hibernate в дефолте может flush-нуть outbox-запись до прихода исключения. Транзакция откатится, но если у вас другой connection видит uncommitted данные (read-uncommitted на стороне relay) — relay подхватит и отправит.

Решение 1: убедитесь, что у вас isolation level = READ_COMMITTED (дефолт в Postgres — но проверьте JPA конфиг).

Решение 2: ставьте outboxRepo.save() в самом конце метода, после всех проверок.

Ловушка 5: payload растёт и забивает БД

JSONB-payload в 2 MB на запись × 100 событий/сек = 200 MB/sec в БД. Постгрес такую нагрузку выдерживает, но репликация на slave может отстать.

Решение: в outbox кладите только { id, key, version }, а полный объект восстанавливайте в relay из бизнес-таблицы перед отправкой. Это «outbox с ленивой загрузкой».


Альтернативы и когда они уместны

Outbox — не единственный паттерн. Что ещё бывает:

ПодходКогда
CDC (Debezium)Если уже есть Debezium-инфраструктура и не хочется отдельной таблицы — Debezium читает WAL Postgres и стримит в Kafka сам. Минус: tight coupling БД и Kafka, нельзя гибко формировать payload
Listen/Notify PostgresДля in-process нотификаций между инстансами одного сервиса. Не подходит для cross-service event-driven
Eventuate TramOpen-source библиотека, реализует outbox + saga из коробки. Минус: меньше контроля, vendor lock на их API
kafka.send() без транзакцииТолько если потеря 0.01% событий допустима (метрики, telemetry, fire-and-forget)

Для финтеха и e-commerce, где deadline на event delivery меряется в часах, а не миллисекундах, outbox — стандарт де-факто. Мы делаем его на каждом втором проекте.


Финальный чек-лист

Если вы реализуете outbox у себя — пройдитесь по нему:

  • Outbox-таблица с частичным индексом WHERE published_at IS NULL
  • event_id UUID пишется на стороне sender, читается на consumer для dedup
  • Relay использует SELECT FOR UPDATE SKIP LOCKED для horizontal scale
  • Consumer хранит processed event_id в той же транзакции, что бизнес-write
  • Cleanup published records через DELETE WHERE published_at < NOW() - INTERVAL '7 days'
  • Метрика «глубина outbox» в Grafana (COUNT(*) WHERE published_at IS NULL) с алертом
  • Партиционирование Kafka по aggregate_id для ordering гарантий
  • Rate limit на relay throughput (защита от retry storm)

Этого достаточно, чтобы выдержать продакшен на 50+ msg/s. Дальше — оптимизации по конкретным паттернам нагрузки.


Если у вас уже есть Spring Boot + Kafka в проде и не уверены в том, что события не теряются — это типичная боль, мы её диагностируем за 3 дня в рамках Backend Health-Check. Без давления продаж — отчёт остаётся у вас в любом случае.

РАЗРАБОТКА

Нужна похожая задача?

Обсудим вашу задачу и предложим решение за 30 минут.

Обсудить проект