Перейти к содержанию

Redis — интеграция

Redis в Payment Manager (PM) используется как легковесный транспорт событий: pub/sub-канал intent.<id> для real-time обновлений статуса интента (его читает Auth Center в streamStatus) и Redis Stream stream.notifications.jobs — для постановки задач на отправку push-уведомлений в Notifications Service. В Phase 2B Redis Streams станут основным транспортом между PM и внешними PSP-адаптерами (IPPS, QP), но в Phase 1 этот контур ещё не активирован — заготовка описана ниже.


1. Назначение

В Phase 1 Redis выполняет две роли:

  1. Pub/Sub — однонаправленная доставка fire-and-forget событий «статус интента изменился». Подписчик — Auth Center, который через WebSocket-метод streamStatus ретранслирует событие во Flutter-приложение. PM не хранит подписчиков и не ждёт подтверждения доставки.
  2. Streams (XADD) — durable очередь задач на push-уведомления. Producer — PM (после SETTLED/FAILED интента), consumer — Notifications Service. Используется только запись (XADD), чтение/ack — на стороне worker-а уведомлений.

Redis не используется в Phase 1 как:

  • транспорт между PM и IPPS adapter — Phase 1 IPPS работает in-process через PostgreSQL-очередь psp_tx_map (см. ipps.md);
  • хранилище состояния — состояние интента и saga живёт в PostgreSQL (pm.*);
  • кэш — на момент Phase 1 кэширование на PM не используется.

Любая ошибка публикации в Redis считается non-fatal: интент уже зафиксирован в БД и TigerBeetle, поэтому отсутствие события не нарушает целостность данных — лишь задерживает real-time UX.


2. Pub/Sub: канал intent.<id>

Параметр Значение
Тип Redis PUBLISH (без persistence)
Канал intent.<intentId> — отдельный канал на каждый интент
Producer PM, функция publishIntentStatus в src/intent/intent-events.ts
Consumer Auth Center, streamStatus (Serverpod method-stream)
Payload JSON: { intentId, status, updatedAt }
Доставка At-most-once, best-effort — без подтверждения, без ретрая

Публикация всегда сопровождается записью события в таблицу pm.intent_event через writeIntentEvent: БД — источник истины, Redis — лишь сигнал «обнови подписчиков». Если Redis недоступен, событие в БД всё равно появится, а ошибка публикации логируется как warn и не пробрасывается наружу:

// src/intent/intent-events.ts
export async function publishIntentStatus(intentId: string, status: IntentStatus): Promise<void> {
  await getRedis().publish(`intent.${intentId}`,
    JSON.stringify({ intentId, status, updatedAt: new Date().toISOString() }))
}

Вызывается из writeIntentEvent через опциональный аргумент publishFn: тестам передают undefined (без публикации), production-код — publishIntentStatus.

Жизненный цикл подписки

  1. Flutter-клиент вызывает Auth Center streamStatus(intentId).
  2. Auth Center подписывается на intent.<intentId> через свой Redis-клиент.
  3. PM публикует переходы статусов: PENDING → SETTLED, PENDING → FAILED, PENDING → CANCELED и т.п.
  4. Auth Center ретранслирует JSON-payload в WebSocket клиенту.
  5. Клиент отписывается → Auth Center отписывается → канал автоматически удаляется (Redis pub/sub не хранит каналы без подписчиков).

3. Streams (Phase 1): stream.notifications.jobs

Параметр Значение
Тип Redis Stream (XADD)
Stream key stream.notifications.jobs
Producer PM, функция publishPaymentNotification в src/intent/notify.ts
Consumer Notifications Service (отдельный Node.js-сервис, FCM worker)
Payload JSON-поле payload: { userId, title, body, channel, traceId, data }
Доставка At-least-once (Stream + consumer group на стороне Notifications Service)

Это единственный Stream, активно используемый PM в Phase 1. Producer — PM-handler-ы saga (после фиксации SETTLED/FAILED), consumer — отдельный сервис (projects/notifications-service), который читает Stream и отправляет push через FCM. PM не управляет consumer-group-ами — это ответственность подписчика.

