Перейти к содержанию

Модуль: Workers

Фоновые процессы и периодические задачи Payment Manager — все запускаются как setInterval-петли (без node-cron, без BullMQ) и активируются по списку WORKER_ROLES в env.


1. Назначение модуля

Workers — это асинхронные движки PM, которые работают вне HTTP-цикла. Они закрывают четыре независимые задачи:

  • OutboxWorker — атомарно проводит TigerBeetle post_pending/void_pending транзакции из таблицы pm.outbox_event после решения PSP (или после успешного двухфазного pre-flight).
  • PspWorker — конечный автомат интеграции с PSP (на сегодня — IPPS): query → confirm с лизами строк в pm.psp_tx_map.
  • BalanceMonitor — периодически сравнивает баланс system.nostro.ipps.THB в TB с балансом партнёра у PSP, алертит при drift и низком остатке.
  • InvoiceExpirySweep — переводит просроченные INVOICE_PAYMENT интенты из CREATED в EXPIRED.

Все четыре поднимаются из src/server.ts в зависимости от того, какие роли есть в WORKER_ROLES. Один pod может совмещать несколько ролей; критичные роли (outbox-worker) запускаются строго в одном экземпляре.


2. Структура файлов

Файл Назначение
src/intent/outbox-worker.ts OutboxWorker — TB post_pending / void_pending из pm.outbox_event. Формально лежит в intent/ (тесно связан с saga-логикой intent-а), но логически — фоновый worker.
src/workers/psp-worker.ts PspWorker — лизирует строки pm.psp_tx_map через FOR UPDATE SKIP LOCKED, прогоняет их через PSP-драйвер, применяет PspProcessOutcome к БД и пишет outbox-событие при terminal-исходе.
src/workers/balance-monitor.ts BalanceMonitor — runBalanceTick(pspName): lookup TB nostro + driver.getPartnerBalance() → сравнение → alert events balance_drift / low_partner_balance / balance_ok.
src/jobs/invoice-expiry.ts InvoiceExpirySweep — SELECT … WHERE operationType='INVOICE_PAYMENT' AND status='CREATED' AND expires_at < now() + channel.expire() для каждой строки.

3. Ключевые типы

// Роли воркеров (значения env WORKER_ROLES; объявлены неявно через перечисление в server.ts)
type WorkerRole = 'api' | 'outbox-worker' | 'psp-worker' | 'invoice-expiry'

// Действия, которые OutboxWorker применяет к TigerBeetle
// (src/shared/schema.ts)
type OutboxAction = 'post_pending' | 'void_pending'

// Состояния PSP-машины (src/shared/schema.ts)
type PspState =
  | 'NEW'
  | 'QUERY_PENDING'
  | 'QUERIED'
  | 'CONFIRM_PENDING'
  | 'INQUIRING'
  | 'CONFIRMED'
  | 'FAILED'
  | 'MANUAL_REVIEW'   // ручное вмешательство ops — пограничный orphan-rqUID

MANUAL_REVIEW — terminal-pending: worker не может авторазрешить (например, потерян confirm_rqUid после crash перед записью). Ops через admin-роуты переводит в CONFIRMED либо FAILED.


4. Основные функции

Функция Файл Делает
startOutboxWorker(intervalMs) intent/outbox-worker.ts Запускает setInterval(processOutboxBatch, intervalMs).
processOutboxBatch() intent/outbox-worker.ts Берёт до 10 pending событий, для каждого: загружает intent, разводит на processPostPending / processVoidPending, инкрементит retry_count при ошибке.
startPspWorker(pspName) workers/psp-worker.ts setInterval(processPspBatch(pspName), PSP_POLL_INTERVAL_MS).
processPspBatch(pspName) workers/psp-worker.ts pickUpJobs (FOR UPDATE SKIP LOCKED) → loadContextdriver.processapplyOutcome (completed / failed / in-progress / manual-review).
startBalanceMonitor(pspName) workers/balance-monitor.ts setInterval(runBalanceTick(pspName), BALANCE_TICK_MS).
runBalanceTick(pspName) workers/balance-monitor.ts TB lookup system.nostro.ipps.THB + driver.getPartnerBalance() → сравнение с порогами.
startInvoiceExpirySweep(intervalMs, batchSize) jobs/invoice-expiry.ts setInterval(processExpiredInvoices(batchSize), intervalMs).
processExpiredInvoices(batchSize) jobs/invoice-expiry.ts SELECT просроченных INVOICE_PAYMENT + вызов channel.expire() для каждого.

