Перейти к содержанию

Worker Roles

Payment Manager — единый Node.js процесс, чьё поведение при старте определяется переменной окружения WORKER_ROLES. Один и тот же образ может работать API-сервером, фоновым воркером Outbox, PSP-воркером, sweep-задачей по инвойсам или произвольной их комбинацией. Этот документ описывает, какие роли существуют, какой код они запускают, какие переменные окружения на них влияют и какие инварианты эксплуатации обязаны соблюдаться при масштабировании.


Назначение

WORKER_ROLES — comma-separated список ролей в формате Zod-валидированной строки (src/shared/config.ts):

WORKER_ROLES: z.string()
  .default('api,outbox-worker')
  .transform((s) => s.split(',').map((r) => r.trim()).filter(Boolean))

Дефолт — api,outbox-worker (минимальная single-pod конфигурация для dev/SIT). Парсер устойчив к пробелам и пустым элементам — api, outbox-worker, корректно превратится в ['api','outbox-worker'].

Все ветвления — в src/server.ts после загрузки конфигурации и подключения TigerBeetle:

const roles = config.WORKER_ROLES

if (roles.includes('outbox-worker'))   { ... startOutboxWorker(...) }
if (roles.includes('invoice-expiry'))  { ... startInvoiceExpirySweep(...) }
if (roles.includes('psp-worker'))      { ... startPspWorker(...); startBalanceMonitor(...) }
if (roles.includes('api'))             { app.listen(...) }

Регистрация роли в WORKER_ROLES, которой нет в этом списке, игнорируется молча — никакие ошибки не выбрасываются. Это сознательная гибкость для feature-flag сценариев, но требует аккуратности при опечатках (psp-ipps, outbox, balance-monitor — несуществующие имена, см. раздел «Запрещённые имена ролей»).


Сводная таблица ролей

Роль Запускаемый код Можно несколько процессов? Ключевые env vars Типичная инсталляция
api app.listen({ port: config.PORT, host: '0.0.0.0' }) Да (за L4-балансировщиком) PORT, WORKER_ROLES 2-4 pod-а, HPA по CPU/QPS
outbox-worker startOutboxWorker(config.OUTBOX_INTERVAL_MS) НЕТ — строго 1 процесс OUTBOX_INTERVAL_MS (default 1000) 1 pod, replicas: 1, без HPA
psp-worker startPspWorker(pspName) × PSP_NAMES Да (PG row-level lock) PSP_NAMES, PSP_POLL_INTERVAL_MS, PSP_LEASE_SEC, PSP_RETRY_LEASE_SEC, PSP_MAX_RETRIES 1-3 pod-а
invoice-expiry startInvoiceExpirySweep(intervalMs, batchSize) Да (expire атомарен) INVOICE_EXPIRY_SWEEP_INTERVAL_MS, INVOICE_EXPIRY_BATCH_SIZE 1 pod (несколько безопасны, но избыточны)

Реальные роли

api