Сообщения формируются с учётом канала интента:

  • INVOICE_PAYMENT → два уведомления: покупателю (Payment sent) и мерчанту (Payment received).
  • INTERNAL_P2P / SERVICE_TRANSFER / ADMIN → отправителю (Sent / ${label} Complete) и, если в metadata.recipientUserId задан получатель, ему — Received.
  • прочие операции (TOPUP, WITHDRAWAL, PAYMENT) → только отправителю.
  • FAILED → одно уведомление инициатору (Payment Failed).

Ошибка XADD логируется как warn и не блокирует завершение saga — push-уведомление считается «приятным дополнением», а не частью контракта платежа.


4. Phase 2B: заготовка Redis Streams для PSP-адаптеров

Заготовка на будущее. В Phase 2B все PSP-адаптеры (IPPS, QP) будут вынесены в отдельные Node.js-процессы, общающиеся с PM исключительно через Redis Streams — никакого прямого HTTP-вызова из PM в адаптеры (и наоборот) быть не должно. Ниже — справочный перечень streams из корневого CLAUDE.md. В Phase 1 эти ключи ещё не используются.

Stream Producer → Consumer Назначение Статус в Phase 1
stream.ipps.jobs PM → IPPS Adapter Постановка transfer/inquiry задач в IPPS не используется
stream.ipps.results IPPS Adapter → PM Возврат результатов IPPS (success/failure/inquiry-status) не используется
stream.qp.jobs PM → QP Adapter Постановка задач в QP-провайдер не используется
stream.qp.results QP Adapter → PM Возврат результатов QP не используется
stream.webhook.ipps Webhook Gateway → PM Доставка асинхронных IPPS-webhook-ов в PM не используется
stream.notifications.jobs PM / Auth Center / KYC → Notifications Push-уведомления через FCM активен в Phase 1

Миграция Phase 1 → Phase 2B:

  1. IPPS-логика переезжает из in-process psp-worker (PostgreSQL queue psp_tx_map) в отдельный процесс, читающий stream.ipps.jobs.
  2. Webhook-приёмник IPPS вырезается из PM в отдельный gateway, который пишет в stream.webhook.ipps; PM-worker читает его и обновляет интенты.
  3. БД-очередь psp_tx_map остаётся только как audit-trail (mapping intentId ↔ pspTxRef), а драйв обработки переезжает в Streams.

До этого момента ключи stream.ipps.*, stream.qp.*, stream.webhook.ipps в PM не создаются и не читаются.


5. Connection

Реализация: src/shared/redis.ts.

  • Singleton-клиент через ioredis. Один процесс PM держит один TCP-коннект к Redis на весь жизненный цикл; коннект создаётся лениво при первом getRedis().
  • lazyConnect: false — клиент подключается сразу при создании; maxRetriesPerRequest: 3 — ограничивает накопление зависших команд при недоступности Redis.
  • Обработчик error логирует ошибки соединения как error, но не падает — публикации best-effort.
  • closeRedis() вызывается при graceful shutdown (signal-handler в bootstrap) и закрывает соединение через quit().
// src/shared/redis.ts
let client: Redis | null = null

export function getRedis(): Redis {
  if (!client) {
    client = new Redis(config.REDIS_URL, { lazyConnect: false, maxRetriesPerRequest: 3 })
    client.on('error', (err) => logger.error({ err }, 'redis: connection error'))
  }
  return client
}

Отдельный клиент для уведомлений

src/intent/notify.ts поддерживает второй Redis-клиент — для случая, когда Notifications Service живёт в отдельном Redis-кластере (например, чтобы push-нагрузка не мешала intent-pub/sub):

// src/intent/notify.ts
function getNotifRedis(): Redis {
  if (config.NOTIFICATIONS_REDIS_URL) {
    if (!notifClient) {
      notifClient = new Redis(config.NOTIFICATIONS_REDIS_URL, { lazyConnect: false, maxRetriesPerRequest: 3 })
      notifClient.on('error', (err) => logger.error({ err }, 'notify: notifications Redis connection error'))
    }
    return notifClient
  }
  return getRedis() // fallback на общий клиент
}

