Перейти к содержанию

09 outbox event

Transactional outbox для двухфазных TigerBeetle-операций: каждая строка — отложенный post_pending или void_pending для конкретного интента.

Имя таблицы

pm.outbox_event

Назначение

Очередь команд, которые OutboxWorker дозавершает асинхронно после того, как HTTP-handler уже ответил клиенту. Используется только для двухфазных каналов TigerBeetle (AUTHORIZED → SETTLED|FAILED):

  • post_pending — подтвердить pending-transfer (закоммитить деньги получателю; для IPPS дополнительно создать psp_fee transfer);
  • void_pending — отменить pending-transfer (разморозить средства у плательщика; используется при отказе PSP).

Запись добавляется внутри той же транзакции, что и AUTHORIZED-update, поэтому либо и pending-transfer в TB существует, и outbox-строка создана, либо ничего не произошло — это и есть transactional outbox pattern. Без таблицы пришлось бы либо синхронно ждать TB во время HTTP-ответа, либо ловить «деньги заморожены, но завершить не удалось» при крахе процесса.

  • Пишут: только Payment Manager:
  • каналы из src/channels/ и IPPS confirm-обработчик (src/psp/) — INSERT при переходе AUTHORIZED → settle/void;
  • src/intent/outbox-worker.ts — UPDATE status и retry_count.
  • Читают: только сам OutboxWorker. Внешние сервисы и Admin Panel в эту таблицу не лезут — её состояние видно через pm.intent (status, tb_transfer_ids).

DDL

DDL из миграции 0000_init.sql — таблица создаётся сразу в финальном виде, последующие миграции её не трогают.

CREATE TABLE "pm"."outbox_event" (
    "id" serial PRIMARY KEY NOT NULL,
    "intent_id" uuid NOT NULL,
    "action" varchar(20) NOT NULL,
    "status" varchar(20) DEFAULT 'pending' NOT NULL,
    "payload" jsonb,
    "retry_count" integer DEFAULT 0 NOT NULL,
    "created_at" timestamp with time zone DEFAULT now() NOT NULL,
    "processed_at" timestamp with time zone,
    CONSTRAINT "outbox_event_action_chk" CHECK ("pm"."outbox_event"."action" IN ('post_pending', 'void_pending')),
    CONSTRAINT "outbox_event_status_chk" CHECK ("pm"."outbox_event"."status" IN ('pending', 'processed', 'failed'))
);

CREATE INDEX "outbox_event_status_created_at_idx" ON "pm"."outbox_event" USING btree ("status","created_at");
CREATE INDEX "outbox_event_intent_id_idx" ON "pm"."outbox_event" USING btree ("intent_id");

Важно: action и status реализованы как VARCHAR(20) + CHECK-constraints, не PG enum. Это упрощает миграции (добавить новое значение — ALTER ... DROP CONSTRAINT; ADD CONSTRAINT) и согласуется с подходом по другим таблицам PM (см. 01-schema-overview.md).

Поля

Поле Тип NULL Default Описание
id serial no автогенерация PK; внутренний порядковый номер события. Используется для UPDATE ... WHERE id=$1.
intent_id uuid no Логическая ссылка на pm.intent.id (без FK; целостность гарантируется кодом). По нему OutboxWorker подгружает строку intent для построения TB-transfers.
action varchar(20) no Команда: post_pending или void_pending. Тип OutboxAction — см. src/shared/schema.ts строка 227. CHECK-constraint outbox_event_action_chk запрещает прочие значения.
status varchar(20) no 'pending' Жизненный цикл события: pendingprocessed (успех) либо failed (зарезервировано, см. ниже). Тип OutboxStatussrc/shared/schema.ts строка 228.
payload jsonb yes Доп. контекст для воркера. Сейчас используется только под feeSplits: ResolvedFeeSplit[] — pre-resolved fee-split, который OutboxWorker передаёт в writeSettlement() для генерации tx_history. Расширяемое поле — можно дописать что-то новое без миграции.
retry_count integer no 0 Число неуспешных попыток. Инкрементируется воркером (а) при fatal-ошибках TB (createTransfers вернул не-created/не-pending_*_already_*), (б) при отсутствии fee-аккаунтов в tb_account_map, (в) при необработанном исключении в основном цикле. Само по себе не ограничивает повторы — это просто счётчик; политика alerts/manual review строится поверх него (см. WORKERS).
created_at timestamptz no now() Момент INSERT (внутри транзакции AUTHORIZED-update). Также участвует в индексе outbox_event_status_created_at_idx и в ORDER BY воркера (FIFO).
processed_at timestamptz yes Заполняется при переходе в status='processed' (внутри той же транзакции, что и финальный update интента). Для диагностики «сколько прошло от INSERT до завершения».