5. Жизненный цикл

Все четыре worker-а построены по единому шаблону:

export function startXxxWorker(...): NodeJS.Timeout {
  return setInterval(() => {
    doBatch().catch((err) => logger.error({ err }, '...'))
  }, INTERVAL_MS)
}

Запуск. В src/server.ts после старта приложения:

const roles = config.WORKER_ROLES
const timers: NodeJS.Timeout[] = []

if (roles.includes('outbox-worker'))   timers.push(startOutboxWorker(config.OUTBOX_INTERVAL_MS))
if (roles.includes('invoice-expiry'))  timers.push(startInvoiceExpirySweep(...))
if (roles.includes('psp-worker')) {
  for (const pspName of config.PSP_NAMES) {
    timers.push(startPspWorker(pspName))
    if (config.BALANCE_MONITOR_ENABLED) timers.push(startBalanceMonitor(pspName))
  }
}

Остановка. Серверный onClose-хук:

app.addHook('onClose', async () => {
  for (const t of timers) clearInterval(t)
})

Плюс собственный обработчик SIGTERM/SIGINT, чтобы headless-pod (без роли api) тоже корректно завершался — без него Fastify не зарегистрировал бы сигналы и таймеры не успели бы дрейнить.

НЕ используются: node-cron, BullMQ, agenda. Любой periodic-flow в PM — это setInterval поверх БД-очереди (Postgres). Это сознательная архитектурная инвестиция: BullMQ остаётся внутри PSP-адаптеров и не пересекает границы PM (см. CLAUDE.md).


6. Конфигурация

Переменная Default Кем используется
WORKER_ROLES api,outbox-worker server.ts — какие фоновые задачи поднимать на pod-е. Допустимые значения: api, outbox-worker, psp-worker, invoice-expiry.
PSP_NAMES IPPS server.ts — список PSP, для которых стартует psp-worker (и опционально balance-monitor).
OUTBOX_INTERVAL_MS 1000 startOutboxWorker — пауза между batch-ами.
PSP_POLL_INTERVAL_MS 500 startPspWorker — пауза между batch-ами.
PSP_LEASE_SEC 10 pickUpJobs — TTL первичного лиза (retry_count = 0).
PSP_RETRY_LEASE_SEC 30 pickUpJobs — TTL лиза при retry_count ≥ 1 (backoff).
BALANCE_MONITOR_ENABLED false server.ts — включает startBalanceMonitor внутри роли psp-worker. Это не отдельная роль.
BALANCE_TICK_MS 60000 startBalanceMonitor — частота сравнения TB ↔ PSP.
IPPS_DRIFT_THRESHOLD_SATANG 10000 (100 THB) порог balance_drift-алерта.
IPPS_LOW_BALANCE_THRESHOLD_SATANG 100000 (1000 THB) порог low_partner_balance-алерта.
INVOICE_EXPIRY_SWEEP_INTERVAL_MS 30000 startInvoiceExpirySweep — пауза между sweep-ами.
INVOICE_EXPIRY_BATCH_SIZE 100 размер одного batch-а sweep-а.

Реальные значения WORKER_ROLES — только эти четыре: api, outbox-worker, psp-worker, invoice-expiry. Никакого outbox, psp-ipps, balance-monitor как самостоятельных ролей не существует — это типовая ошибка при чтении старых doc-ов.

BalanceMonitor — это не роль, а флаг BALANCE_MONITOR_ENABLED=true внутри роли psp-worker. На pod-е без psp-worker балансы никогда не проверяются.


7. Инварианты

OutboxWorker — один процесс одновременно

CLAUDE.md фиксирует NO-GO:

