Redis — интеграция¶
Redis в Payment Manager (PM) используется как легковесный транспорт событий: pub/sub-канал intent.<id> для real-time обновлений статуса интента (его читает Auth Center в streamStatus) и Redis Stream stream.notifications.jobs — для постановки задач на отправку push-уведомлений в Notifications Service. В Phase 2B Redis Streams станут основным транспортом между PM и внешними PSP-адаптерами (IPPS, QP), но в Phase 1 этот контур ещё не активирован — заготовка описана ниже.
1. Назначение¶
В Phase 1 Redis выполняет две роли:
- Pub/Sub — однонаправленная доставка fire-and-forget событий «статус интента изменился». Подписчик — Auth Center, который через WebSocket-метод
streamStatusретранслирует событие во Flutter-приложение. PM не хранит подписчиков и не ждёт подтверждения доставки. - Streams (XADD) — durable очередь задач на push-уведомления. Producer — PM (после
SETTLED/FAILEDинтента), consumer — Notifications Service. Используется только запись (XADD), чтение/ack — на стороне worker-а уведомлений.
Redis не используется в Phase 1 как:
- транспорт между PM и IPPS adapter — Phase 1 IPPS работает in-process через PostgreSQL-очередь
psp_tx_map(см. ipps.md); - хранилище состояния — состояние интента и saga живёт в PostgreSQL (
pm.*); - кэш — на момент Phase 1 кэширование на PM не используется.
Любая ошибка публикации в Redis считается non-fatal: интент уже зафиксирован в БД и TigerBeetle, поэтому отсутствие события не нарушает целостность данных — лишь задерживает real-time UX.
2. Pub/Sub: канал intent.<id>¶
| Параметр | Значение |
|---|---|
| Тип | Redis PUBLISH (без persistence) |
| Канал | intent.<intentId> — отдельный канал на каждый интент |
| Producer | PM, функция publishIntentStatus в src/intent/intent-events.ts |
| Consumer | Auth Center, streamStatus (Serverpod method-stream) |
| Payload | JSON: { intentId, status, updatedAt } |
| Доставка | At-most-once, best-effort — без подтверждения, без ретрая |
Публикация всегда сопровождается записью события в таблицу pm.intent_event через writeIntentEvent: БД — источник истины, Redis — лишь сигнал «обнови подписчиков». Если Redis недоступен, событие в БД всё равно появится, а ошибка публикации логируется как warn и не пробрасывается наружу:
// src/intent/intent-events.ts
export async function publishIntentStatus(intentId: string, status: IntentStatus): Promise<void> {
await getRedis().publish(`intent.${intentId}`,
JSON.stringify({ intentId, status, updatedAt: new Date().toISOString() }))
}
Вызывается из writeIntentEvent через опциональный аргумент publishFn: тестам передают undefined (без публикации), production-код — publishIntentStatus.
Жизненный цикл подписки¶
- Flutter-клиент вызывает Auth Center
streamStatus(intentId). - Auth Center подписывается на
intent.<intentId>через свой Redis-клиент. - PM публикует переходы статусов:
PENDING → SETTLED,PENDING → FAILED,PENDING → CANCELEDи т.п. - Auth Center ретранслирует JSON-payload в WebSocket клиенту.
- Клиент отписывается → Auth Center отписывается → канал автоматически удаляется (Redis pub/sub не хранит каналы без подписчиков).
3. Streams (Phase 1): stream.notifications.jobs¶
| Параметр | Значение |
|---|---|
| Тип | Redis Stream (XADD) |
| Stream key | stream.notifications.jobs |
| Producer | PM, функция publishPaymentNotification в src/intent/notify.ts |
| Consumer | Notifications Service (отдельный Node.js-сервис, FCM worker) |
| Payload | JSON-поле payload: { userId, title, body, channel, traceId, data } |
| Доставка | At-least-once (Stream + consumer group на стороне Notifications Service) |
Это единственный Stream, активно используемый PM в Phase 1. Producer — PM-handler-ы saga (после фиксации SETTLED/FAILED), consumer — отдельный сервис (projects/notifications-service), который читает Stream и отправляет push через FCM. PM не управляет consumer-group-ами — это ответственность подписчика.
Сообщения формируются с учётом канала интента:
INVOICE_PAYMENT→ два уведомления: покупателю (Payment sent) и мерчанту (Payment received).INTERNAL_P2P/SERVICE_TRANSFER/ADMIN→ отправителю (Sent/${label} Complete) и, если вmetadata.recipientUserIdзадан получатель, ему —Received.- прочие операции (TOPUP, WITHDRAWAL, PAYMENT) → только отправителю.
FAILED→ одно уведомление инициатору (Payment Failed).
Ошибка XADD логируется как warn и не блокирует завершение saga — push-уведомление считается «приятным дополнением», а не частью контракта платежа.
4. Phase 2B: заготовка Redis Streams для PSP-адаптеров¶
Заготовка на будущее. В Phase 2B все PSP-адаптеры (IPPS, QP) будут вынесены в отдельные Node.js-процессы, общающиеся с PM исключительно через Redis Streams — никакого прямого HTTP-вызова из PM в адаптеры (и наоборот) быть не должно. Ниже — справочный перечень streams из корневого
CLAUDE.md. В Phase 1 эти ключи ещё не используются.
| Stream | Producer → Consumer | Назначение | Статус в Phase 1 |
|---|---|---|---|
stream.ipps.jobs |
PM → IPPS Adapter | Постановка transfer/inquiry задач в IPPS | не используется |
stream.ipps.results |
IPPS Adapter → PM | Возврат результатов IPPS (success/failure/inquiry-status) | не используется |
stream.qp.jobs |
PM → QP Adapter | Постановка задач в QP-провайдер | не используется |
stream.qp.results |
QP Adapter → PM | Возврат результатов QP | не используется |
stream.webhook.ipps |
Webhook Gateway → PM | Доставка асинхронных IPPS-webhook-ов в PM | не используется |
stream.notifications.jobs |
PM / Auth Center / KYC → Notifications | Push-уведомления через FCM | активен в Phase 1 |
Миграция Phase 1 → Phase 2B:
- IPPS-логика переезжает из in-process
psp-worker(PostgreSQL queuepsp_tx_map) в отдельный процесс, читающийstream.ipps.jobs. - Webhook-приёмник IPPS вырезается из PM в отдельный gateway, который пишет в
stream.webhook.ipps; PM-worker читает его и обновляет интенты. - БД-очередь
psp_tx_mapостаётся только как audit-trail (mappingintentId ↔ pspTxRef), а драйв обработки переезжает в Streams.
До этого момента ключи stream.ipps.*, stream.qp.*, stream.webhook.ipps в PM не создаются и не читаются.
5. Connection¶
Реализация: src/shared/redis.ts.
- Singleton-клиент через
ioredis. Один процесс PM держит один TCP-коннект к Redis на весь жизненный цикл; коннект создаётся лениво при первомgetRedis(). lazyConnect: false— клиент подключается сразу при создании;maxRetriesPerRequest: 3— ограничивает накопление зависших команд при недоступности Redis.- Обработчик
errorлогирует ошибки соединения какerror, но не падает — публикации best-effort. closeRedis()вызывается при graceful shutdown (signal-handler вbootstrap) и закрывает соединение черезquit().
// src/shared/redis.ts
let client: Redis | null = null
export function getRedis(): Redis {
if (!client) {
client = new Redis(config.REDIS_URL, { lazyConnect: false, maxRetriesPerRequest: 3 })
client.on('error', (err) => logger.error({ err }, 'redis: connection error'))
}
return client
}
Отдельный клиент для уведомлений¶
src/intent/notify.ts поддерживает второй Redis-клиент — для случая, когда Notifications Service живёт в отдельном Redis-кластере (например, чтобы push-нагрузка не мешала intent-pub/sub):
// src/intent/notify.ts
function getNotifRedis(): Redis {
if (config.NOTIFICATIONS_REDIS_URL) {
if (!notifClient) {
notifClient = new Redis(config.NOTIFICATIONS_REDIS_URL, { lazyConnect: false, maxRetriesPerRequest: 3 })
notifClient.on('error', (err) => logger.error({ err }, 'notify: notifications Redis connection error'))
}
return notifClient
}
return getRedis() // fallback на общий клиент
}
Поведение:
- если
NOTIFICATIONS_REDIS_URLзадан →XADDидёт в отдельный кластер; - если не задан → fallback на общий
REDIS_URL-клиент (и intent-pub/sub, и notifications живут в одном Redis).
6. Переменные окружения¶
| Env | Назначение | Обязательность |
|---|---|---|
REDIS_URL |
Основной Redis: pub/sub intent.<id> + дефолт для streams |
обязательна |
NOTIFICATIONS_REDIS_URL |
Отдельный Redis для stream.notifications.jobs (опционально) |
опциональна |
Формат URL — стандартный redis://[:password@]host:port[/db] (см. ioredis docs). При отсутствии NOTIFICATIONS_REDIS_URL PM работает с одним Redis-инстансом для всех целей — это допустимый сценарий для dev/staging.
7. Поведение при сбоях Redis¶
Redis в Phase 1 — не критичная зависимость PM с точки зрения целостности данных. Возможные сценарии:
| Сценарий | Последствия для интента | Последствия для UX |
|---|---|---|
Redis недоступен при publishIntentStatus |
Интент уже зафиксирован в БД и TB — без эффекта | streamStatus не получит событие; клиент полагается на polling/retry-логику Auth Center |
Redis недоступен при XADD notifications |
Интент уже зафиксирован — без эффекта | Push-уведомление не отправится (FCM-сообщения теряются для конкретного интента) |
Сетевые таймауты (maxRetriesPerRequest: 3) |
Команда отклоняется после 3 попыток | То же, что выше |
| Auth Center потерял подписку (re-connect) | — | Auth Center должен пере-подписаться и дочитать состояние через REST GET /intents/:id |
Что никогда не происходит: интент не «зависает» в PENDING из-за Redis. Все статусные переходы пишутся в pm.intent_event транзакционно с обновлением pm.intent.status; Redis-публикация — пост-фактум сигнализация.
Идемпотентность¶
publishIntentStatusне идемпотентна на уровне Redis (повторныйPUBLISHотправит сообщение ещё раз), но получатель (streamStatus→ Flutter) должен быть толерантен к повторам — payload содержитintentId+status+updatedAt, дублирование безопасно.XADDвstream.notifications.jobs— at-least-once: Notifications Service сам обеспечивает идемпотентность поtraceId(он жеintentId), чтобы не отправить два push-а на один и тот же интент при retry consumer-группы.
8. Observability¶
Метрики (план Phase 2)¶
В Phase 1 метрики по Redis не собираются явно; ошибки логируются. Phase 2 добавит:
redis_publish_total{channel="intent",result="ok|error"}— счётчик публикаций intent-status;redis_xadd_total{stream,result}— счётчик добавлений в streams;redis_connection_state— gauge состояния соединения (connected/reconnecting/end).
Логи¶
Все Redis-операции в PM логируются через общий pino-logger (src/shared/logger.ts):
redis: connection error— ошибка TCP-соединения (levelerror);intent-events: Redis publish failed (non-fatal)— публикация вintent.<id>упала (warn, сintentIdиstatusTo);notify: publishing to stream— стартXADD(info, сintentId,status,count);notify: stream publish ok— успешныйXADD(info);notify: Redis XADD failed (non-fatal)—XADDупал (warn, сintentId).
trace_id (он же intentId) присутствует во всех платёжных логах — для сквозной корреляции с TB transfer-id и event-trail в pm.intent_event.
9. Тестирование¶
В Vitest-тестах Redis не поднимается — публикации мокаются:
publishIntentStatusвызывается через опциональныйpublishFnвwriteIntentEvent— в тестах аргумент не передаётся (publishFn = undefined), сетевой вызов отсутствует.publishPaymentNotificationв saga-тестах либо обёрнут моком (vi.mock('../intent/notify.js')), либо тест проверяет только запись вpm.intent_eventиpm.intent.status, игнорируя side-effect уведомлений.
Интеграционные тесты Redis (живое соединение) пишутся только для smoke-проверки ioredis-клиента в CI с docker-compose Redis — отдельный набор, не входящий в основной npm test.
10. Связанные документы¶
- docs/dev/integrations/auth-center.md — Auth Center как consumer pub/sub
intent.<id>. - docs/dev/integrations/ipps.md — Phase 1 IPPS через PostgreSQL queue (Redis Streams не используется).
- /CLAUDE.md — таблица Phase 2B Redis Streams и кросс-сервисные правила.
- docs/dev/modules/workers.md — место
publishIntentStatusиpublishPaymentNotificationв жизненном цикле saga.