Поведение:

  • если NOTIFICATIONS_REDIS_URL задан → XADD идёт в отдельный кластер;
  • если не задан → fallback на общий REDIS_URL-клиент (и intent-pub/sub, и notifications живут в одном Redis).

6. Переменные окружения

Env Назначение Обязательность
REDIS_URL Основной Redis: pub/sub intent.<id> + дефолт для streams обязательна
NOTIFICATIONS_REDIS_URL Отдельный Redis для stream.notifications.jobs (опционально) опциональна

Формат URL — стандартный redis://[:password@]host:port[/db] (см. ioredis docs). При отсутствии NOTIFICATIONS_REDIS_URL PM работает с одним Redis-инстансом для всех целей — это допустимый сценарий для dev/staging.


7. Поведение при сбоях Redis

Redis в Phase 1 — не критичная зависимость PM с точки зрения целостности данных. Возможные сценарии:

Сценарий Последствия для интента Последствия для UX
Redis недоступен при publishIntentStatus Интент уже зафиксирован в БД и TB — без эффекта streamStatus не получит событие; клиент полагается на polling/retry-логику Auth Center
Redis недоступен при XADD notifications Интент уже зафиксирован — без эффекта Push-уведомление не отправится (FCM-сообщения теряются для конкретного интента)
Сетевые таймауты (maxRetriesPerRequest: 3) Команда отклоняется после 3 попыток То же, что выше
Auth Center потерял подписку (re-connect) Auth Center должен пере-подписаться и дочитать состояние через REST GET /intents/:id

Что никогда не происходит: интент не «зависает» в PENDING из-за Redis. Все статусные переходы пишутся в pm.intent_event транзакционно с обновлением pm.intent.status; Redis-публикация — пост-фактум сигнализация.

Идемпотентность

  • publishIntentStatus не идемпотентна на уровне Redis (повторный PUBLISH отправит сообщение ещё раз), но получатель (streamStatus → Flutter) должен быть толерантен к повторам — payload содержит intentId + status + updatedAt, дублирование безопасно.
  • XADD в stream.notifications.jobs — at-least-once: Notifications Service сам обеспечивает идемпотентность по traceId (он же intentId), чтобы не отправить два push-а на один и тот же интент при retry consumer-группы.

8. Observability

Метрики (план Phase 2)

В Phase 1 метрики по Redis не собираются явно; ошибки логируются. Phase 2 добавит:

  • redis_publish_total{channel="intent",result="ok|error"} — счётчик публикаций intent-status;
  • redis_xadd_total{stream,result} — счётчик добавлений в streams;
  • redis_connection_state — gauge состояния соединения (connected/reconnecting/end).

Логи

Все Redis-операции в PM логируются через общий pino-logger (src/shared/logger.ts):

  • redis: connection error — ошибка TCP-соединения (level error);
  • intent-events: Redis publish failed (non-fatal) — публикация в intent.<id> упала (warn, с intentId и statusTo);
  • notify: publishing to stream — старт XADD (info, с intentId, status, count);
  • notify: stream publish ok — успешный XADD (info);
  • notify: Redis XADD failed (non-fatal)XADD упал (warn, с intentId).

trace_id (он же intentId) присутствует во всех платёжных логах — для сквозной корреляции с TB transfer-id и event-trail в pm.intent_event.


9. Тестирование

В Vitest-тестах Redis не поднимается — публикации мокаются:

  • publishIntentStatus вызывается через опциональный publishFn в writeIntentEvent — в тестах аргумент не передаётся (publishFn = undefined), сетевой вызов отсутствует.
  • publishPaymentNotification в saga-тестах либо обёрнут моком (vi.mock('../intent/notify.js')), либо тест проверяет только запись в pm.intent_event и pm.intent.status, игнорируя side-effect уведомлений.

Интеграционные тесты Redis (живое соединение) пишутся только для smoke-проверки ioredis-клиента в CI с docker-compose Redis — отдельный набор, не входящий в основной npm test.


10. Связанные документы