NEVER run multiple OutboxWorker instances simultaneously

Текущая реализация (processOutboxBatch) не использует FOR UPDATE SKIP LOCKED при выборе событий из outbox_event — это SELECT … WHERE status='pending' ORDER BY created_at LIMIT 10. Два параллельных worker-а возьмут одни и те же 10 строк, и оба попробуют вызвать tb.createTransfers — TB защитит от двойной проводки (pending_transfer_already_posted фильтруется как не-fatal), но это не означает безопасности: side-effects (writeSettlement, publishStatus, publishPaymentNotification) сработают дважды. Поэтому деплоят строго один pod с ролью outbox-worker до миграции на advisory locks / FOR UPDATE SKIP LOCKED (см. §10).

PspWorker — multi-instance OK

pickUpJobs уже использует WITH picked AS (… FOR UPDATE SKIP LOCKED) UPDATE … — параллельные worker-ы конкурируют за строки безопасно. leased_by = ${hostname}.${pid} помогает аудиту.

invoice-expiry — multi-instance OK, но избыточно

channel.expire() атомарен (UPDATE WHERE status='CREATED' AND expires_at < now() RETURNING …). Несколько pod-ов с этой ролью не сломают данные, но даром нагружают БД. По умолчанию — один pod.

transit.balance = 0

Главный финансовый инвариант (CLAUDE.md). OutboxWorker не нарушает его, потому что использует только post_pending / void_pending для уже зарезервированных pending-трансферов — TB сам блокирует повторную проводку.


8. Тестирование

Файл теста Покрытие
test/workers/psp-worker.test.ts unit: pickUpJobs (race), loadContext (missing intent / wallet), applyOutcome для всех четырёх kind, escalateMissingContext.
test/workers/balance-monitor.test.ts unit: runBalanceTick — drift > threshold, low_balance, oba alerts, отсутствие getPartnerBalance, TB failure, PSP failure.
test/integration/invoice-flow.test.ts integration: полный flow CREATED → expire → EXPIRED для INVOICE_PAYMENT.
test/integration/notifications.test.ts integration: outbox → SETTLED/FAILED → publishPaymentNotification → Redis stream.

Запуск:

npm test -- test/workers
npm test -- test/integration/invoice-flow

OutboxWorker сам по себе не имеет отдельного unit-теста — его логика тестируется через end-to-end intent-flow в test/integration/* (intent → confirm → outbox → SETTLED).


9. Связанные модули


10. Заготовки на будущее

Multi-instance OutboxWorker. Сейчас — strict single-instance. Чтобы убрать это ограничение, понадобится одно из:

  • FOR UPDATE SKIP LOCKED при чтении outbox_event (как уже сделано в psp-worker).
  • PostgreSQL advisory locks (pg_try_advisory_xact_lock(hashtext('outbox-worker'))) — глобальный mutex на batch.
  • Лидер-выбор через Redis lease (Redlock).

До этого момента — один pod, и это явная обязательная инвестиция в надёжность.

Redis Streams для PSP (Phase 2B). Текущая модель — Postgres-очередь (psp_tx_map + lease-poll). Phase 2B (см. ARCHITECTURE.md) выносит PSP-адаптеры в отдельные сервисы, общаясь через stream.ipps.jobs / stream.ipps.results. PspWorker превратится в продюсера jobs + консьюмера results; state-machine останется в PM, но driver.process уедет в адаптер. Это даст изоляцию failure-domain-ов (если IPPS-driver падает, PM продолжает обслуживать INTERNAL_P2P и INVOICE_PAYMENT).

Distributed tracing. Сейчас в логах есть intentId, rowId, event, но нет trace_id propagation между worker-ами. План — добавить OpenTelemetry-span с trace_id = intent_id (CLAUDE.md cross-service правило).

Adaptive polling. Все четыре worker-а опрашивают БД с фиксированным интервалом. При пустых батчах — лишняя нагрузка; при бурст-трафике — задержка. Решение — exponential backoff на пустых батчах + NOTIFY/LISTEN-нотификация при INSERT в outbox_event / psp_tx_map.