Значения action

Значение Когда вставляется Что делает воркер
post_pending После успешного PSP confirm (IPPS вернул OK) — для коммита замороженных средств. Строит post_pending_transfer для каждого pending_id из intent.tb_transfer_ids (linked-chain), плюс отдельный psp_fee transfer revenue → nostro если pspFeeSatang > 0. После успеха — UPDATE intent.status='SETTLED', заполнение intent.to_name из psp_tx_map, заполнение settlement_date от PSP, запись tx_history через writeSettlement(), публикация intent.{id} в Redis, push-notify.
void_pending После отказа PSP (IPPS confirm вернул ошибку) — для возврата замороженных средств. Строит void_pending_transfer для каждого pending_id (linked-chain). После успеха — UPDATE intent.status='FAILED', failure_reason (из psp_tx_map.last_error либо дефолтное 'PSP payment voided'), запись intent_event, публикация FAILED в Redis, push-notify.

Полный набор OutboxAction = 'post_pending' | 'void_pending' — больше значений нет; миграции с расширением пока не запланированы.

Значения status

Значение Кто ставит Семантика
pending Каналы / PSP confirm-handler при INSERT (default-значение колонки). Событие ждёт обработки. Воркер забирает пачкой по created_at.
processed OutboxWorker при успешной обработке (внутри транзакции вместе с финальным update интента). Событие завершено успешно; processed_at заполнен. Строки сохраняются — это аудит и материал для tracing/post-mortem.
failed Зарезервировано. На момент написания (drizzle/migrations/0000_init.sql, CHECK допускает значение) текущий код outbox-worker.ts этот статус не ставит — fatal-ошибки только инкрементируют retry_count, оставляя status='pending', и событие будет повторно подобрано в следующем цикле. Переключение в failed требует решения «когда сдаться» (например, retry_count > N → перевести в failed + alert операторам).

Полный набор OutboxStatus = 'pending' | 'processed' | 'failed' — никаких RESERVED/INIT/DONE не существует.

Индексы

Индекс Тип Колонки Назначение
outbox_event_pkey btree, PK id UPDATE по конкретному событию.
outbox_event_status_created_at_idx btree (status, created_at) Основной запрос воркера: WHERE status='pending' ORDER BY created_at LIMIT 10 (FIFO-выборка пачки на тик).
outbox_event_intent_id_idx btree (intent_id) Поиск событий конкретного интента (диагностика, повторная обработка вручную).

Связи

Логические (без FK)

Связь Куда Колонка
N : 1 pm.intent — родительский интент intent_idintent.id
(косвенно) N : 1 pm.psp_tx_map — данные PSP (display name, psp_fee, settlement_date, last_error) воркер JOIN psp_tx_map ON psp_tx_map.intent_id = outbox_event.intent_id

FK на intent.id не объявлен — это сознательное решение по аналогии с другими дочерними таблицами PM (tx_history, intent_event). Целостность держится прикладным кодом + индексом outbox_event_intent_id_idx.

Сторонние эффекты при processing

  • TigerBeetle: createTransfers([...]) — пакетная linked-chain.
  • Redis: PUBLISH intent.{id} со статусом SETTLED / FAILED — для live-подписки клиента.
  • Notifications service: publishPaymentNotification() — push в Firebase через Redis Stream.

Связанный код

Модуль / функция Роль
src/intent/outbox-worker.ts Главный потребитель таблицы.
startOutboxWorker(intervalMs=1000) Запуск таймера, тикающего раз в секунду; вызывается из bootstrap.ts только при роли outbox-worker (WORKER_ROLES: api, outbox-worker, psp-worker, invoice-expiry).
processOutboxBatch() Забирает пачку SELECT * FROM outbox_event WHERE status='pending' ORDER BY created_at LIMIT 10. Для каждого события подгружает intent, диспатчит на processPostPending / processVoidPending, ловит исключения и инкрементирует retry_count.
processPostPending(event, rec) Подгружает psp_tx_map (receiverDisplayName, pspFeeSatang, settlementDate), строит TB-transfers через buildPostTransfers(), вызывает tb.createTransfers(), в транзакции — UPDATE intent SET status='SETTLED' ... + writeSettlement() + UPDATE outbox_event SET status='processed'. После транзакции — writeIntentEvent, publishStatus, publishPaymentNotification.
processVoidPending(event, rec) Строит void_pending_transfer через buildVoidTransfers(), вызывает tb.createTransfers(), в транзакции — UPDATE intent SET status='FAILED', failure_reason=... + UPDATE outbox_event SET status='processed'.
buildPostTransfers(rec, fee?) Собирает TB-batch: post по каждому pending_id из intent.tb_transfer_ids (linked, последний — без linked-флага) + опциональный fee transfer revenue.THB → nostro.ipps.THB с кодом PSP_FEE.
buildVoidTransfers(rec) Симметрично, но void_pending_transfer для каждого pending_id.
src/intent/settlement-writer.ts writeSettlement() — пишет tx_history по feeSplits из payload.
src/intent/intent-events.ts writeIntentEvent() — пишет intent_event.

