Worker Roles¶
Payment Manager — единый Node.js процесс, чьё поведение при старте определяется переменной окружения WORKER_ROLES. Один и тот же образ может работать API-сервером, фоновым воркером Outbox, PSP-воркером, sweep-задачей по инвойсам или произвольной их комбинацией. Этот документ описывает, какие роли существуют, какой код они запускают, какие переменные окружения на них влияют и какие инварианты эксплуатации обязаны соблюдаться при масштабировании.
Назначение¶
WORKER_ROLES — comma-separated список ролей в формате Zod-валидированной строки (src/shared/config.ts):
WORKER_ROLES: z.string()
.default('api,outbox-worker')
.transform((s) => s.split(',').map((r) => r.trim()).filter(Boolean))
Дефолт — api,outbox-worker (минимальная single-pod конфигурация для dev/SIT).
Парсер устойчив к пробелам и пустым элементам — api, outbox-worker, корректно превратится в ['api','outbox-worker'].
Все ветвления — в src/server.ts после загрузки конфигурации и подключения TigerBeetle:
const roles = config.WORKER_ROLES
if (roles.includes('outbox-worker')) { ... startOutboxWorker(...) }
if (roles.includes('invoice-expiry')) { ... startInvoiceExpirySweep(...) }
if (roles.includes('psp-worker')) { ... startPspWorker(...); startBalanceMonitor(...) }
if (roles.includes('api')) { app.listen(...) }
Регистрация роли в WORKER_ROLES, которой нет в этом списке, игнорируется молча — никакие ошибки не выбрасываются. Это сознательная гибкость для feature-flag сценариев, но требует аккуратности при опечатках (psp-ipps, outbox, balance-monitor — несуществующие имена, см. раздел «Запрещённые имена ролей»).
Сводная таблица ролей¶
| Роль | Запускаемый код | Можно несколько процессов? | Ключевые env vars | Типичная инсталляция |
|---|---|---|---|---|
api |
app.listen({ port: config.PORT, host: '0.0.0.0' }) |
Да (за L4-балансировщиком) | PORT, WORKER_ROLES |
2-4 pod-а, HPA по CPU/QPS |
outbox-worker |
startOutboxWorker(config.OUTBOX_INTERVAL_MS) |
НЕТ — строго 1 процесс | OUTBOX_INTERVAL_MS (default 1000) |
1 pod, replicas: 1, без HPA |
psp-worker |
startPspWorker(pspName) × PSP_NAMES |
Да (PG row-level lock) | PSP_NAMES, PSP_POLL_INTERVAL_MS, PSP_LEASE_SEC, PSP_RETRY_LEASE_SEC, PSP_MAX_RETRIES |
1-3 pod-а |
invoice-expiry |
startInvoiceExpirySweep(intervalMs, batchSize) |
Да (expire атомарен) | INVOICE_EXPIRY_SWEEP_INTERVAL_MS, INVOICE_EXPIRY_BATCH_SIZE |
1 pod (несколько безопасны, но избыточны) |
Реальные роли¶
api¶
HTTP-сервер на Fastify. Регистрирует все маршруты PM (/intents, /intents/:id/confirm, /intents/:id/cancel, /intents/quote, /accounts/balance, /policies/*, admin-роуты, Swagger UI на /docs).
- Запуск:
app.listen({ port: config.PORT, host: '0.0.0.0' })— финальный шаг после bootstrap-а TB, Redis, миграций каналов и регистрации OperationTypes. - Масштабирование: stateless, безопасно держать N pod-ов за L4/L7-балансировщиком. Никаких локальных кешей с записью — все side-effect-ы идут через TB, PostgreSQL или Redis pub/sub.
- HMAC-аутентификация на каждом запросе (
X-Service-Id,X-Timestamp,X-Signature) — без неё API-роуты возвращают401. - Без роли
api—app.listen()не вызывается; процесс работает headless (см. раздел «Headless mode»).
outbox-worker¶
Опрашивает таблицу pm.outbox_event (статус pending) и материализует решение саги в TigerBeetle — пост-pending для SETTLED, void-pending для FAILED. Также пишет в pm.settlement, обновляет pm.intent.status и публикует событие в Redis (intent.{id}) для подписчиков (Auth Center stream).
- Запуск:
startOutboxWorker(config.OUTBOX_INTERVAL_MS)—setIntervalкаждыеOUTBOX_INTERVAL_MSмс (default1000), батч до 10 событий. - Никакой блокировки строк —
outbox-workerсейчас читаетWHERE status = 'pending' ORDER BY created_at LIMIT 10безFOR UPDATE SKIP LOCKED. - Из этого следует ключевой инвариант: одновременно работающих процессов с ролью
outbox-workerдолжно быть ровно один. Два процесса будут гонкой обрабатывать одни и те же события — двойные post/void в TigerBeetle (TB защитит идемпотентностьюpending_transfer_already_posted/pending_transfer_already_voided, ноpm.settlementиintent.statusмогут уйти в неконсистентное состояние). - Деплой: Kubernetes Deployment с
replicas: 1,strategy.type=Recreate(без rolling — иначе на момент перезапуска возможны 0 или 2 одновременных воркера). HPA отключена. - Шкала нагрузки: один воркер обрабатывает ~10 событий/сек на дефолтном интервале; при росте нагрузки увеличивать
OUTBOX_INTERVAL_MSвниз (до 200 мс) либо размер батча в коде — НЕ количество процессов.
psp-worker¶
Опрашивает таблицу pm.psp_tx_map (state-машина IPPS/QP/Wise), вызывает driver.process() для каждого ряда, применяет результат (completed / failed / in-progress / manual-review) и при терминальном исходе вставляет событие в pm.outbox_event для последующего материализации outbox-worker-ом.
- Запуск:
startPspWorker(pspName)—setIntervalкаждыеPSP_POLL_INTERVAL_MSмс (default500). Один процесс запускает по одному таймеру на каждый PSP изPSP_NAMES(defaultIPPS). - Конкурентность безопасна:
pickUpJobs()используетSELECT ... FOR UPDATE SKIP LOCKEDвнутри CTE, плюс write-ahead state transition (NEW → QUERY_PENDING,QUERIED → CONFIRM_PENDING) — параллельные воркеры никогда не возьмут одну и ту же строку. - Lease-механизм:
leased_by = ${hostname}.${pid},leased_at = now(). Если процесс упал в середине обработки, строка освобождается черезPSP_LEASE_SEC(default 10 сек, без retries) илиPSP_RETRY_LEASE_SEC(default 30 сек, с retries). - Шкала нагрузки: горизонтально, до количества свободных слотов в PG-пуле. Типично 1-3 pod-а; на спайки заявок можно поднять до 5-6.
NEVER retry IPPS transfer requests— драйвер IPPS использует inquiry для проверки статуса вместо повторной отправки transfer (см.docs/dev/integrations/ipps.md).
invoice-expiry¶
Sweep-задача по двухфазным INVOICE_PAYMENT интентам со статусом CREATED и expires_at < now(). Для каждого вызывает channel.expire() — атомарный UPDATE WHERE status='CREATED' AND expires_at < now(), что гарантирует отсутствие двойного перехода в EXPIRED.
- Запуск:
startInvoiceExpirySweep(config.INVOICE_EXPIRY_SWEEP_INTERVAL_MS, config.INVOICE_EXPIRY_BATCH_SIZE)—setIntervalкаждые 30 сек (default), батч 100 (default). - Несколько pod-ов безопасно благодаря атомарному
expire()— гонка нескольких воркеров на один просроченный invoice разрешается на уровне PostgreSQL UPDATE (выигрывает первый, остальные получают 0 affected rows и пропускают). - На практике достаточно одного pod-а. Несколько процессов не дают выигрыша — они конкурируют за одни и те же 100 строк раз в 30 секунд.
- Типичная инсталляция: прицепляется к одному из имеющихся pod-ов (
WORKER_ROLES=api,invoice-expiryилиWORKER_ROLES=outbox-worker,invoice-expiry) — отдельный pod избыточен.
BalanceMonitor — НЕ отдельная роль¶
BalanceMonitor мониторит расхождение между балансом TB-аккаунта system.nostro.ipps.THB и партнёрским балансом из PSP (driver.getPartnerBalance()). Алерты: low_partner_balance, balance_drift, balance_ok.
Важно: balance-monitor НЕ является самостоятельным значением WORKER_ROLES. Он запускается внутри роли psp-worker при поднятом флаге BALANCE_MONITOR_ENABLED=true:
// src/server.ts
if (roles.includes('psp-worker')) {
for (const pspName of config.PSP_NAMES) {
timers.push(startPspWorker(pspName))
if (config.BALANCE_MONITOR_ENABLED) {
timers.push(startBalanceMonitor(pspName))
}
}
}
- Интервал:
BALANCE_TICK_MS(default60_000мс). - Пороги:
IPPS_LOW_BALANCE_THRESHOLD_SATANG(default100_000= 1000 THB),IPPS_DRIFT_THRESHOLD_SATANG(default10_000= 100 THB). - Запустить только BalanceMonitor (без PSP-воркера) невозможно — флаг не имеет эффекта, если в
WORKER_ROLESнетpsp-worker. - Включать BalanceMonitor в одном pod-е —
BALANCE_MONITOR_ENABLED=trueимеет смысл ставить только на ОДНОМ из psp-worker pod-ов, чтобы не множить алерты в логах в N раз. Сам по себе мониторинг идемпотентен и безопасен в нескольких экземплярах, но шумит.
Совмещение ролей в одном процессе¶
Роли через запятую — стандартный способ деплоить single-pod конфигурации:
# Dev/SIT — всё в одном процессе:
WORKER_ROLES=api,outbox-worker,psp-worker,invoice-expiry
# Prod (минимум) — API отдельно, фон отдельно:
# Pod A: WORKER_ROLES=api
# Pod B: WORKER_ROLES=outbox-worker,invoice-expiry
# Pod C-N: WORKER_ROLES=psp-worker
Никаких ограничений на совмещение нет — каждая ветка if (roles.includes(...)) независима. Единственный инвариант — суммарное число процессов с ролью outbox-worker по всему кластеру должно быть строго 1 (см. раздел outbox-worker).
Все роли разделяют те же подключения к PostgreSQL, TigerBeetle и Redis в рамках одного процесса (см. src/shared/db.ts, src/shared/tb.ts, src/shared/redis.ts), поэтому совмещение в dev не требует никаких дополнительных ресурсов.
Headless mode¶
Если в WORKER_ROLES отсутствует роль api, процесс не вызывает app.listen() — HTTP-сервер не поднимается:
if (roles.includes('api')) {
app.listen({ port: config.PORT, host: '0.0.0.0' }, ...)
} else {
logger.info({ roles }, 'PM started in headless mode (no api role)')
}
При этом Fastify-app всё равно собран и зарегистрированы все плагины — buildApp() отрабатывает полностью. Это нужно потому, что:
- background-роли используют те же модули (registerChannel, bootstrapPspDrivers, OperationTypes registry), что и API-роуты;
- app.close() корректно вызывает onClose-хуки (closeTb(), closeRedis(), остановка таймеров).
Сигнальные обработчики ставятся всегда, независимо от наличия api:
process.once('SIGTERM', () => { void shutdown('SIGTERM') })
process.once('SIGINT', () => { void shutdown('SIGINT') })
Без них в headless-режиме SIGTERM от Kubernetes убил бы процесс до того, как Fastify dispose-ит TB/Redis сокеты и остановит setInterval-таймеры (в headless-режиме Fastify сам сигналы не ловит — нечему слушать).
Health-эндпоинты в headless-режиме недоступны — /livez, /readyz обслуживаются Fastify, который не слушает порт. Для liveness/readiness в headless-pod-ах используйте exec-пробы (pgrep node или скрипт, проверяющий PID-файл).
Запрещённые имена ролей¶
Не существуют как значения WORKER_ROLES (опечатки приведут к молчаливому пропуску):
| Опечатка / устаревшее имя | Правильно |
|---|---|
outbox |
outbox-worker |
psp-ipps |
psp-worker (выбор PSP — через PSP_NAMES) |
balance-monitor |
psp-worker + BALANCE_MONITOR_ENABLED=true |
balance, monitor |
то же |
invoice, expiry |
invoice-expiry |
Если pod стартует и в логах нет ни одной строки ... starting (PspWorker starting, BalanceMonitor starting) и нет Fastify listening on ... — значит, скорее всего, опечатка в WORKER_ROLES. Проверьте config.WORKER_ROLES сразу после buildApp() или загляните в строку PM started in headless mode — она показывает фактический массив ролей.
Инварианты эксплуатации¶
- Ровно один
outbox-workerпроцесс в любой момент времени по всему кластеру. Деплой черезreplicas: 1+strategy.type=Recreate. psp-worker,invoice-expiry,api— горизонтально масштабируемы, гонки разрешаются на уровне PostgreSQL (FOR UPDATE SKIP LOCKEDдля PSP, атомарный UPDATE для invoice-expiry, stateless для api).BALANCE_MONITOR_ENABLED=true— ставить только на ОДНОМ из psp-worker pod-ов; на остальных оставлятьfalse, чтобы не дублировать алерты вlow_partner_balance/balance_drift.- Headless mode допустим (без
api) — сигнальные обработчики и onClose-хуки работают корректно; health-пробы должны быть exec-based, а не HTTP. - HMAC-аутентификация не зависит от ролей — она применяется к запросам в API-роуты. Background-роли HMAC не используют (внутрипроцессные вызовы PostgreSQL и TigerBeetle SDK не подписываются).
Startup-последовательность (любая комбинация ролей)¶
src/server.ts выполняет одну и ту же подготовку для всех ролей — это важно, чтобы headless-pod-ы тоже видели каналы, OperationTypes и подключения:
- Загрузка
configиз env через Zod (src/shared/config.ts). buildApp()— регистрация всех Fastify-плагинов и роутов (даже еслиapiне указан).connectTb(config.TB_ADDRESS)с retry-loop: 5 попыток, экспоненциальная задержка2000 × attemptмс. При полной недоступности TB —process.exit(1).seedAccounts(db)+verifySystemAccounts(db)— гарантия наличия system-аккаунтов в TB.registerChannel(...)—internal-p2p,ipps-transfer,service-transfer,admin-transfer,merchant-invoice.reconcile(db)— startup-reconciler для зависших intent-ов.bootstrapPspDrivers()— регистрация PSP-драйверов в registry (нужно даже для headless-pod-ов сoutbox-worker, чтобы при необходимости подгрузить metadata).- Ветвления по ролям →
setInterval-таймеры в массивtimers. process.once('SIGTERM'|'SIGINT', shutdown)— всегда.app.listen(...)только при наличии ролиapi.
Любой шаг 3-8 может выбросить — в этом случае process.exit(1) с залогированной ошибкой; Kubernetes перезапустит pod.
Graceful shutdown¶
app.addHook('onClose', ...) регистрирует три cleanup-хука (в порядке регистрации):
closeTb()— закрывает соединение с TigerBeetle.closeRedis()— закрывает Redis-клиент (pub/sub и кэши).- Остановка всех
setInterval-таймеров черезclearInterval(t).
Хендлер shutdown(signal) вызывает app.close(), затем process.exit(0). Если app.close() зависнет (например, не отвечает PostgreSQL), Kubernetes принудительно убьёт процесс через terminationGracePeriodSeconds (по умолчанию 30 сек). На практике этого хватает с большим запасом — TB и Redis закрываются за миллисекунды, а активных HTTP-запросов в headless-режиме нет.
Между приёмом SIGTERM и завершением setInterval-таймера может пройти до одной итерации цикла (OUTBOX_INTERVAL_MS, PSP_POLL_INTERVAL_MS, INVOICE_EXPIRY_SWEEP_INTERVAL_MS). Это безопасно — все обработчики идемпотентны или используют lease/atomic-update механики.