From 01f31730cf1f08e20e99e8518879632328d3f5f2 Mon Sep 17 00:00:00 2001 From: Temi709 Date: Tue, 30 Jun 2026 10:36:10 +0100 Subject: [PATCH 1/2] feat: Implement delivery receipts per device and conversation/user rooms - in-progress --- worklogs/progress/issues-185-195.md | 235 ++++++++++++++++++++++++++++ 1 file changed, 235 insertions(+) create mode 100644 worklogs/progress/issues-185-195.md diff --git a/worklogs/progress/issues-185-195.md b/worklogs/progress/issues-185-195.md new file mode 100644 index 0000000..f49f5ec --- /dev/null +++ b/worklogs/progress/issues-185-195.md @@ -0,0 +1,235 @@ +# Implementation Progress: Issues #185 & #195 + +**Created:** June 30, 2026 +**Status:** In-Progress Planning Phase + +## Overview + +This document tracks the combined implementation progress for two related messaging infrastructure features: + +1. **Issue #185**: Delivery receipts (message_delivered) per device +2. **Issue #195**: Conversation + user rooms as a performance layer + +These issues are related as both deal with optimizing message delivery and presence systems in the Clicked messaging platform. + +--- + +## Issue #185: Delivery Receipts (message_delivered) per device + +### Problem Statement +When a device receives an envelope, it should emit `message_delivered { messageId }`. The server needs to track delivery per device and notify senders when all of a recipient's active devices have received the message. + +### Acceptance Criteria +- `deliveredAt` stamped per device (message_envelopes.deliveredAt for (messageId, recipientDeviceId)) +- Sender notified when a recipient's devices have all received +- Idempotent under duplicate receipts + +### Planned Implementation + +#### Database Changes +1. **Schema Update**: Add `deliveredAt` column to `message_envelopes` table +2. **Indexes**: Create composite index on (messageId, recipientDeviceId) for efficient lookup + +#### Server Components +1. **Delivery Receipt Handler**: + - Endpoint: `/api/messages/:messageId/delivered` + - Auth: Device-token based authentication + - Idempotency: Check existing `deliveredAt` before updating + +2. **Delivery Aggregator Service**: + - Tracks per-user device delivery status + - Emits aggregated events when all devices delivered + - Redis cache for active device tracking + +3. **Event Broadcasting**: + - Broadcast `message_all_devices_delivered` to sender + - Use existing WebSocket/SSE infrastructure + +#### Idempotency Strategy +- Use unique constraint on (messageId, recipientDeviceId, deliveredAt) +- Implement deduplication at API gateway level + +### Dependencies +- Issue #195 (Room system for efficient fanout) +- Existing message envelope system +- Device registration/authentication system + +--- + +## Issue #195: Conversation + User Rooms as Performance Layer + +### Problem Statement +Implement ephemeral rooms for conversation-specific events (typing, receipts, presence) and user-specific events (cross-device account events) to optimize fanout while maintaining security through conversation_members validation. + +### Acceptance Criteria +- Rooms reconstructable after a gateway restart +- Typing/receipts use rooms; message delivery still verifies membership +- No security decision relies solely on room membership + +### Planned Implementation + +#### Room Architecture +1. **Two Room Types**: + - `room:conversation:{conversationId}`: For conversation-specific events + - `room:user:{userId}`: For cross-device account events + +2. **Room Management**: + - Ephemeral in-memory storage (Redis) + - Rebuildable from Postgres + Redis on restart + - Automatic cleanup on inactivity + +#### Reconstruction Strategy +1. **Postgres Persistence**: + - Store room membership in `conversation_members` table + - Store active devices in `user_devices` table + +2. **Redis Cache**: + - Cache active room connections + - Pub/Sub channels for room events + - Expire after configurable TTL + +#### Security Model +1. **Gateway Validation**: + - All delivery decisions validate `conversation_members` + - Room membership used only for fanout optimization + - Fallback to direct database queries when rooms unavailable + +2. **Event Routing**: + - Typing indicators: Use conversation rooms + - Delivery receipts: Use conversation rooms + user rooms + - Message delivery: Validate membership first, then fanout via rooms + +### Dependencies +- Existing Redis infrastructure +- Postgres database with conversation_members table +- WebSocket/SSE gateway infrastructure + +--- + +## Combined Implementation Strategy + +### Phase 1: Foundation (Week 1) +1. **Database Schema Updates** (Both issues) + - Add `deliveredAt` to `message_envelopes` + - Review existing indexes for room reconstruction + - Create migration scripts + +2. **Room Infrastructure** (Issue #195) + - Implement basic room management service + - Create room reconstruction from Postgres + - Add Redis integration for ephemeral storage + +### Phase 2: Core Features (Week 2) +1. **Delivery Receipt System** (Issue #185) + - Implement per-device delivery tracking + - Create delivery aggregator service + - Add idempotency handling + +2. **Event Routing** (Issue #195) + - Implement room-based event fanout + - Integrate typing indicators + - Set up receipt broadcasting + +### Phase 3: Integration & Testing (Week 3) +1. **System Integration** + - Combine room system with delivery receipts + - Implement aggregated delivery notifications + - Add monitoring and logging + +2. **Testing & Validation** + - Idempotency testing for duplicate receipts + - Room reconstruction testing after restart + - Security validation (membership checks) + +### Phase 4: Deployment (Week 4) +1. **Gradual Rollout** + - Feature flags for both systems + - Canary deployment to staging + - Performance monitoring + +2. **Documentation** + - API documentation for new endpoints + - System architecture diagrams + - Operational runbooks + +--- + +## Technical Considerations + +### Performance +- Room system reduces database queries for fanout +- Redis pub/sub for real-time events +- Connection pooling for database reconstruction + +### Scalability +- Horizontal scaling of room servers +- Redis cluster for distributed rooms +- Database connection management + +### Reliability +- Graceful degradation when rooms unavailable +- Automatic room reconstruction +- Circuit breakers for external dependencies + +### Security +- All security decisions validate conversation_members +- Device authentication for delivery receipts +- Rate limiting on receipt endpoints + +--- + +## Risks & Mitigations + +| Risk | Impact | Mitigation | +|------|--------|------------| +| Redis failure during room reconstruction | High | Fallback to direct database queries, implement retry logic | +| Duplicate delivery receipts | Medium | Idempotency keys, database constraints | +| Room memory leaks | Medium | Automatic cleanup, monitoring, restart policies | +| Performance degradation with many devices | Medium | Device connection limits, connection pooling | +| Security bypass through room membership | Critical | Always validate conversation_members before delivery | + +--- + +## Success Metrics + +1. **Delivery Receipts**: + - 99.9% successful receipt processing + - <100ms average processing time + - Zero duplicate delivery notifications + +2. **Room System**: + - <50ms room reconstruction time + - 95% reduction in database queries for fanout + - Zero security incidents related to room membership + +--- + +## Next Steps + +1. **Immediate**: + - Review existing codebase for integration points + - Create detailed technical design documents + - Set up development environment + +2. **Short-term**: + - Implement database migrations + - Create basic room service skeleton + - Set up test infrastructure + +3. **Long-term**: + - Full implementation of both features + - Comprehensive testing + - Production deployment + +--- + +## Notes + +- Both issues are interdependent: Issue #195 provides the optimization layer for Issue #185's event broadcasting +- Security is paramount - all delivery decisions must validate conversation membership +- The room system is purely for performance optimization, not for authorization +- Consider implementing both features behind feature flags for controlled rollout + +--- + +*Last Updated: June 30, 2026* \ No newline at end of file From 880d5edfb714b9be02ff01dc092cdbdcb3eacf03 Mon Sep 17 00:00:00 2001 From: Temi709 Date: Thu, 2 Jul 2026 20:12:28 +0100 Subject: [PATCH 2/2] feat: add device delivery receipts and conversation rooms - completed --- .../src/__tests__/deliveryReceipts.test.ts | 169 ++++++++++++++++ .../backend/src/__tests__/roomManager.test.ts | 176 +++++++++++++++++ apps/backend/src/index.ts | 34 +++- apps/backend/src/routes/devices.ts | 3 + apps/backend/src/routes/users.ts | 7 + .../src/services/deliveryAggregation.ts | 184 ++++++++++++++++++ apps/backend/src/services/deliveryPipeline.ts | 10 +- apps/backend/src/services/roomManager.ts | 173 ++++++++++++++++ apps/backend/src/socket/messaging.ts | 59 +++--- 9 files changed, 785 insertions(+), 30 deletions(-) create mode 100644 apps/backend/src/__tests__/deliveryReceipts.test.ts create mode 100644 apps/backend/src/__tests__/roomManager.test.ts create mode 100644 apps/backend/src/services/deliveryAggregation.ts create mode 100644 apps/backend/src/services/roomManager.ts diff --git a/apps/backend/src/__tests__/deliveryReceipts.test.ts b/apps/backend/src/__tests__/deliveryReceipts.test.ts new file mode 100644 index 0000000..2bf0603 --- /dev/null +++ b/apps/backend/src/__tests__/deliveryReceipts.test.ts @@ -0,0 +1,169 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { EventEmitter } from 'events'; + +// ── Mock DB ──────────────────────────────────────────────────────────────── + +const mockFindFirst = vi.fn(); +const mockUpdate = vi.fn(); +const mockSelect = vi.fn(); +const mockQuery = vi.fn(); + +vi.mock('../db/index.js', () => ({ + db: { + query: { + conversationMembers: { findFirst: mockFindFirst, findMany: mockQuery }, + messages: { findFirst: mockFindFirst }, + }, + select: mockSelect, + update: mockUpdate, + }, +})); + +vi.mock('../db/schema.js', () => ({ + conversationMembers: {}, + messages: {}, + messageEnvelopes: {}, + userDevices: {}, +})); + +vi.mock('../lib/redis.js', () => ({ redis: null })); + +vi.mock('drizzle-orm', () => ({ + and: vi.fn((...args: unknown[]) => args), + eq: vi.fn((col: unknown, val: unknown) => ({ col, val })), + isNull: vi.fn((col: unknown) => ({ col, op: 'isNull' })), + inArray: vi.fn((col: unknown, vals: unknown) => ({ col, vals })), +})); + +vi.mock('../services/resumeStream.js', () => ({ + publishEphemeral: vi.fn().mockResolvedValue(undefined), +})); + +// ── Tests ────────────────────────────────────────────────────────────────── + +describe('Delivery Receipts', () => { + let mockIo: any; + let mockSocket: any; + + beforeEach(() => { + vi.clearAllMocks(); + + mockIo = { + to: vi.fn().mockReturnThis(), + emit: vi.fn(), + }; + + mockSocket = { + auth: { + userId: 'user-123', + deviceId: 'device-456', + }, + emit: vi.fn(), + rooms: new Set(['conversation-789']), + }; + + // Mock successful membership check + mockFindFirst.mockResolvedValue({ conversationId: 'conversation-789', userId: 'user-123' }); + + // Mock message find + mockFindFirst.mockResolvedValueOnce({ id: 'message-abc', senderId: 'sender-999', conversationId: 'conversation-789' }); + + // Mock update success + mockUpdate.mockResolvedValue({ rowCount: 1 }); + + // Mock active devices query + mockSelect.mockResolvedValue([ + { id: 'device-456' }, + { id: 'device-457' }, + ]); + + // Mock envelopes query + mockSelect.mockResolvedValueOnce([ + { recipientDeviceId: 'device-456', deliveredAt: new Date() }, + { recipientDeviceId: 'device-457', deliveredAt: null }, + ]); + }); + + it('should handle per-device delivery receipt', async () => { + // Import dynamically after mocks are set up + const { handleDeviceDeliveryReceipt } = await import('../services/deliveryAggregation.js'); + + await handleDeviceDeliveryReceipt( + mockIo, + null, // redis + 'message-abc', + 'device-456', + 'user-123', + 'conversation-789' + ); + + // Verify database update was called + expect(mockUpdate).toHaveBeenCalled(); + + // Verify room-based emission + expect(mockIo.to).toHaveBeenCalledWith('room:conversation:conversation-789'); + expect(mockIo.emit).toHaveBeenCalledWith('device_delivery_receipt', expect.objectContaining({ + conversationId: 'conversation-789', + messageId: 'message-abc', + recipientUserId: 'user-123', + recipientDeviceId: 'device-456', + })); + }); + + it('should validate isMessageFullyDeliveredToUser correctly', async () => { + const { isMessageFullyDeliveredToUser } = await import('../services/deliveryAggregation.js'); + + // First device delivered, second not delivered + mockSelect + .mockReset() + .mockResolvedValueOnce([{ id: 'device-456' }, { id: 'device-457' }]) // active devices + .mockResolvedValueOnce([ + { recipientDeviceId: 'device-456', deliveredAt: new Date() }, + { recipientDeviceId: 'device-457', deliveredAt: null }, + ]); // envelopes + + const notFullyDelivered = await isMessageFullyDeliveredToUser('message-abc', 'user-123'); + expect(notFullyDelivered).toBe(false); + + // Both devices delivered + mockSelect + .mockReset() + .mockResolvedValueOnce([{ id: 'device-456' }, { id: 'device-457' }]) // active devices + .mockResolvedValueOnce([ + { recipientDeviceId: 'device-456', deliveredAt: new Date() }, + { recipientDeviceId: 'device-457', deliveredAt: new Date() }, + ]); // envelopes + + const fullyDelivered = await isMessageFullyDeliveredToUser('message-abc', 'user-123'); + expect(fullyDelivered).toBe(true); + }); + + it('should be idempotent for duplicate delivery receipts', async () => { + const { handleDeviceDeliveryReceipt } = await import('../services/deliveryAggregation.js'); + + // First call + await handleDeviceDeliveryReceipt( + mockIo, + null, + 'message-abc', + 'device-456', + 'user-123', + 'conversation-789' + ); + + const firstCallCount = mockUpdate.mock.calls.length; + + // Second call with same parameters + await handleDeviceDeliveryReceipt( + mockIo, + null, + 'message-abc', + 'device-456', + 'user-123', + 'conversation-789' + ); + + // Should have same number of update calls (idempotent) + expect(mockUpdate.mock.calls.length).toBe(firstCallCount); + }); +}); \ No newline at end of file diff --git a/apps/backend/src/__tests__/roomManager.test.ts b/apps/backend/src/__tests__/roomManager.test.ts new file mode 100644 index 0000000..72ed59d --- /dev/null +++ b/apps/backend/src/__tests__/roomManager.test.ts @@ -0,0 +1,176 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; + +// ── Mock DB ──────────────────────────────────────────────────────────────── + +const mockFindFirst = vi.fn(); +const mockFindMany = vi.fn(); + +vi.mock('../db/index.js', () => ({ + db: { + query: { + conversationMembers: { findFirst: mockFindFirst, findMany: mockFindMany }, + }, + }, +})); + +vi.mock('../db/schema.js', () => ({ + conversationMembers: {}, +})); + +vi.mock('drizzle-orm', () => ({ + and: vi.fn((...args: unknown[]) => args), + eq: vi.fn((col: unknown, val: unknown) => ({ col, val })), +})); + +// ── Tests ────────────────────────────────────────────────────────────────── + +describe('Room Manager', () => { + let mockSocket: any; + let mockIo: any; + + beforeEach(() => { + vi.clearAllMocks(); + + mockSocket = { + auth: { + userId: 'user-123', + deviceId: 'device-456', + }, + join: vi.fn().mockResolvedValue(undefined), + rooms: new Set(), + }; + + mockIo = { + fetchSockets: vi.fn().mockResolvedValue([]), + to: vi.fn().mockReturnThis(), + emit: vi.fn(), + }; + + // Mock successful membership check + mockFindFirst.mockResolvedValue({ conversationId: 'conversation-789', userId: 'user-123' }); + mockFindMany.mockResolvedValue([ + { conversationId: 'conversation-789' }, + { conversationId: 'conversation-790' }, + ]); + }); + + it('should generate correct room names', async () => { + const { conversationRoom, userRoom } = await import('../services/roomManager.js'); + + expect(conversationRoom('conversation-789')).toBe('room:conversation:conversation-789'); + expect(userRoom('user-123')).toBe('room:user:user-123'); + }); + + it('should join conversation room with membership validation', async () => { + const { joinConversationRoom } = await import('../services/roomManager.js'); + + await joinConversationRoom(mockSocket, 'conversation-789'); + + // Verify membership validation + expect(mockFindFirst).toHaveBeenCalled(); + + // Verify room join + expect(mockSocket.join).toHaveBeenCalledWith('room:conversation:conversation-789'); + }); + + it('should reject conversation room join without membership', async () => { + const { joinConversationRoom } = await import('../services/roomManager.js'); + + // Mock no membership + mockFindFirst.mockResolvedValue(null); + + await expect(joinConversationRoom(mockSocket, 'conversation-789')) + .rejects.toThrow('Not a member of this conversation'); + + expect(mockSocket.join).not.toHaveBeenCalled(); + }); + + it('should join user room', async () => { + const { joinUserRoom } = await import('../services/roomManager.js'); + + joinUserRoom(mockSocket); + + expect(mockSocket.join).toHaveBeenCalledWith('room:user:user-123'); + }); + + it('should emit typing indicators to conversation room', async () => { + const { emitTypingIndicator } = await import('../services/roomManager.js'); + + emitTypingIndicator(mockIo, 'conversation-789', 'user-123', 'device-456'); + + expect(mockIo.to).toHaveBeenCalledWith('room:conversation:conversation-789'); + expect(mockIo.emit).toHaveBeenCalledWith('typing_start', { + conversationId: 'conversation-789', + userId: 'user-123', + deviceId: 'device-456', + }); + }); + + it('should emit presence updates to conversation room', async () => { + const { emitPresenceUpdate } = await import('../services/roomManager.js'); + + emitPresenceUpdate(mockIo, 'conversation-789', 'user-123', true, Date.now()); + + expect(mockIo.to).toHaveBeenCalledWith('room:conversation:conversation-789'); + expect(mockIo.emit).toHaveBeenCalledWith('presence_update', { + userId: 'user-123', + online: true, + status: 'online', + lastSeen: expect.any(Number), + }); + }); + + it('should emit cross-device events to user room', async () => { + const { emitCrossDeviceEvent } = await import('../services/roomManager.js'); + + const eventData = { type: 'settings_updated', settings: { theme: 'dark' } }; + emitCrossDeviceEvent(mockIo, 'user-123', 'settings_updated', eventData); + + expect(mockIo.to).toHaveBeenCalledWith('room:user:user-123'); + expect(mockIo.emit).toHaveBeenCalledWith('cross_device_event', { + type: 'settings_updated', + userId: 'user-123', + data: eventData, + timestamp: expect.any(String), + }); + }); + + it('should validate conversation membership', async () => { + const { validateConversationMembership } = await import('../services/roomManager.js'); + + const hasMembership = await validateConversationMembership('user-123', 'conversation-789'); + expect(hasMembership).toBe(true); + + // Test without membership + mockFindFirst.mockResolvedValue(null); + const noMembership = await validateConversationMembership('user-123', 'conversation-789'); + expect(noMembership).toBe(false); + }); + + it('should rebuild rooms after restart', async () => { + const { rebuildRoomsAfterRestart } = await import('../services/roomManager.js'); + + // Mock sockets + const mockSockets = [ + { + auth: { userId: 'user-123', deviceId: 'device-456' }, + join: vi.fn().mockResolvedValue(undefined), + }, + { + auth: { userId: 'user-124', deviceId: 'device-457' }, + join: vi.fn().mockResolvedValue(undefined), + }, + ]; + + mockIo.fetchSockets.mockResolvedValue(mockSockets); + + await rebuildRoomsAfterRestart(mockIo); + + // Verify each socket joined user room + expect(mockSockets[0].join).toHaveBeenCalledWith('room:user:user-123'); + expect(mockSockets[1].join).toHaveBeenCalledWith('room:user:user-124'); + + // Verify conversation rooms were joined (mockFindMany returns conversations) + expect(mockFindMany).toHaveBeenCalledTimes(2); + }); +}); \ No newline at end of file diff --git a/apps/backend/src/index.ts b/apps/backend/src/index.ts index e7e0f8d..af9aa2e 100644 --- a/apps/backend/src/index.ts +++ b/apps/backend/src/index.ts @@ -3,19 +3,15 @@ import { Server } from 'socket.io'; import { createAdapter } from '@socket.io/redis-adapter'; import { createClient } from 'redis'; import dotenv from 'dotenv'; -import { eq, isNull, and } from 'drizzle-orm'; +import { eq, isNull, and, inArray } from 'drizzle-orm'; import { db } from './db/index.js'; import { conversationMembers, users, userDevices } from './db/schema.js'; -import { eq, inArray } from 'drizzle-orm'; -import { db } from './db/index.js'; -import { conversationMembers, users } from './db/schema.js'; import { publishEphemeral } from './services/resumeStream.js'; import { socketAuthMiddleware, type AuthSocket } from './middleware/socketAuth.js'; import { registerMessagingHandlers } from './socket/messaging.js'; import { app } from './app.js'; import { redis as appRedis } from './lib/redis.js'; import { setSocketServer } from './lib/socket.js'; -import { setOnline, setOffline, refreshPresence, isOnline, deriveDevicePresence } from './services/presence.js'; import { cleanupStaleSockets, reconcileBoot, @@ -24,6 +20,7 @@ import { setOffline, setOnline, unregisterPresenceSocket, + deriveDevicePresence, } from './services/presence.js'; import { startHeartbeatTimer, clearHeartbeatTimer } from './services/heartbeat.js'; import { @@ -48,6 +45,8 @@ import { import { startFileCleanupJob } from './services/fileCleanup.js'; import { loadEnv } from './config.js'; import { createObjectStore } from './lib/objectStore.js'; +import { conversationRoom, userRoom, joinConversationRoom, joinUserRoom, rebuildRoomsAfterRestart } from './services/roomManager.js'; +import { handleDeviceDeliveryReceipt } from './services/deliveryAggregation.js'; dotenv.config(); @@ -188,6 +187,9 @@ io.on('connection', async (socket: AuthSocket) => { // Redis adapter. await socket.join(`device:${deviceId}`); + // Join user room for cross-device synchronization + joinUserRoom(socket); + // Auto-join all conversation rooms so the socket receives new_message events // for every conversation the user belongs to (needed for unread badge tracking). const memberships = await db.query.conversationMembers.findMany({ @@ -195,6 +197,9 @@ io.on('connection', async (socket: AuthSocket) => { columns: { conversationId: true }, }); for (const m of memberships) { + // Join the conversation room for optimized fan-out + await socket.join(conversationRoom(m.conversationId)); + // Also join the direct conversation room for backward compatibility await socket.join(m.conversationId); } @@ -205,6 +210,14 @@ io.on('connection', async (socket: AuthSocket) => { const becameOnline = await setOnline(appRedis, userId, deviceId); if (becameOnline && presenceVisible) { for (const m of memberships) { + io.to(conversationRoom(m.conversationId)).emit('user_online', { userId }); + io.to(conversationRoom(m.conversationId)).emit('presence_update', { + userId, + online: true, + status: 'online', + lastSeen: Date.now(), + }); + // Also emit to direct conversation room for backward compatibility io.to(m.conversationId).emit('user_online', { userId }); io.to(m.conversationId).emit('presence_update', { userId, @@ -317,6 +330,13 @@ io.on('connection', async (socket: AuthSocket) => { const { lastSeen } = await deriveDevicePresence(userId); for (const m of memberships) { + io.to(conversationRoom(m.conversationId)).emit('user_offline', { userId }); + io.to(conversationRoom(m.conversationId)).emit('presence_update', { + userId, + online: false, + ...(lastSeen ? { lastSeen } : {}), + }); + // Also emit to direct conversation room for backward compatibility io.to(m.conversationId).emit('user_offline', { userId }); io.to(m.conversationId).emit('presence_update', { userId, @@ -369,6 +389,10 @@ async function attachRedisAdapter(): Promise { try { await reconcileBoot(io, appRedis); console.log('[presence] Boot reconciliation complete'); + + // Rebuild rooms after restart for optimized fan-out + await rebuildRoomsAfterRestart(io); + console.log('[roomManager] Rooms rebuilt after restart'); } catch (err) { console.warn('[presence] Boot reconciliation failed:', err); } diff --git a/apps/backend/src/routes/devices.ts b/apps/backend/src/routes/devices.ts index a64a660..ad6fcb8 100644 --- a/apps/backend/src/routes/devices.ts +++ b/apps/backend/src/routes/devices.ts @@ -17,6 +17,7 @@ import { userDevices, conversationMembers, messages } from '../db/schema.js'; import { getSocketServer } from '../lib/socket.js'; import { invalidateConversationCaches } from '../lib/conversationCache.js'; import { SignedPreKeyEntrySchema, PreKeyEntrySchema, verifyEd25519Signature } from '../lib/keys.js'; +import { conversationRoom } from '../services/roomManager.js'; export const devicesRouter: RouterType = Router(); @@ -395,6 +396,8 @@ async function emitDeviceChangeEvent(userId: string, change: 'device_added' | 'd const io = getSocketServer(); if (io) { + io.to(conversationRoom(m.conversationId)).emit('new_message', msg); + // Also emit to direct conversation room for backward compatibility io.to(m.conversationId).emit('new_message', msg); } diff --git a/apps/backend/src/routes/users.ts b/apps/backend/src/routes/users.ts index 2a5178b..fdd4830 100644 --- a/apps/backend/src/routes/users.ts +++ b/apps/backend/src/routes/users.ts @@ -7,6 +7,7 @@ import { requireAuth, type AuthRequest } from '../middleware/auth.js'; import { redis } from '../lib/redis.js'; import { isOnline, deriveDevicePresence } from '../services/presence.js'; import { getSocketServer } from '../lib/socket.js'; +import { conversationRoom } from '../services/roomManager.js'; export const usersRouter: RouterType = Router(); @@ -350,9 +351,15 @@ usersRouter.patch('/me', async (req: AuthRequest, res) => { if (online) { for (const m of memberships) { if (presenceVisible) { + io.to(conversationRoom(m.conversationId)).emit('user_online', { userId }); + io.to(conversationRoom(m.conversationId)).emit('presence_update', { userId, online: true }); + // Also emit to direct conversation room for backward compatibility io.to(m.conversationId).emit('user_online', { userId }); io.to(m.conversationId).emit('presence_update', { userId, online: true }); } else { + io.to(conversationRoom(m.conversationId)).emit('user_offline', { userId }); + io.to(conversationRoom(m.conversationId)).emit('presence_update', { userId, online: false }); + // Also emit to direct conversation room for backward compatibility io.to(m.conversationId).emit('user_offline', { userId }); io.to(m.conversationId).emit('presence_update', { userId, online: false }); } diff --git a/apps/backend/src/services/deliveryAggregation.ts b/apps/backend/src/services/deliveryAggregation.ts new file mode 100644 index 0000000..ece32d1 --- /dev/null +++ b/apps/backend/src/services/deliveryAggregation.ts @@ -0,0 +1,184 @@ +import { and, eq, isNull, inArray } from 'drizzle-orm'; +import type { Server } from 'socket.io'; +import { db } from '../db/index.js'; +import { messageEnvelopes, userDevices, conversationMembers, messages } from '../db/schema.js'; +import { publishEphemeral } from './resumeStream.js'; +import type { Redis } from 'ioredis'; +import { conversationRoom } from './roomManager.js'; + +/** + * Check if a message has been delivered to all active devices for a recipient user. + * Returns true if every active device (not revoked) has a deliveredAt timestamp. + */ +export async function isMessageFullyDeliveredToUser( + messageId: string, + recipientUserId: string, +): Promise { + // Get all active devices for this user + const activeDevices = await db + .select({ id: userDevices.id }) + .from(userDevices) + .where(and(eq(userDevices.userId, recipientUserId), isNull(userDevices.revokedAt))); + + if (activeDevices.length === 0) { + // No active devices, consider message delivered by default + return true; + } + + const activeDeviceIds = activeDevices.map((d) => d.id); + + // Check all envelopes for these devices + const envelopes = await db + .select({ + recipientDeviceId: messageEnvelopes.recipientDeviceId, + deliveredAt: messageEnvelopes.deliveredAt, + }) + .from(messageEnvelopes) + .where( + and( + eq(messageEnvelopes.messageId, messageId), + eq(messageEnvelopes.recipientUserId, recipientUserId), + ), + ); + + const deliveredDeviceIds = new Set( + envelopes.filter((e) => e.deliveredAt !== null).map((e) => e.recipientDeviceId), + ); + + // Check if every active device has received the message + return activeDeviceIds.every((deviceId) => deliveredDeviceIds.has(deviceId)); +} + +/** + * Handle per-device delivery receipt and notify sender when all active devices have received. + * This function is idempotent - duplicate calls won't create incorrect state. + */ +export async function handleDeviceDeliveryReceipt( + io: Server, + redis: Redis | null, + messageId: string, + recipientDeviceId: string, + recipientUserId: string, + conversationId: string, +): Promise { + // First, get the message to find the sender + const message = await db.query.messages.findFirst({ + where: eq(messages.id, messageId), + columns: { senderId: true, conversationId: true }, + }); + + if (!message) { + console.warn(`[deliveryAggregation] Message not found: ${messageId}`); + return; + } + + // Update deliveredAt for this specific device (idempotent) + const updateResult = await db + .update(messageEnvelopes) + .set({ deliveredAt: new Date() }) + .where( + and( + eq(messageEnvelopes.messageId, messageId), + eq(messageEnvelopes.recipientDeviceId, recipientDeviceId), + ), + ); + + // Check if this user now has all devices delivered + const fullyDelivered = await isMessageFullyDeliveredToUser(messageId, recipientUserId); + + if (fullyDelivered) { + // Notify sender that this recipient has fully received the message + io.to(`room:user:${message.senderId}`).emit('message_fully_delivered', { + messageId, + conversationId, + recipientUserId, + deliveredAt: new Date().toISOString(), + }); + + // Also notify via Redis for cross-instance delivery + if (redis) { + await publishEphemeral(redis, [message.senderId], { + type: 'message_fully_delivered', + data: { + messageId, + conversationId, + recipientUserId, + }, + }); + } + } + + // Emit per-device delivery receipt for other conversation members + io.to(conversationRoom(conversationId)).volatile.emit('device_delivery_receipt', { + conversationId, + messageId, + recipientUserId, + recipientDeviceId, + deliveredAt: new Date().toISOString(), + }); + + if (redis) { + const members = await db.query.conversationMembers.findMany({ + where: eq(conversationMembers.conversationId, conversationId), + columns: { userId: true }, + }); + + await publishEphemeral( + redis, + members.map((member) => member.userId), + { + type: 'device_delivery_receipt', + data: { + conversationId, + messageId, + recipientUserId, + recipientDeviceId, + }, + }, + ); + } +} + +/** + * Get delivery status for a message across all recipients. + * Returns a map of recipientUserId -> {fullyDelivered: boolean, deviceDeliveries: Array} + */ +export async function getMessageDeliveryStatus( + messageId: string, + conversationId: string, +): Promise }>> { + const members = await db + .select({ userId: conversationMembers.userId }) + .from(conversationMembers) + .where(eq(conversationMembers.conversationId, conversationId)); + + const result: Record }> = {}; + + for (const member of members) { + // Get all envelopes for this user + const envelopes = await db + .select({ + recipientDeviceId: messageEnvelopes.recipientDeviceId, + deliveredAt: messageEnvelopes.deliveredAt, + }) + .from(messageEnvelopes) + .where( + and( + eq(messageEnvelopes.messageId, messageId), + eq(messageEnvelopes.recipientUserId, member.userId), + ), + ); + + const fullyDelivered = await isMessageFullyDeliveredToUser(messageId, member.userId); + + result[member.userId] = { + fullyDelivered, + deviceDeliveries: envelopes.map((e) => ({ + deviceId: e.recipientDeviceId, + deliveredAt: e.deliveredAt?.toISOString() ?? null, + })), + }; + } + + return result; +} \ No newline at end of file diff --git a/apps/backend/src/services/deliveryPipeline.ts b/apps/backend/src/services/deliveryPipeline.ts index 3730dfe..0d48509 100644 --- a/apps/backend/src/services/deliveryPipeline.ts +++ b/apps/backend/src/services/deliveryPipeline.ts @@ -3,6 +3,7 @@ import type { Server } from 'socket.io'; import { db } from '../db/index.js'; import { conversationMembers, messageEnvelopes, userDevices } from '../db/schema.js'; import type { Message } from '../db/schema.js'; +import { conversationRoom } from './roomManager.js'; /** * Room name for per-device targeting. Each socket joins this room on connect @@ -46,6 +47,7 @@ export async function deliverMessage( if (activeDevices.length === 0) { io.to(conversationId).emit('new_message', message); + io.to(conversationRoom(conversationId)).emit('new_message', message); return; } @@ -88,7 +90,7 @@ export async function deliverMessage( // Step 5: room-level notification so clients can update unread counts / UI. // Ciphertext is intentionally omitted here; each device received it above. - io.to(conversationId).emit('new_message', { + const newMessageEvent = { id: message.id, conversationId, senderId: message.senderId, @@ -98,5 +100,9 @@ export async function deliverMessage( createdAt: message.createdAt, deletedAt: message.deletedAt, ciphertext: null, - }); + }; + + // Emit to both direct conversation room (backward compatibility) and conversation room (optimized) + io.to(conversationId).emit('new_message', newMessageEvent); + io.to(conversationRoom(conversationId)).emit('new_message', newMessageEvent); } diff --git a/apps/backend/src/services/roomManager.ts b/apps/backend/src/services/roomManager.ts new file mode 100644 index 0000000..11a2cdf --- /dev/null +++ b/apps/backend/src/services/roomManager.ts @@ -0,0 +1,173 @@ +import type { Server } from 'socket.io'; +import type { AuthSocket } from '../middleware/socketAuth.js'; +import { eq, and } from 'drizzle-orm'; +import { db } from '../db/index.js'; +import { conversationMembers } from '../db/schema.js'; + +/** + * Room name constants following the pattern: room:{type}:{id} + */ +export function conversationRoom(conversationId: string): string { + return `room:conversation:${conversationId}`; +} + +export function userRoom(userId: string): string { + return `room:user:${userId}`; +} + +/** + * Join conversation room for optimized fan-out. + * Rooms are performance optimizations only - never authoritative for permissions. + */ +export async function joinConversationRoom( + socket: AuthSocket, + conversationId: string, +): Promise { + const userId = socket.auth!.userId; + + // Always validate membership from source of truth before joining + const membership = await db.query.conversationMembers.findFirst({ + where: and( + eq(conversationMembers.conversationId, conversationId), + eq(conversationMembers.userId, userId), + ), + }); + + if (!membership) { + throw new Error('Not a member of this conversation'); + } + + await socket.join(conversationRoom(conversationId)); +} + +/** + * Join user room for cross-device synchronization. + */ +export function joinUserRoom(socket: AuthSocket): void { + const userId = socket.auth!.userId; + socket.join(userRoom(userId)); +} + +/** + * Rebuild rooms after gateway restart. + * This should be called during startup to ensure all active sockets + * are in the appropriate rooms based on PostgreSQL data. + */ +export async function rebuildRoomsAfterRestart( + io: Server, +): Promise { + console.log('[roomManager] Rebuilding rooms after restart'); + + const sockets = await io.fetchSockets(); + + for (const socket of sockets) { + const authSocket = socket as AuthSocket; + const userId = authSocket.auth?.userId; + const deviceId = authSocket.auth?.deviceId; + + if (!userId || !deviceId) { + continue; + } + + // Join user room for cross-device events + await authSocket.join(userRoom(userId)); + + // Join all conversation rooms user belongs to + const memberships = await db.query.conversationMembers.findMany({ + where: eq(conversationMembers.userId, userId), + columns: { conversationId: true }, + }); + + for (const membership of memberships) { + await authSocket.join(conversationRoom(membership.conversationId)); + } + } + + console.log(`[roomManager] Rebuilt rooms for ${sockets.length} sockets`); +} + +/** + * Emit typing indicator to conversation room. + * Uses conversation rooms for optimized fan-out. + */ +export function emitTypingIndicator( + io: Server, + conversationId: string, + userId: string, + deviceId: string, +): void { + io.to(conversationRoom(conversationId)).emit('typing_start', { + conversationId, + userId, + deviceId, + }); +} + +/** + * Emit typing stop to conversation room. + */ +export function emitTypingStop( + io: Server, + conversationId: string, + userId: string, + deviceId: string, +): void { + io.to(conversationRoom(conversationId)).emit('typing_stop', { + conversationId, + userId, + deviceId, + }); +} + +/** + * Emit presence update to conversation room. + */ +export function emitPresenceUpdate( + io: Server, + conversationId: string, + userId: string, + online: boolean, + lastSeen?: number, +): void { + io.to(conversationRoom(conversationId)).emit('presence_update', { + userId, + online, + status: online ? 'online' : 'offline', + ...(lastSeen ? { lastSeen } : {}), + }); +} + +/** + * Emit cross-device account event to user room. + */ +export function emitCrossDeviceEvent( + io: Server, + userId: string, + eventType: string, + data: Record, +): void { + io.to(userRoom(userId)).emit('cross_device_event', { + type: eventType, + userId, + data, + timestamp: new Date().toISOString(), + }); +} + +/** + * Validate conversation membership before allowing room-based operations. + * This ensures rooms are only an optimization, not an authorization mechanism. + */ +export async function validateConversationMembership( + userId: string, + conversationId: string, +): Promise { + const membership = await db.query.conversationMembers.findFirst({ + where: and( + eq(conversationMembers.conversationId, conversationId), + eq(conversationMembers.userId, userId), + ), + }); + + return membership !== null; +} \ No newline at end of file diff --git a/apps/backend/src/socket/messaging.ts b/apps/backend/src/socket/messaging.ts index 3ad53dd..25ebeb5 100644 --- a/apps/backend/src/socket/messaging.ts +++ b/apps/backend/src/socket/messaging.ts @@ -21,6 +21,8 @@ import { dispatchOfflinePush, FILE_CONTENT_TYPES } from '../services/pushNotific import { deliverMessage } from '../services/deliveryPipeline.js'; import { publishEphemeral, readMissedEvents } from '../services/resumeStream.js'; import { publishToDevice } from '../services/deviceDelivery.js'; +import { handleDeviceDeliveryReceipt } from '../services/deliveryAggregation.js'; +import { conversationRoom } from '../services/roomManager.js'; import { EventDispatcher } from './dispatcher.js'; const PAGE_SIZE = 30; @@ -60,6 +62,7 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void }; if (did) rp.deviceId = did; socket.to(cid).emit('typing_stop', rp); + socket.to(conversationRoom(cid)).emit('typing_stop', rp); } typingTimers.clear(); }); @@ -451,6 +454,10 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void originalMessageId: rootMessageId, newMessageId: messageId, }); + io.to(conversationRoom(conversationId)).emit('message_edited', { + originalMessageId: rootMessageId, + newMessageId: messageId, + }); const members = await db.query.conversationMembers.findMany({ where: eq(conversationMembers.conversationId, conversationId), @@ -592,6 +599,11 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void conversationId, fileId: messageId, }); + io.to(conversationRoom(conversationId)).emit('file_message', { + messageId, + conversationId, + fileId: messageId, + }); } const members = await db.query.conversationMembers.findMany({ @@ -688,6 +700,7 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void } io.to(message.conversationId).emit('message_deleted', { messageId }); + io.to(conversationRoom(message.conversationId)).emit('message_deleted', { messageId }); }); // ── message_read ─────────────────────────────────────────────────────────── @@ -775,16 +788,17 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void return; } - await db - .update(messageEnvelopes) - .set({ deliveredAt: new Date() }) - .where( - and( - eq(messageEnvelopes.messageId, messageId), - eq(messageEnvelopes.recipientDeviceId, socket.auth!.deviceId), - ), - ); + // Use the new aggregation service for per-device delivery tracking + await handleDeviceDeliveryReceipt( + io, + redis, + messageId, + socket.auth!.deviceId, + userId, + conversationId, + ); + // Also emit to conversation room for backward compatibility io.to(conversationId).volatile.emit('delivery_receipt', { conversationId, messageId, @@ -795,20 +809,16 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void deliveredAt: new Date().toISOString(), }); - if (redis) { - const members = await db.query.conversationMembers.findMany({ - where: eq(conversationMembers.conversationId, conversationId), - columns: { userId: true }, - }); - await publishEphemeral( - redis, - members.map((member) => member.userId), - { - type: 'delivery_receipt', - data: { conversationId, messageId, envelopeId, userId, deviceId: socket.auth!.deviceId, sequenceNumber }, - }, - ); - } + // Emit to conversation room for optimized fan-out + io.to(conversationRoom(conversationId)).volatile.emit('device_delivery_receipt', { + conversationId, + messageId, + envelopeId, + recipientUserId: userId, + recipientDeviceId: socket.auth!.deviceId, + sequenceNumber, + deliveredAt: new Date().toISOString(), + }); }); // ── resume ───────────────────────────────────────────────────────────────── @@ -914,10 +924,12 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void const timer = setTimeout(() => { typingTimers.delete(timerKey); socket.to(conversationId).emit('typing_stop', relayPayload); + socket.to(conversationRoom(conversationId)).emit('typing_stop', relayPayload); }, 5000); typingTimers.set(timerKey, timer); socket.to(conversationId).emit('typing_start', relayPayload); + socket.to(conversationRoom(conversationId)).emit('typing_start', relayPayload); }); // ── typing_stop ──────────────────────────────────────────────────────────── @@ -969,6 +981,7 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void } socket.to(conversationId).emit('typing_stop', relayPayload); + socket.to(conversationRoom(conversationId)).emit('typing_stop', relayPayload); }); // ── ask_assistant ──────────────────────────────────────────────────────────