Notifications Service — Integrations¶
Redis Stream (Inbound Jobs)¶
Stream Configuration¶
| Setting | Value |
|---|---|
| Stream name | stream.notifications.jobs (env: JOBS_STREAM) |
| Consumer group | notifications-worker (env: CONSUMER_GROUP) |
| Consumer name | worker-{hostname}-{pid} (env: CONSUMER_NAME, auto-generated if not set) |
| Block timeout | 5000ms (env: BLOCK_MS) |
| XAUTOCLAIM idle threshold | 30000ms (env: AUTOCLAIM_MIN_IDLE_MS) |
| Max delivery count | 5 (env: MAX_DELIVERY_COUNT) |
The consumer group is created with MKSTREAM on startup — if it already exists the error is silently ignored (BUSYGROUP). XAUTOCLAIM runs at startup and then on a recurring timer (interval = AUTOCLAIM_MIN_IDLE_MS) to reclaim messages left pending by a crashed/slow consumer. Messages whose delivery count reaches MAX_DELIVERY_COUNT are ACKed and dropped (logged as notifications.max_retries). Implementation: src/consumer.js (ensureGroup, claimPending, handleMessage).
Message Format¶
Messages are sent using XADD stream.notifications.jobs * key1 val1 key2 val2 ...
Flat field format (preferred):
userId <integer as string>
title <notification title, pre-localized>
body <notification body, pre-localized>
channel payments | security | kyc | system
traceId <intent_id or uuid, optional>
data <JSON string, optional>
badge <integer as string, optional>
JSON payload format (alternative):
Both formats are accepted. userId, title, body are required. channel defaults to system if omitted.
Who Publishes¶
| Publisher | Scenarios |
|---|---|
| Payment Manager | Payment sent, received, pending, failed |
| Auth Center (Serverpod) | Login alerts, password changes, KYC status updates |
| KYC Service | KYC approved / rejected (via Auth Center in practice) |
Important: Publishers are responsible for localizing title and body to the user's locale before publishing. The notifications service does not translate.
Firebase FCM¶
- SDK:
firebase-admin^12.4.0 - Initialized from a service account JSON file or inline JSON string
- Uses
admin.messaging().send(message)— one call per device token - Firebase project is identified by the
project_idfield in the service account JSON
Credential Configuration¶
Either set FCM_SERVICE_ACCOUNT_PATH (path to a JSON file) or FCM_SERVICE_ACCOUNT_JSON (the full JSON content as a string). At least one is required — the service exits on startup if neither is set.
In production, mount the service account file as a Kubernetes secret or Docker secret and point FCM_SERVICE_ACCOUNT_PATH at it.
PostgreSQL¶
The service uses two schemas:
Read: public.device_token¶
Table owned by Auth Center (Serverpod). The notifications service has SELECT and DELETE access.
Column names are camelCase with double quotes (Serverpod convention).
⚠️ The query does not currently select
"appId", so no per-app filtering happens (see «Multi-app routing» below).
Multi-app routing (TODO — Блок 4)¶
device_token.appId определяет приложение-источник токена:
| appId | Flutter приложение | FCM проект |
|---|---|---|
wallet |
onewallet_base_flutter | FCM Project A |
closeloop |
one_loop_app | FCM Project B |
merchant |
one_merchant_app | FCM Project C |
Текущее состояние:
- ✅ appId добавлен в public.device_token (см. device_token.spy.yaml, колонка appId String? + индекс device_token_app_idx).
- ✅ appId заполняется при регистрации: NotificationEndpoint.registerDevice(...required String appId...), и Flutter (push_notification_service.dart) передаёт его при вызове.
- ❌ Сервис ещё не фильтрует по appId: fetchTokens() (src/handler.js) не выбирает колонку appId, поэтому все токены пользователя получают push вне зависимости от приложения.
- ❌ Используется один FCM app instance (src/fcm.js, единственный app) — multi-project routing не реализован.
Для multi-app routing нужно:
1. Инициализировать несколько FCM app instances (по одному на каждый FCM проект) в src/fcm.js
2. В fetchTokens() (src/handler.js) выбрать "appId" и маршрутизировать токен в нужный FCM instance
3. PM/публикатор должен передавать целевой appId в сообщении в stream (поле в data JSON), чтобы ограничить рассылку нужным приложением
Write: notifications.notification_log¶
Table owned by notifications service. Created by sql/001_init.sql.
CREATE TABLE notifications.notification_log (
id BIGSERIAL PRIMARY KEY,
user_id INTEGER NOT NULL,
channel TEXT NOT NULL, -- payments | security | kyc | system
title TEXT NOT NULL,
body TEXT NOT NULL,
trace_id TEXT, -- intent_id for payments, uuid for security
data JSONB,
status TEXT NOT NULL, -- pending | sent | partial | failed | no_tokens
tokens_targeted INTEGER NOT NULL DEFAULT 0,
tokens_succeeded INTEGER NOT NULL DEFAULT 0,
tokens_failed INTEGER NOT NULL DEFAULT 0,
response JSONB, -- [{tokenId, fcmMessageId?, errorCode?}]
delivery_count INTEGER NOT NULL DEFAULT 1,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
processed_at TIMESTAMPTZ
);
Indexes: (user_id, created_at DESC), (trace_id) where not null, (status, created_at DESC).
Database Grants¶
The notifications_user database role needs access to both schemas. Run once by DBA (not included in 001_init.sql):
GRANT USAGE ON SCHEMA public TO notifications_user;
GRANT SELECT, DELETE ON public.device_token TO notifications_user;
GRANT ALL ON SCHEMA notifications TO notifications_user;
GRANT ALL ON ALL TABLES IN SCHEMA notifications TO notifications_user;
GRANT ALL ON ALL SEQUENCES IN SCHEMA notifications TO notifications_user;
Connection Configuration¶
Redis¶
| Env var | Default | Description |
|---|---|---|
REDIS_URL |
redis://localhost:6379 |
Redis connection URL |
PostgreSQL¶
| Env var | Default | Description |
|---|---|---|
PG_HOST |
localhost |
PostgreSQL host |
PG_PORT |
5432 |
PostgreSQL port |
PG_DATABASE |
— (required) | Database name |
PG_USER |
— (required) | Database user |
PG_PASSWORD |
— (required) | Database password |