Модуль: Workers¶
Фоновые процессы и периодические задачи Payment Manager — все запускаются как setInterval-петли (без node-cron, без BullMQ) и активируются по списку WORKER_ROLES в env.
1. Назначение модуля¶
Workers — это асинхронные движки PM, которые работают вне HTTP-цикла. Они закрывают четыре независимые задачи:
- OutboxWorker — атомарно проводит TigerBeetle
post_pending/void_pendingтранзакции из таблицыpm.outbox_eventпосле решения PSP (или после успешного двухфазного pre-flight). - PspWorker — конечный автомат интеграции с PSP (на сегодня — IPPS): query → confirm с лизами строк в
pm.psp_tx_map. - BalanceMonitor — периодически сравнивает баланс
system.nostro.ipps.THBв TB с балансом партнёра у PSP, алертит при drift и низком остатке. - InvoiceExpirySweep — переводит просроченные
INVOICE_PAYMENTинтенты изCREATEDвEXPIRED.
Все четыре поднимаются из src/server.ts в зависимости от того, какие роли есть в WORKER_ROLES. Один pod может совмещать несколько ролей; критичные роли (outbox-worker) запускаются строго в одном экземпляре.
2. Структура файлов¶
| Файл | Назначение |
|---|---|
src/intent/outbox-worker.ts |
OutboxWorker — TB post_pending / void_pending из pm.outbox_event. Формально лежит в intent/ (тесно связан с saga-логикой intent-а), но логически — фоновый worker. |
src/workers/psp-worker.ts |
PspWorker — лизирует строки pm.psp_tx_map через FOR UPDATE SKIP LOCKED, прогоняет их через PSP-драйвер, применяет PspProcessOutcome к БД и пишет outbox-событие при terminal-исходе. |
src/workers/balance-monitor.ts |
BalanceMonitor — runBalanceTick(pspName): lookup TB nostro + driver.getPartnerBalance() → сравнение → alert events balance_drift / low_partner_balance / balance_ok. |
src/jobs/invoice-expiry.ts |
InvoiceExpirySweep — SELECT … WHERE operationType='INVOICE_PAYMENT' AND status='CREATED' AND expires_at < now() + channel.expire() для каждой строки. |
3. Ключевые типы¶
// Роли воркеров (значения env WORKER_ROLES; объявлены неявно через перечисление в server.ts)
type WorkerRole = 'api' | 'outbox-worker' | 'psp-worker' | 'invoice-expiry'
// Действия, которые OutboxWorker применяет к TigerBeetle
// (src/shared/schema.ts)
type OutboxAction = 'post_pending' | 'void_pending'
// Состояния PSP-машины (src/shared/schema.ts)
type PspState =
| 'NEW'
| 'QUERY_PENDING'
| 'QUERIED'
| 'CONFIRM_PENDING'
| 'INQUIRING'
| 'CONFIRMED'
| 'FAILED'
| 'MANUAL_REVIEW' // ручное вмешательство ops — пограничный orphan-rqUID
MANUAL_REVIEW — terminal-pending: worker не может авторазрешить (например, потерян confirm_rqUid после crash перед записью). Ops через admin-роуты переводит в CONFIRMED либо FAILED.
4. Основные функции¶
| Функция | Файл | Делает |
|---|---|---|
startOutboxWorker(intervalMs) |
intent/outbox-worker.ts |
Запускает setInterval(processOutboxBatch, intervalMs). |
processOutboxBatch() |
intent/outbox-worker.ts |
Берёт до 10 pending событий, для каждого: загружает intent, разводит на processPostPending / processVoidPending, инкрементит retry_count при ошибке. |
startPspWorker(pspName) |
workers/psp-worker.ts |
setInterval(processPspBatch(pspName), PSP_POLL_INTERVAL_MS). |
processPspBatch(pspName) |
workers/psp-worker.ts |
pickUpJobs (FOR UPDATE SKIP LOCKED) → loadContext → driver.process → applyOutcome (completed / failed / in-progress / manual-review). |
startBalanceMonitor(pspName) |
workers/balance-monitor.ts |
setInterval(runBalanceTick(pspName), BALANCE_TICK_MS). |
runBalanceTick(pspName) |
workers/balance-monitor.ts |
TB lookup system.nostro.ipps.THB + driver.getPartnerBalance() → сравнение с порогами. |
startInvoiceExpirySweep(intervalMs, batchSize) |
jobs/invoice-expiry.ts |
setInterval(processExpiredInvoices(batchSize), intervalMs). |
processExpiredInvoices(batchSize) |
jobs/invoice-expiry.ts |
SELECT просроченных INVOICE_PAYMENT + вызов channel.expire() для каждого. |
5. Жизненный цикл¶
Все четыре worker-а построены по единому шаблону:
export function startXxxWorker(...): NodeJS.Timeout {
return setInterval(() => {
doBatch().catch((err) => logger.error({ err }, '...'))
}, INTERVAL_MS)
}
Запуск. В src/server.ts после старта приложения:
const roles = config.WORKER_ROLES
const timers: NodeJS.Timeout[] = []
if (roles.includes('outbox-worker')) timers.push(startOutboxWorker(config.OUTBOX_INTERVAL_MS))
if (roles.includes('invoice-expiry')) timers.push(startInvoiceExpirySweep(...))
if (roles.includes('psp-worker')) {
for (const pspName of config.PSP_NAMES) {
timers.push(startPspWorker(pspName))
if (config.BALANCE_MONITOR_ENABLED) timers.push(startBalanceMonitor(pspName))
}
}
Остановка. Серверный onClose-хук:
Плюс собственный обработчик SIGTERM/SIGINT, чтобы headless-pod (без роли api) тоже корректно завершался — без него Fastify не зарегистрировал бы сигналы и таймеры не успели бы дрейнить.
НЕ используются: node-cron, BullMQ, agenda. Любой periodic-flow в PM — это setInterval поверх БД-очереди (Postgres). Это сознательная архитектурная инвестиция: BullMQ остаётся внутри PSP-адаптеров и не пересекает границы PM (см. CLAUDE.md).
6. Конфигурация¶
| Переменная | Default | Кем используется |
|---|---|---|
WORKER_ROLES |
api,outbox-worker |
server.ts — какие фоновые задачи поднимать на pod-е. Допустимые значения: api, outbox-worker, psp-worker, invoice-expiry. |
PSP_NAMES |
IPPS |
server.ts — список PSP, для которых стартует psp-worker (и опционально balance-monitor). |
OUTBOX_INTERVAL_MS |
1000 |
startOutboxWorker — пауза между batch-ами. |
PSP_POLL_INTERVAL_MS |
500 |
startPspWorker — пауза между batch-ами. |
PSP_LEASE_SEC |
10 |
pickUpJobs — TTL первичного лиза (retry_count = 0). |
PSP_RETRY_LEASE_SEC |
30 |
pickUpJobs — TTL лиза при retry_count ≥ 1 (backoff). |
BALANCE_MONITOR_ENABLED |
false |
server.ts — включает startBalanceMonitor внутри роли psp-worker. Это не отдельная роль. |
BALANCE_TICK_MS |
60000 |
startBalanceMonitor — частота сравнения TB ↔ PSP. |
IPPS_DRIFT_THRESHOLD_SATANG |
10000 (100 THB) |
порог balance_drift-алерта. |
IPPS_LOW_BALANCE_THRESHOLD_SATANG |
100000 (1000 THB) |
порог low_partner_balance-алерта. |
INVOICE_EXPIRY_SWEEP_INTERVAL_MS |
30000 |
startInvoiceExpirySweep — пауза между sweep-ами. |
INVOICE_EXPIRY_BATCH_SIZE |
100 |
размер одного batch-а sweep-а. |
Реальные значения WORKER_ROLES — только эти четыре: api, outbox-worker, psp-worker, invoice-expiry. Никакого outbox, psp-ipps, balance-monitor как самостоятельных ролей не существует — это типовая ошибка при чтении старых doc-ов.
BalanceMonitor — это не роль, а флаг BALANCE_MONITOR_ENABLED=true внутри роли psp-worker. На pod-е без psp-worker балансы никогда не проверяются.
7. Инварианты¶
OutboxWorker — один процесс одновременно¶
CLAUDE.md фиксирует NO-GO:
NEVER run multiple OutboxWorker instances simultaneously
Текущая реализация (processOutboxBatch) не использует FOR UPDATE SKIP LOCKED при выборе событий из outbox_event — это SELECT … WHERE status='pending' ORDER BY created_at LIMIT 10. Два параллельных worker-а возьмут одни и те же 10 строк, и оба попробуют вызвать tb.createTransfers — TB защитит от двойной проводки (pending_transfer_already_posted фильтруется как не-fatal), но это не означает безопасности: side-effects (writeSettlement, publishStatus, publishPaymentNotification) сработают дважды. Поэтому деплоят строго один pod с ролью outbox-worker до миграции на advisory locks / FOR UPDATE SKIP LOCKED (см. §10).
PspWorker — multi-instance OK¶
pickUpJobs уже использует WITH picked AS (… FOR UPDATE SKIP LOCKED) UPDATE … — параллельные worker-ы конкурируют за строки безопасно. leased_by = ${hostname}.${pid} помогает аудиту.
invoice-expiry — multi-instance OK, но избыточно¶
channel.expire() атомарен (UPDATE WHERE status='CREATED' AND expires_at < now() RETURNING …). Несколько pod-ов с этой ролью не сломают данные, но даром нагружают БД. По умолчанию — один pod.
transit.balance = 0¶
Главный финансовый инвариант (CLAUDE.md). OutboxWorker не нарушает его, потому что использует только post_pending / void_pending для уже зарезервированных pending-трансферов — TB сам блокирует повторную проводку.
8. Тестирование¶
| Файл теста | Покрытие |
|---|---|
test/workers/psp-worker.test.ts |
unit: pickUpJobs (race), loadContext (missing intent / wallet), applyOutcome для всех четырёх kind, escalateMissingContext. |
test/workers/balance-monitor.test.ts |
unit: runBalanceTick — drift > threshold, low_balance, oba alerts, отсутствие getPartnerBalance, TB failure, PSP failure. |
test/integration/invoice-flow.test.ts |
integration: полный flow CREATED → expire → EXPIRED для INVOICE_PAYMENT. |
test/integration/notifications.test.ts |
integration: outbox → SETTLED/FAILED → publishPaymentNotification → Redis stream. |
Запуск:
OutboxWorker сам по себе не имеет отдельного unit-теста — его логика тестируется через end-to-end intent-flow в test/integration/* (intent → confirm → outbox → SETTLED).
9. Связанные модули¶
psp.md— PSP-драйвер иprocess()-контракт, который вызывает PspWorker.intent.md— OutboxWorker формально живёт вintent/; описание saga,intent_event,outbox_event.channels.md—channel.expire(), который вызывает InvoiceExpirySweep.ledger.md— TBpost_pending/void_pending, account-id derivation для BalanceMonitor.../reference/database/09-outbox-event.md— схемаpm.outbox_event.../reference/database/07-psp-tx-map.md— схемаpm.psp_tx_map, CHECK наstate.../operations/worker-roles.md— как раскладывать роли по pod-ам в production.
10. Заготовки на будущее¶
Multi-instance OutboxWorker. Сейчас — strict single-instance. Чтобы убрать это ограничение, понадобится одно из:
FOR UPDATE SKIP LOCKEDпри чтенииoutbox_event(как уже сделано вpsp-worker).- PostgreSQL advisory locks (
pg_try_advisory_xact_lock(hashtext('outbox-worker'))) — глобальный mutex на batch. - Лидер-выбор через Redis lease (Redlock).
До этого момента — один pod, и это явная обязательная инвестиция в надёжность.
Redis Streams для PSP (Phase 2B). Текущая модель — Postgres-очередь (psp_tx_map + lease-poll). Phase 2B (см. ARCHITECTURE.md) выносит PSP-адаптеры в отдельные сервисы, общаясь через stream.ipps.jobs / stream.ipps.results. PspWorker превратится в продюсера jobs + консьюмера results; state-machine останется в PM, но driver.process уедет в адаптер. Это даст изоляцию failure-domain-ов (если IPPS-driver падает, PM продолжает обслуживать INTERNAL_P2P и INVOICE_PAYMENT).
Distributed tracing. Сейчас в логах есть intentId, rowId, event, но нет trace_id propagation между worker-ами. План — добавить OpenTelemetry-span с trace_id = intent_id (CLAUDE.md cross-service правило).
Adaptive polling. Все четыре worker-а опрашивают БД с фиксированным интервалом. При пустых батчах — лишняя нагрузка; при бурст-трафике — задержка. Решение — exponential backoff на пустых батчах + NOTIFY/LISTEN-нотификация при INSERT в outbox_event / psp_tx_map.