Перейти к содержанию

Notifications Service — Integrations

Redis Stream (Inbound Jobs)

Stream Configuration

Setting Value
Stream name stream.notifications.jobs (env: JOBS_STREAM)
Consumer group notifications-worker (env: CONSUMER_GROUP)
Consumer name worker-{hostname}-{pid} (env: CONSUMER_NAME, auto-generated if not set)
Block timeout 5000ms (env: BLOCK_MS)
XAUTOCLAIM idle threshold 30000ms (env: AUTOCLAIM_MIN_IDLE_MS)
Max delivery count 5 (env: MAX_DELIVERY_COUNT)

The consumer group is created with MKSTREAM on startup — if it already exists the error is silently ignored (BUSYGROUP). XAUTOCLAIM runs at startup and then on a recurring timer (interval = AUTOCLAIM_MIN_IDLE_MS) to reclaim messages left pending by a crashed/slow consumer. Messages whose delivery count reaches MAX_DELIVERY_COUNT are ACKed and dropped (logged as notifications.max_retries). Implementation: src/consumer.js (ensureGroup, claimPending, handleMessage).

Message Format

Messages are sent using XADD stream.notifications.jobs * key1 val1 key2 val2 ...

Flat field format (preferred):

userId    <integer as string>
title     <notification title, pre-localized>
body      <notification body, pre-localized>
channel   payments | security | kyc | system
traceId   <intent_id or uuid, optional>
data      <JSON string, optional>
badge     <integer as string, optional>

JSON payload format (alternative):

payload   {"userId":1,"title":"...","body":"...","channel":"payments","traceId":"..."}

Both formats are accepted. userId, title, body are required. channel defaults to system if omitted.

Who Publishes

Publisher Scenarios
Payment Manager Payment sent, received, pending, failed
Auth Center (Serverpod) Login alerts, password changes, KYC status updates
KYC Service KYC approved / rejected (via Auth Center in practice)

Important: Publishers are responsible for localizing title and body to the user's locale before publishing. The notifications service does not translate.

Firebase FCM

  • SDK: firebase-admin ^12.4.0
  • Initialized from a service account JSON file or inline JSON string
  • Uses admin.messaging().send(message) — one call per device token
  • Firebase project is identified by the project_id field in the service account JSON

Credential Configuration

Either set FCM_SERVICE_ACCOUNT_PATH (path to a JSON file) or FCM_SERVICE_ACCOUNT_JSON (the full JSON content as a string). At least one is required — the service exits on startup if neither is set.

In production, mount the service account file as a Kubernetes secret or Docker secret and point FCM_SERVICE_ACCOUNT_PATH at it.

PostgreSQL

The service uses two schemas:

Read: public.device_token

Table owned by Auth Center (Serverpod). The notifications service has SELECT and DELETE access.

SELECT id, "token", "platform" FROM public.device_token WHERE "userId" = $1

Column names are camelCase with double quotes (Serverpod convention).

⚠️ The query does not currently select "appId", so no per-app filtering happens (see «Multi-app routing» below).

Multi-app routing (TODO — Блок 4)

device_token.appId определяет приложение-источник токена:

appId Flutter приложение FCM проект
wallet onewallet_base_flutter FCM Project A
closeloop one_loop_app FCM Project B
merchant one_merchant_app FCM Project C

Текущее состояние: - ✅ appId добавлен в public.device_token (см. device_token.spy.yaml, колонка appId String? + индекс device_token_app_idx). - ✅ appId заполняется при регистрации: NotificationEndpoint.registerDevice(...required String appId...), и Flutter (push_notification_service.dart) передаёт его при вызове. - ❌ Сервис ещё не фильтрует по appId: fetchTokens() (src/handler.js) не выбирает колонку appId, поэтому все токены пользователя получают push вне зависимости от приложения. - ❌ Используется один FCM app instance (src/fcm.js, единственный app) — multi-project routing не реализован.

Для multi-app routing нужно: 1. Инициализировать несколько FCM app instances (по одному на каждый FCM проект) в src/fcm.js 2. В fetchTokens() (src/handler.js) выбрать "appId" и маршрутизировать токен в нужный FCM instance 3. PM/публикатор должен передавать целевой appId в сообщении в stream (поле в data JSON), чтобы ограничить рассылку нужным приложением

Write: notifications.notification_log

Table owned by notifications service. Created by sql/001_init.sql.

CREATE TABLE notifications.notification_log (
  id               BIGSERIAL PRIMARY KEY,
  user_id          INTEGER     NOT NULL,
  channel          TEXT        NOT NULL,     -- payments | security | kyc | system
  title            TEXT        NOT NULL,
  body             TEXT        NOT NULL,
  trace_id         TEXT,                     -- intent_id for payments, uuid for security
  data             JSONB,
  status           TEXT        NOT NULL,     -- pending | sent | partial | failed | no_tokens
  tokens_targeted  INTEGER     NOT NULL DEFAULT 0,
  tokens_succeeded INTEGER     NOT NULL DEFAULT 0,
  tokens_failed    INTEGER     NOT NULL DEFAULT 0,
  response         JSONB,                    -- [{tokenId, fcmMessageId?, errorCode?}]
  delivery_count   INTEGER     NOT NULL DEFAULT 1,
  created_at       TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  processed_at     TIMESTAMPTZ
);

Indexes: (user_id, created_at DESC), (trace_id) where not null, (status, created_at DESC).

Database Grants

The notifications_user database role needs access to both schemas. Run once by DBA (not included in 001_init.sql):

GRANT USAGE ON SCHEMA public TO notifications_user;
GRANT SELECT, DELETE ON public.device_token TO notifications_user;
GRANT ALL ON SCHEMA notifications TO notifications_user;
GRANT ALL ON ALL TABLES IN SCHEMA notifications TO notifications_user;
GRANT ALL ON ALL SEQUENCES IN SCHEMA notifications TO notifications_user;

Connection Configuration

Redis

Env var Default Description
REDIS_URL redis://localhost:6379 Redis connection URL

PostgreSQL

Env var Default Description
PG_HOST localhost PostgreSQL host
PG_PORT 5432 PostgreSQL port
PG_DATABASE — (required) Database name
PG_USER — (required) Database user
PG_PASSWORD — (required) Database password