09 outbox event
Transactional outbox для двухфазных TigerBeetle-операций: каждая строка — отложенный post_pending или void_pending для конкретного интента.
Имя таблицы¶
pm.outbox_event
Назначение¶
Очередь команд, которые OutboxWorker дозавершает асинхронно после того, как HTTP-handler уже ответил клиенту. Используется только для двухфазных каналов TigerBeetle (AUTHORIZED → SETTLED|FAILED):
post_pending— подтвердить pending-transfer (закоммитить деньги получателю; для IPPS дополнительно создатьpsp_feetransfer);void_pending— отменить pending-transfer (разморозить средства у плательщика; используется при отказе PSP).
Запись добавляется внутри той же транзакции, что и AUTHORIZED-update, поэтому либо и pending-transfer в TB существует, и outbox-строка создана, либо ничего не произошло — это и есть transactional outbox pattern. Без таблицы пришлось бы либо синхронно ждать TB во время HTTP-ответа, либо ловить «деньги заморожены, но завершить не удалось» при крахе процесса.
- Пишут: только Payment Manager:
- каналы из
src/channels/иIPPS confirm-обработчик (src/psp/) — INSERT при переходеAUTHORIZED → settle/void; src/intent/outbox-worker.ts— UPDATEstatusиretry_count.- Читают: только сам
OutboxWorker. Внешние сервисы и Admin Panel в эту таблицу не лезут — её состояние видно черезpm.intent(status,tb_transfer_ids).
DDL¶
DDL из миграции 0000_init.sql — таблица создаётся сразу в финальном виде, последующие миграции её не трогают.
CREATE TABLE "pm"."outbox_event" (
"id" serial PRIMARY KEY NOT NULL,
"intent_id" uuid NOT NULL,
"action" varchar(20) NOT NULL,
"status" varchar(20) DEFAULT 'pending' NOT NULL,
"payload" jsonb,
"retry_count" integer DEFAULT 0 NOT NULL,
"created_at" timestamp with time zone DEFAULT now() NOT NULL,
"processed_at" timestamp with time zone,
CONSTRAINT "outbox_event_action_chk" CHECK ("pm"."outbox_event"."action" IN ('post_pending', 'void_pending')),
CONSTRAINT "outbox_event_status_chk" CHECK ("pm"."outbox_event"."status" IN ('pending', 'processed', 'failed'))
);
CREATE INDEX "outbox_event_status_created_at_idx" ON "pm"."outbox_event" USING btree ("status","created_at");
CREATE INDEX "outbox_event_intent_id_idx" ON "pm"."outbox_event" USING btree ("intent_id");
Важно:
actionиstatusреализованы какVARCHAR(20)+ CHECK-constraints, не PG enum. Это упрощает миграции (добавить новое значение —ALTER ... DROP CONSTRAINT; ADD CONSTRAINT) и согласуется с подходом по другим таблицам PM (см. 01-schema-overview.md).
Поля¶
| Поле | Тип | NULL | Default | Описание |
|---|---|---|---|---|
id |
serial |
no | автогенерация | PK; внутренний порядковый номер события. Используется для UPDATE ... WHERE id=$1. |
intent_id |
uuid |
no | — | Логическая ссылка на pm.intent.id (без FK; целостность гарантируется кодом). По нему OutboxWorker подгружает строку intent для построения TB-transfers. |
action |
varchar(20) |
no | — | Команда: post_pending или void_pending. Тип OutboxAction — см. src/shared/schema.ts строка 227. CHECK-constraint outbox_event_action_chk запрещает прочие значения. |
status |
varchar(20) |
no | 'pending' |
Жизненный цикл события: pending → processed (успех) либо failed (зарезервировано, см. ниже). Тип OutboxStatus — src/shared/schema.ts строка 228. |
payload |
jsonb |
yes | — | Доп. контекст для воркера. Сейчас используется только под feeSplits: ResolvedFeeSplit[] — pre-resolved fee-split, который OutboxWorker передаёт в writeSettlement() для генерации tx_history. Расширяемое поле — можно дописать что-то новое без миграции. |
retry_count |
integer |
no | 0 |
Число неуспешных попыток. Инкрементируется воркером (а) при fatal-ошибках TB (createTransfers вернул не-created/не-pending_*_already_*), (б) при отсутствии fee-аккаунтов в tb_account_map, (в) при необработанном исключении в основном цикле. Само по себе не ограничивает повторы — это просто счётчик; политика alerts/manual review строится поверх него (см. WORKERS). |
created_at |
timestamptz |
no | now() |
Момент INSERT (внутри транзакции AUTHORIZED-update). Также участвует в индексе outbox_event_status_created_at_idx и в ORDER BY воркера (FIFO). |
processed_at |
timestamptz |
yes | — | Заполняется при переходе в status='processed' (внутри той же транзакции, что и финальный update интента). Для диагностики «сколько прошло от INSERT до завершения». |
Значения action¶
| Значение | Когда вставляется | Что делает воркер |
|---|---|---|
post_pending |
После успешного PSP confirm (IPPS вернул OK) — для коммита замороженных средств. |
Строит post_pending_transfer для каждого pending_id из intent.tb_transfer_ids (linked-chain), плюс отдельный psp_fee transfer revenue → nostro если pspFeeSatang > 0. После успеха — UPDATE intent.status='SETTLED', заполнение intent.to_name из psp_tx_map, заполнение settlement_date от PSP, запись tx_history через writeSettlement(), публикация intent.{id} в Redis, push-notify. |
void_pending |
После отказа PSP (IPPS confirm вернул ошибку) — для возврата замороженных средств. |
Строит void_pending_transfer для каждого pending_id (linked-chain). После успеха — UPDATE intent.status='FAILED', failure_reason (из psp_tx_map.last_error либо дефолтное 'PSP payment voided'), запись intent_event, публикация FAILED в Redis, push-notify. |
Полный набор OutboxAction = 'post_pending' | 'void_pending' — больше значений нет; миграции с расширением пока не запланированы.
Значения status¶
| Значение | Кто ставит | Семантика |
|---|---|---|
pending |
Каналы / PSP confirm-handler при INSERT (default-значение колонки). | Событие ждёт обработки. Воркер забирает пачкой по created_at. |
processed |
OutboxWorker при успешной обработке (внутри транзакции вместе с финальным update интента). |
Событие завершено успешно; processed_at заполнен. Строки сохраняются — это аудит и материал для tracing/post-mortem. |
failed |
Зарезервировано. На момент написания (drizzle/migrations/0000_init.sql, CHECK допускает значение) текущий код outbox-worker.ts этот статус не ставит — fatal-ошибки только инкрементируют retry_count, оставляя status='pending', и событие будет повторно подобрано в следующем цикле. Переключение в failed требует решения «когда сдаться» (например, retry_count > N → перевести в failed + alert операторам). |
Полный набор OutboxStatus = 'pending' | 'processed' | 'failed' — никаких RESERVED/INIT/DONE не существует.
Индексы¶
| Индекс | Тип | Колонки | Назначение |
|---|---|---|---|
outbox_event_pkey |
btree, PK | id |
UPDATE по конкретному событию. |
outbox_event_status_created_at_idx |
btree | (status, created_at) |
Основной запрос воркера: WHERE status='pending' ORDER BY created_at LIMIT 10 (FIFO-выборка пачки на тик). |
outbox_event_intent_id_idx |
btree | (intent_id) |
Поиск событий конкретного интента (диагностика, повторная обработка вручную). |
Связи¶
Логические (без FK)¶
| Связь | Куда | Колонка |
|---|---|---|
N : 1 |
pm.intent — родительский интент |
intent_id → intent.id |
(косвенно) N : 1 |
pm.psp_tx_map — данные PSP (display name, psp_fee, settlement_date, last_error) |
воркер JOIN psp_tx_map ON psp_tx_map.intent_id = outbox_event.intent_id |
FK на intent.id не объявлен — это сознательное решение по аналогии с другими дочерними таблицами PM (tx_history, intent_event). Целостность держится прикладным кодом + индексом outbox_event_intent_id_idx.
Сторонние эффекты при processing¶
- TigerBeetle:
createTransfers([...])— пакетная linked-chain. - Redis:
PUBLISH intent.{id}со статусомSETTLED/FAILED— для live-подписки клиента. - Notifications service:
publishPaymentNotification()— push в Firebase через Redis Stream.
Связанный код¶
| Модуль / функция | Роль |
|---|---|
src/intent/outbox-worker.ts |
Главный потребитель таблицы. |
startOutboxWorker(intervalMs=1000) |
Запуск таймера, тикающего раз в секунду; вызывается из bootstrap.ts только при роли outbox-worker (WORKER_ROLES: api, outbox-worker, psp-worker, invoice-expiry). |
processOutboxBatch() |
Забирает пачку SELECT * FROM outbox_event WHERE status='pending' ORDER BY created_at LIMIT 10. Для каждого события подгружает intent, диспатчит на processPostPending / processVoidPending, ловит исключения и инкрементирует retry_count. |
processPostPending(event, rec) |
Подгружает psp_tx_map (receiverDisplayName, pspFeeSatang, settlementDate), строит TB-transfers через buildPostTransfers(), вызывает tb.createTransfers(), в транзакции — UPDATE intent SET status='SETTLED' ... + writeSettlement() + UPDATE outbox_event SET status='processed'. После транзакции — writeIntentEvent, publishStatus, publishPaymentNotification. |
processVoidPending(event, rec) |
Строит void_pending_transfer через buildVoidTransfers(), вызывает tb.createTransfers(), в транзакции — UPDATE intent SET status='FAILED', failure_reason=... + UPDATE outbox_event SET status='processed'. |
buildPostTransfers(rec, fee?) |
Собирает TB-batch: post по каждому pending_id из intent.tb_transfer_ids (linked, последний — без linked-флага) + опциональный fee transfer revenue.THB → nostro.ipps.THB с кодом PSP_FEE. |
buildVoidTransfers(rec) |
Симметрично, но void_pending_transfer для каждого pending_id. |
src/intent/settlement-writer.ts |
writeSettlement() — пишет tx_history по feeSplits из payload. |
src/intent/intent-events.ts |
writeIntentEvent() — пишет intent_event. |
Инвариант: один OutboxWorker¶
Критический инвариант (NO-GO): в кластере должна работать ровно одна инстанция
OutboxWorker. Несколько воркеров одновременно — недопустимы.Причина:
processOutboxBatch()забирает пачку обычнымSELECT ... WHERE status='pending' ORDER BY created_at LIMIT 10— безFOR UPDATE SKIP LOCKEDи без advisory locks. Если два процесса увидят одну и ту же строку, оба попытаютсяcreateTransfers()в TB. TB сам по себе идемпотентен на уровне ID transfer-а (мы пере-используем те же UUID, и второй получитpending_transfer_already_posted/already_voided— это не fatal), но второй процесс всё равно проведёт повторныйUPDATE intent, повторно вызоветwriteSettlement()(→ дубликаты вtx_history), повторно опубликует Redis-событие и повторно отправит push. Это и есть race-condition.
Реальные WORKER_ROLES = 'api' | 'outbox-worker' | 'psp-worker' | 'invoice-expiry' (см. WORKERS). В production-конфиге outbox-worker поднимается ровно одним подом (см. DEPLOYMENT).
Заготовка на будущее (tech-debt-outbox-multi-instance): перевести pickup на
SELECT ... FOR UPDATE SKIP LOCKED LIMIT 10(стандартный паттерн «one row → one consumer» в PostgreSQL) либо на advisory-lock per row (pg_try_advisory_xact_lock(intent_id::bigint)). Это снимет single-instance ограничение и позволит горизонтально масштабировать обработку. Альтернатива — pgmq / Listen/Notify-диспатч. Сейчас намеренно не сделано: нагрузка одной инстанции с запасом покрывает текущий поток.
Примеры запросов¶
Текущая очередь (то, что увидит воркер на следующем тике)¶
SELECT id, intent_id, action, retry_count, created_at
FROM pm.outbox_event
WHERE status = 'pending'
ORDER BY created_at
LIMIT 10;
(тот же запрос, что в processOutboxBatch(); обслуживается индексом outbox_event_status_created_at_idx)
События одного интента (диагностика)¶
SELECT id, action, status, retry_count, created_at, processed_at
FROM pm.outbox_event
WHERE intent_id = $1
ORDER BY created_at;
(индекс outbox_event_intent_id_idx)
Зависшие события: pending дольше N минут¶
SELECT id, intent_id, action, retry_count, created_at
FROM pm.outbox_event
WHERE status = 'pending'
AND created_at < now() - interval '5 minutes'
ORDER BY created_at;
(должно быть пусто; если есть строки — либо воркер не запущен, либо TB/PSP проблемы; см. retry_count)
Топ-проблемные события по числу retry¶
SELECT id, intent_id, action, retry_count, created_at
FROM pm.outbox_event
WHERE status = 'pending'
AND retry_count > 5
ORDER BY retry_count DESC, created_at;
(кандидаты на ручной разбор / эскалацию в MANUAL_REVIEW интента)
Распределение по action за последние сутки¶
SELECT action,
status,
count(*),
avg(extract(epoch FROM (processed_at - created_at))) FILTER (WHERE processed_at IS NOT NULL) AS avg_latency_sec
FROM pm.outbox_event
WHERE created_at > now() - interval '24 hours'
GROUP BY action, status
ORDER BY action, status;
(latency processing — индикатор здоровья воркера)
Сценарий ручного re-enqueue (после фикса fatal-ошибки)¶
-- Сбросить retry_count и перезапустить процессинг конкретного события
UPDATE pm.outbox_event
SET retry_count = 0
WHERE id = $1
AND status = 'pending';
(переводить из processed обратно в pending нельзя — побочные эффекты уже произошли; для повторной обработки фейлового интента используется другой механизм через admin-эндпоинт)