Инвариант: один OutboxWorker

Критический инвариант (NO-GO): в кластере должна работать ровно одна инстанция OutboxWorker. Несколько воркеров одновременно — недопустимы.

Причина: processOutboxBatch() забирает пачку обычным SELECT ... WHERE status='pending' ORDER BY created_at LIMIT 10без FOR UPDATE SKIP LOCKED и без advisory locks. Если два процесса увидят одну и ту же строку, оба попытаются createTransfers() в TB. TB сам по себе идемпотентен на уровне ID transfer-а (мы пере-используем те же UUID, и второй получит pending_transfer_already_posted/already_voided — это не fatal), но второй процесс всё равно проведёт повторный UPDATE intent, повторно вызовет writeSettlement() (→ дубликаты в tx_history), повторно опубликует Redis-событие и повторно отправит push. Это и есть race-condition.

Реальные WORKER_ROLES = 'api' | 'outbox-worker' | 'psp-worker' | 'invoice-expiry' (см. WORKERS). В production-конфиге outbox-worker поднимается ровно одним подом (см. DEPLOYMENT).

Заготовка на будущее (tech-debt-outbox-multi-instance): перевести pickup на SELECT ... FOR UPDATE SKIP LOCKED LIMIT 10 (стандартный паттерн «one row → one consumer» в PostgreSQL) либо на advisory-lock per row (pg_try_advisory_xact_lock(intent_id::bigint)). Это снимет single-instance ограничение и позволит горизонтально масштабировать обработку. Альтернатива — pgmq / Listen/Notify-диспатч. Сейчас намеренно не сделано: нагрузка одной инстанции с запасом покрывает текущий поток.

Примеры запросов

Текущая очередь (то, что увидит воркер на следующем тике)

SELECT id, intent_id, action, retry_count, created_at
FROM   pm.outbox_event
WHERE  status = 'pending'
ORDER  BY created_at
LIMIT  10;

(тот же запрос, что в processOutboxBatch(); обслуживается индексом outbox_event_status_created_at_idx)

События одного интента (диагностика)

SELECT id, action, status, retry_count, created_at, processed_at
FROM   pm.outbox_event
WHERE  intent_id = $1
ORDER  BY created_at;

(индекс outbox_event_intent_id_idx)

Зависшие события: pending дольше N минут

SELECT id, intent_id, action, retry_count, created_at
FROM   pm.outbox_event
WHERE  status = 'pending'
  AND  created_at < now() - interval '5 minutes'
ORDER  BY created_at;

(должно быть пусто; если есть строки — либо воркер не запущен, либо TB/PSP проблемы; см. retry_count)

Топ-проблемные события по числу retry

SELECT id, intent_id, action, retry_count, created_at
FROM   pm.outbox_event
WHERE  status = 'pending'
  AND  retry_count > 5
ORDER  BY retry_count DESC, created_at;

(кандидаты на ручной разбор / эскалацию в MANUAL_REVIEW интента)

Распределение по action за последние сутки

SELECT action,
       status,
       count(*),
       avg(extract(epoch FROM (processed_at - created_at))) FILTER (WHERE processed_at IS NOT NULL) AS avg_latency_sec
FROM   pm.outbox_event
WHERE  created_at > now() - interval '24 hours'
GROUP  BY action, status
ORDER  BY action, status;

(latency processing — индикатор здоровья воркера)

Сценарий ручного re-enqueue (после фикса fatal-ошибки)

-- Сбросить retry_count и перезапустить процессинг конкретного события
UPDATE pm.outbox_event
SET    retry_count = 0
WHERE  id = $1
  AND  status = 'pending';

(переводить из processed обратно в pending нельзя — побочные эффекты уже произошли; для повторной обработки фейлового интента используется другой механизм через admin-эндпоинт)