HTTP-сервер на Fastify. Регистрирует все маршруты PM (/intents, /intents/:id/confirm, /intents/:id/cancel, /intents/quote, /accounts/balance, /policies/*, admin-роуты, Swagger UI на /docs).

  • Запуск: app.listen({ port: config.PORT, host: '0.0.0.0' }) — финальный шаг после bootstrap-а TB, Redis, миграций каналов и регистрации OperationTypes.
  • Масштабирование: stateless, безопасно держать N pod-ов за L4/L7-балансировщиком. Никаких локальных кешей с записью — все side-effect-ы идут через TB, PostgreSQL или Redis pub/sub.
  • HMAC-аутентификация на каждом запросе (X-Service-Id, X-Timestamp, X-Signature) — без неё API-роуты возвращают 401.
  • Без роли apiapp.listen() не вызывается; процесс работает headless (см. раздел «Headless mode»).

outbox-worker

Опрашивает таблицу pm.outbox_event (статус pending) и материализует решение саги в TigerBeetle — пост-pending для SETTLED, void-pending для FAILED. Также пишет в pm.settlement, обновляет pm.intent.status и публикует событие в Redis (intent.{id}) для подписчиков (Auth Center stream).

  • Запуск: startOutboxWorker(config.OUTBOX_INTERVAL_MS)setInterval каждые OUTBOX_INTERVAL_MS мс (default 1000), батч до 10 событий.
  • Никакой блокировки строкoutbox-worker сейчас читает WHERE status = 'pending' ORDER BY created_at LIMIT 10 без FOR UPDATE SKIP LOCKED.
  • Из этого следует ключевой инвариант: одновременно работающих процессов с ролью outbox-worker должно быть ровно один. Два процесса будут гонкой обрабатывать одни и те же события — двойные post/void в TigerBeetle (TB защитит идемпотентностью pending_transfer_already_posted / pending_transfer_already_voided, но pm.settlement и intent.status могут уйти в неконсистентное состояние).
  • Деплой: Kubernetes Deployment с replicas: 1, strategy.type=Recreate (без rolling — иначе на момент перезапуска возможны 0 или 2 одновременных воркера). HPA отключена.
  • Шкала нагрузки: один воркер обрабатывает ~10 событий/сек на дефолтном интервале; при росте нагрузки увеличивать OUTBOX_INTERVAL_MS вниз (до 200 мс) либо размер батча в коде — НЕ количество процессов.

psp-worker

Опрашивает таблицу pm.psp_tx_map (state-машина IPPS/QP/Wise), вызывает driver.process() для каждого ряда, применяет результат (completed / failed / in-progress / manual-review) и при терминальном исходе вставляет событие в pm.outbox_event для последующего материализации outbox-worker-ом.

  • Запуск: startPspWorker(pspName)setInterval каждые PSP_POLL_INTERVAL_MS мс (default 500). Один процесс запускает по одному таймеру на каждый PSP из PSP_NAMES (default IPPS).
  • Конкурентность безопасна: pickUpJobs() использует SELECT ... FOR UPDATE SKIP LOCKED внутри CTE, плюс write-ahead state transition (NEW → QUERY_PENDING, QUERIED → CONFIRM_PENDING) — параллельные воркеры никогда не возьмут одну и ту же строку.
  • Lease-механизм: leased_by = ${hostname}.${pid}, leased_at = now(). Если процесс упал в середине обработки, строка освобождается через PSP_LEASE_SEC (default 10 сек, без retries) или PSP_RETRY_LEASE_SEC (default 30 сек, с retries).
  • Шкала нагрузки: горизонтально, до количества свободных слотов в PG-пуле. Типично 1-3 pod-а; на спайки заявок можно поднять до 5-6.
  • NEVER retry IPPS transfer requests — драйвер IPPS использует inquiry для проверки статуса вместо повторной отправки transfer (см. docs/dev/integrations/ipps.md).

invoice-expiry

Sweep-задача по двухфазным INVOICE_PAYMENT интентам со статусом CREATED и expires_at < now(). Для каждого вызывает channel.expire() — атомарный UPDATE WHERE status='CREATED' AND expires_at < now(), что гарантирует отсутствие двойного перехода в EXPIRED.

  • Запуск: startInvoiceExpirySweep(config.INVOICE_EXPIRY_SWEEP_INTERVAL_MS, config.INVOICE_EXPIRY_BATCH_SIZE)setInterval каждые 30 сек (default), батч 100 (default).
  • Несколько pod-ов безопасно благодаря атомарному expire() — гонка нескольких воркеров на один просроченный invoice разрешается на уровне PostgreSQL UPDATE (выигрывает первый, остальные получают 0 affected rows и пропускают).
  • На практике достаточно одного pod-а. Несколько процессов не дают выигрыша — они конкурируют за одни и те же 100 строк раз в 30 секунд.
  • Типичная инсталляция: прицепляется к одному из имеющихся pod-ов (WORKER_ROLES=api,invoice-expiry или WORKER_ROLES=outbox-worker,invoice-expiry) — отдельный pod избыточен.

BalanceMonitor — НЕ отдельная роль

BalanceMonitor мониторит расхождение между балансом TB-аккаунта system.nostro.ipps.THB и партнёрским балансом из PSP (driver.getPartnerBalance()). Алерты: low_partner_balance, balance_drift, balance_ok.

Важно: balance-monitor НЕ является самостоятельным значением WORKER_ROLES. Он запускается внутри роли psp-worker при поднятом флаге BALANCE_MONITOR_ENABLED=true:

// src/server.ts
if (roles.includes('psp-worker')) {
  for (const pspName of config.PSP_NAMES) {
    timers.push(startPspWorker(pspName))
    if (config.BALANCE_MONITOR_ENABLED) {
      timers.push(startBalanceMonitor(pspName))
    }
  }
}
  • Интервал: BALANCE_TICK_MS (default 60_000 мс).
  • Пороги: IPPS_LOW_BALANCE_THRESHOLD_SATANG (default 100_000 = 1000 THB), IPPS_DRIFT_THRESHOLD_SATANG (default 10_000 = 100 THB).
  • Запустить только BalanceMonitor (без PSP-воркера) невозможно — флаг не имеет эффекта, если в WORKER_ROLES нет psp-worker.
  • Включать BalanceMonitor в одном pod-еBALANCE_MONITOR_ENABLED=true имеет смысл ставить только на ОДНОМ из psp-worker pod-ов, чтобы не множить алерты в логах в N раз. Сам по себе мониторинг идемпотентен и безопасен в нескольких экземплярах, но шумит.

Совмещение ролей в одном процессе

Роли через запятую — стандартный способ деплоить single-pod конфигурации:

# Dev/SIT — всё в одном процессе:
WORKER_ROLES=api,outbox-worker,psp-worker,invoice-expiry

# Prod (минимум) — API отдельно, фон отдельно:
# Pod A: WORKER_ROLES=api
# Pod B: WORKER_ROLES=outbox-worker,invoice-expiry
# Pod C-N: WORKER_ROLES=psp-worker

Никаких ограничений на совмещение нет — каждая ветка if (roles.includes(...)) независима. Единственный инвариант — суммарное число процессов с ролью outbox-worker по всему кластеру должно быть строго 1 (см. раздел outbox-worker).

Все роли разделяют те же подключения к PostgreSQL, TigerBeetle и Redis в рамках одного процесса (см. src/shared/db.ts, src/shared/tb.ts, src/shared/redis.ts), поэтому совмещение в dev не требует никаких дополнительных ресурсов.


Headless mode

Если в WORKER_ROLES отсутствует роль api, процесс не вызывает app.listen() — HTTP-сервер не поднимается:

if (roles.includes('api')) {
  app.listen({ port: config.PORT, host: '0.0.0.0' }, ...)
} else {
  logger.info({ roles }, 'PM started in headless mode (no api role)')
}

При этом Fastify-app всё равно собран и зарегистрированы все плагины — buildApp() отрабатывает полностью. Это нужно потому, что: - background-роли используют те же модули (registerChannel, bootstrapPspDrivers, OperationTypes registry), что и API-роуты; - app.close() корректно вызывает onClose-хуки (closeTb(), closeRedis(), остановка таймеров).

Сигнальные обработчики ставятся всегда, независимо от наличия api:

process.once('SIGTERM', () => { void shutdown('SIGTERM') })
process.once('SIGINT',  () => { void shutdown('SIGINT')  })

Без них в headless-режиме SIGTERM от Kubernetes убил бы процесс до того, как Fastify dispose-ит TB/Redis сокеты и остановит setInterval-таймеры (в headless-режиме Fastify сам сигналы не ловит — нечему слушать).

Health-эндпоинты в headless-режиме недоступны/livez, /readyz обслуживаются Fastify, который не слушает порт. Для liveness/readiness в headless-pod-ах используйте exec-пробы (pgrep node или скрипт, проверяющий PID-файл).


Запрещённые имена ролей

Не существуют как значения WORKER_ROLES (опечатки приведут к молчаливому пропуску):

Опечатка / устаревшее имя Правильно
outbox outbox-worker
psp-ipps psp-worker (выбор PSP — через PSP_NAMES)
balance-monitor psp-worker + BALANCE_MONITOR_ENABLED=true
balance, monitor то же
invoice, expiry invoice-expiry

Если pod стартует и в логах нет ни одной строки ... starting (PspWorker starting, BalanceMonitor starting) и нет Fastify listening on ... — значит, скорее всего, опечатка в WORKER_ROLES. Проверьте config.WORKER_ROLES сразу после buildApp() или загляните в строку PM started in headless mode — она показывает фактический массив ролей.


Инварианты эксплуатации

  1. Ровно один outbox-worker процесс в любой момент времени по всему кластеру. Деплой через replicas: 1 + strategy.type=Recreate.
  2. psp-worker, invoice-expiry, api — горизонтально масштабируемы, гонки разрешаются на уровне PostgreSQL (FOR UPDATE SKIP LOCKED для PSP, атомарный UPDATE для invoice-expiry, stateless для api).
  3. BALANCE_MONITOR_ENABLED=true — ставить только на ОДНОМ из psp-worker pod-ов; на остальных оставлять false, чтобы не дублировать алерты в low_partner_balance / balance_drift.
  4. Headless mode допустим (без api) — сигнальные обработчики и onClose-хуки работают корректно; health-пробы должны быть exec-based, а не HTTP.
  5. HMAC-аутентификация не зависит от ролей — она применяется к запросам в API-роуты. Background-роли HMAC не используют (внутрипроцессные вызовы PostgreSQL и TigerBeetle SDK не подписываются).

Startup-последовательность (любая комбинация ролей)

src/server.ts выполняет одну и ту же подготовку для всех ролей — это важно, чтобы headless-pod-ы тоже видели каналы, OperationTypes и подключения:

  1. Загрузка config из env через Zod (src/shared/config.ts).
  2. buildApp() — регистрация всех Fastify-плагинов и роутов (даже если api не указан).
  3. connectTb(config.TB_ADDRESS) с retry-loop: 5 попыток, экспоненциальная задержка 2000 × attempt мс. При полной недоступности TB — process.exit(1).
  4. seedAccounts(db) + verifySystemAccounts(db) — гарантия наличия system-аккаунтов в TB.
  5. registerChannel(...)internal-p2p, ipps-transfer, service-transfer, admin-transfer, merchant-invoice.
  6. reconcile(db) — startup-reconciler для зависших intent-ов.
  7. bootstrapPspDrivers() — регистрация PSP-драйверов в registry (нужно даже для headless-pod-ов с outbox-worker, чтобы при необходимости подгрузить metadata).
  8. Ветвления по ролям → setInterval-таймеры в массив timers.
  9. process.once('SIGTERM'|'SIGINT', shutdown) — всегда.
  10. app.listen(...) только при наличии роли api.

Любой шаг 3-8 может выбросить — в этом случае process.exit(1) с залогированной ошибкой; Kubernetes перезапустит pod.


Graceful shutdown

app.addHook('onClose', ...) регистрирует три cleanup-хука (в порядке регистрации):

  1. closeTb() — закрывает соединение с TigerBeetle.
  2. closeRedis() — закрывает Redis-клиент (pub/sub и кэши).
  3. Остановка всех setInterval-таймеров через clearInterval(t).

Хендлер shutdown(signal) вызывает app.close(), затем process.exit(0). Если app.close() зависнет (например, не отвечает PostgreSQL), Kubernetes принудительно убьёт процесс через terminationGracePeriodSeconds (по умолчанию 30 сек). На практике этого хватает с большим запасом — TB и Redis закрываются за миллисекунды, а активных HTTP-запросов в headless-режиме нет.

Между приёмом SIGTERM и завершением setInterval-таймера может пройти до одной итерации цикла (OUTBOX_INTERVAL_MS, PSP_POLL_INTERVAL_MS, INVOICE_EXPIRY_SWEEP_INTERVAL_MS). Это безопасно — все обработчики идемпотентны или используют lease/atomic-update механики.