Developer Docs

uChat โ€” Integration Guide

Overview

The uchat subgraph is the platform's end-to-end encrypted (E2EE) messenger โ€” enabling private, real-time conversations between Visitors, Malet Owners, and fellow shoppers directly within the Mallnline ecosystem. Built as a Rust-native microservice (Axum + Async-GraphQL), it mirrors the auth service architecture and participates in the Apollo Federation gateway as the platform's 16th subgraph.

The server acts as a "dumb relay" โ€” it stores and forwards E2EE ciphertext but is mathematically incapable of reading message content. All cryptographic operations are deferred to the client's WASM layer (Phase 3).

Architecture at a Glance

Component Technology Port
GraphQL Federation Axum + Async-GraphQL 3017
WebSocket Relay Axum WS + Redis pub/sub 3018
Matrix Homeserver Conduit (Rust) 8448
Database PostgreSQL (ngwenya_uchat) โ€”
Matrix Bridge Ruma v0.14.1 (Application Service API) โ€”
Cache / Pub-Sub Redis โ€”

Core Concepts

Conversations

A Conversation represents a messaging thread. It may be tied to a specific Malet or exist independently for social interactions. Four types exist:

Type Use Case
DIRECT 1:1 DM between a Visitor and a Malet Owner (deduplicated โ€” reuses existing thread)
GROUP Multi-participant thread (e.g., team support)
SUPPORT Customer support ticket tied to a Murchase or inquiry
SOCIAL Visitor-to-Visitor chat โ€” "meeting at the mall"

Note: SOCIAL conversations do not require a maletId โ€” they can originate from the Lobby, a shared interest, or a review thread.

Meeting at the Mall โ€” Social Commerce

The SOCIAL conversation type enables the social layer of Mallnline. Just as people meet, chat, and share recommendations at a physical mall, Visitors can connect with each other through uChat. This powers several discovery mechanisms:

Feature Description
Lobby Chat Open chat rooms per Malet or Vertical โ€” like a food court seating area where Visitors can mingle
Interest Match "Others browsing this Malet right now" โ€” opt-in presence that lets you message someone
Review Threads Community Q&A participants can start private chats about shared interests
Wishlist Sharing Share a Collection link โ€” recipients can start a conversation about it

All social features are opt-in and privacy-first:

  • Visitors must enable "Allow messages from other Visitors" in their privacy settings
  • Rate-limited conversation requests prevent spam
  • Block/report mechanics are enforced at the platform level
  • Support Proxy aliases (already built) protect real identity until explicitly shared

E2EE by Design

The encrypted_body field on every Message contains opaque ciphertext. The server persists it but has zero knowledge of its contents. Only the sender and recipient WASM clients (powered by matrix-rust-sdk compiled to WebAssembly) hold the decryption keys.

Dual-Relay Architecture

Messages are delivered through two parallel relay paths for maximum reliability:

Relay Purpose Measured Latency (localhost)
Redis pub/sub โ†’ WebSocket Instant delivery to connected clients 25โ€“56ms (avg 42ms)
Matrix โ†’ Conduit Persistent E2EE relay for federation and offline delivery ~10ms

Benchmark: Run npx tsx scripts/bench-uchat-latency.ts to measure E2E latency across all delivery paths. On localhost, WS delivery averages 42ms end-to-end (faster than WhatsApp, Telegram, Slack, Discord, iMessage, and Messenger on their production networks).

When a message is sent via the sendMessage mutation:

  1. The E2EE ciphertext is persisted to PostgreSQL
  2. Redis relay: The message is PUBLISHed to both uchat:conv:{id} (per-conversation subscribers) and uchat:user:{participantId} for each participant except the sender (global WS subscribers)
  3. Matrix relay: The message is forwarded to Conduit via PUT /rooms/{roomId}/send/m.room.encrypted โ€” the returned matrix_event_id is persisted on the message entity

If the Matrix homeserver is unavailable, the Redis relay continues to function independently. Messages forwarded to Matrix are also relayed back to Redis via the AS Transaction handler, ensuring no messages are lost.


Matrix Application Service Bridge

The uchat subgraph acts as a privileged Matrix Application Service using the Ruma crate. With the Conduit homeserver deployed, it provides:

Identity Provisioning

When a Visitor or Malet Owner joins a conversation, the bridge automatically:

  1. Provisions a Matrix user: @uchat_{userId}:localhost via POST /_matrix/client/v3/register
  2. Stores the Matrix identity: The matrix_user_id field on the Participant entity
  3. Handles duplicates gracefully: If the user already exists (M_USER_IN_USE), the existing identity is reused

Room Management

When a conversation is created:

  1. Creates an encrypted room: POST /_matrix/client/v3/createRoom with Megolm encryption (m.megolm.v1.aes-sha2) enabled
  2. Invites participants: Each provisioned Matrix user is invited to the room
  3. Auto-joins participants: The AS masquerades as each user (?user_id=) to accept the invitation
  4. Stores the room mapping: The matrix_room_id is persisted on the Conversation entity

Event Forwarding

Encrypted message events are sent to Conduit using AS masquerading โ€” the AS sends the event as the sender's provisioned Matrix user, preserving correct attribution in the Matrix timeline.

Transaction Handler (Inbound Events)

Conduit pushes events to uChat via the Application Service Transaction API:

PUT /_matrix/app/v1/transactions/{txnId}
Authorization: Bearer <hs_token>

The handler:

  • Authenticates using the hs_token from the AS registration
  • Deduplicates via Redis SET NX with 24h TTL (backed by uchat_matrix_txn_log table)
  • Relays m.room.encrypted and m.room.message events to Redis pub/sub for WebSocket delivery
  • Maps Matrix room IDs back to conversation IDs for correct channel routing

Health Check

# Check Matrix bridge health
curl http://localhost:3017/matrix/health

Returns:

{
	"status": "healthy",
	"matrix_enabled": true,
	"homeserver": "http://localhost:8448",
	"supported_versions": ["r0.5.0", "r0.6.0", "v1.1", "...", "v1.12"]
}

GraphQL API

Queries

Fetch a Conversation

query GetConversation($id: ID!) {
	conversation(id: $id) {
		id
		maletId
		convType
		matrixRoomId
		createdAt
		updatedAt
		participants {
			id
			userId
			matrixUserId
			role
			joinedAt
		}
	}
}

List My Conversations (Cursor-Paginated)

query MyConversations($first: Int, $after: String) {
	myConversations(first: $first, after: $after) {
		edges {
			cursor
			node {
				id
				maletId
				convType
				updatedAt
			}
		}
		pageInfo {
			hasNextPage
			endCursor
		}
	}
}

Fetch Message History

query ConversationMessages($conversationId: ID!, $first: Int, $after: String) {
	conversationMessages(conversationId: $conversationId, first: $first, after: $after) {
		edges {
			cursor
			node {
				id
				senderId
				encryptedBody
				msgType
				matrixEventId
				createdAt
				editedAt
				editHistory
				fileUrl
				fileName
				fileSize
				fileMime
				reactions {
					id
					emoji
					senderId
				}
			}
		}
		pageInfo {
			hasNextPage
			endCursor
		}
	}
}

Mutations

Create a Conversation

mutation CreateConversation($input: CreateConversationInput!) {
	createConversation(input: $input) {
		id
		maletId
		matrixRoomId
		convType
		participants {
			userId
			matrixUserId
			role
		}
	}
}

When Matrix is enabled, this mutation also:

  • Provisions Matrix users for all participants
  • Creates an E2EE-enabled Matrix room
  • Invites and auto-joins all participants

Input Example (Malet DM):

{
	"input": {
		"maletId": "malet-uuid",
		"convType": "DIRECT",
		"participantIds": ["visitor-user-id", "owner-user-id"]
	}
}

Input Example (Social โ€” Visitor-to-Visitor):

{
	"input": {
		"maletId": null,
		"convType": "SOCIAL",
		"participantIds": ["other-visitor-id"]
	}
}

DM Deduplication: For DIRECT conversations, if a thread between the same two participants already exists, the existing conversation is returned instead of creating a duplicate.

Send a Message (E2EE Ciphertext)

mutation SendMessage($input: SendMessageInput!) {
	sendMessage(input: $input) {
		id
		conversationId
		senderId
		encryptedBody
		msgType
		matrixEventId
		createdAt
		fileUrl
		fileName
		fileSize
		fileMime
		replyToMessageId
	}
}

The encryptedBody is the E2EE ciphertext produced by the client's WASM crypto layer. The server stores it verbatim and forwards it to both Redis and Matrix.

For FILE messages, the encryptedBody contains a Megolm-encrypted JSON envelope with the AES-256-GCM key + IV needed to decrypt the file blob at fileUrl. The file metadata fields (fileUrl, fileName, fileSize, fileMime) are stored in plaintext for display in file cards โ€” they do not contain sensitive content. The encrypted file blob is stored separately in S3 via the media service. See the Crypto service for the encryption protocol.

Reply to a Message

To send a reply, include the optional replyToMessageId field in the input:

{
	"input": {
		"conversationId": "conv-uuid",
		"encryptedBody": "<ciphertext>",
		"replyToMessageId": "parent-message-uuid"
	}
}

The parent message is resolved via the repliedToMessage field on the returned Message object. If the parent message has been deleted, repliedToMessage.encryptedBody returns "This message was deleted" and msgType is masked to SYSTEM. If the parent message ID is invalid or the referenced row no longer exists (via ON DELETE SET NULL), repliedToMessage resolves to null.

The clientEventId field provides idempotency โ€” if the same clientEventId is sent twice, the server returns the existing message instead of creating a duplicate.

The matrixEventId is populated after the event is successfully forwarded to the Conduit homeserver.

Issue WebSocket Token

mutation IssueWsToken {
	issueWsToken {
		token
		expiresIn
	}
}

Returns a short-lived JWT (5 minutes) used to authenticate WebSocket connections.

Mark a Conversation as Read

mutation MarkAsRead($conversationId: ID!, $messageId: ID!) {
	markAsRead(conversationId: $conversationId, messageId: $messageId)
}

Updates last_read_message_id and last_read_at on the caller's participant record. Broadcasts a read_receipt event via Redis pub/sub to all other participants' WebSocket channels.

Delete a Message

mutation DeleteMessage($messageId: ID!, $conversationId: ID!) {
	deleteMessage(messageId: $messageId, conversationId: $conversationId)
}

Soft-deletes a message (sender-only). Sets deleted_at and deleted_by on the message row. Broadcasts a message_deleted event via Redis pub/sub. Subsequent queries return "[deleted]" as the encrypted_body and msg_type: "System".

Edit a Message

mutation EditMessage($messageId: ID!, $conversationId: ID!, $newBody: String!) {
	editMessage(messageId: $messageId, conversationId: $conversationId, newBody: $newBody) {
		id
		encryptedBody
		editedAt
		editHistory
	}
}

Updates the encrypted_body to newBody, sets edited_at to NOW(), and appends the old body to the edit_history JSONB array. Sender-only. Broadcasts a message_edited event via Redis pub/sub.

Add a Reaction

mutation AddReaction($messageId: ID!, $emoji: String!) {
	addReaction(messageId: $messageId, emoji: $emoji) {
		id
		messageId
		senderId
		emoji
	}
}

Adds or replaces the caller's reaction on a message. One reaction per user per message โ€” if the user already has a different emoji on this message, it is replaced atomically via ON CONFLICT (message_id, sender_id) DO UPDATE. If the same emoji is sent again, the reaction is toggled off (deleted). Broadcasts reaction_added or reaction_removed events to both uchat:conv:{id} and uchat:user:{pid} channels for real-time delivery to per-conversation and global WS clients.

Remove a Reaction

mutation RemoveReaction($messageId: ID!, $emoji: String!) {
	removeReaction(messageId: $messageId, emoji: $emoji)
}

Deletes the caller's reaction. Broadcasts a reaction_removed event to both WS channels.


WebSocket Real-Time Relay

The WebSocket endpoint runs on port 3018 and provides real-time message delivery via Redis pub/sub.

Connection Modes

Two subscription modes are supported:

Mode URL Redis Channel Use Case
Global (recommended) ws://host:3018/ws?token=<jwt> uchat:user:{userId} Single WS for all conversations
Per-conversation (legacy) ws://host:3018/ws?token=<jwt>&conversation_id=<uuid> uchat:conv:{convId} One WS per open conversation

Note: Global mode is recommended. The sendMessage mutation fans out to each participant's uchat:user:{id} channel, so a single WS connection receives messages from all conversations. The sender is excluded from the fan-out query to prevent echo.

Connection Flow

  1. Obtain a token: Call the issueWsToken mutation to get a 5-minute JWT.
  2. Connect: Open a WebSocket to ws://localhost:3018/ws?token=<jwt> (global mode) or ws://localhost:3018/ws?token=<jwt>&conversation_id=<uuid> (per-conversation mode).
  3. Receive: Incoming messages from other participants are pushed as JSON text frames.
  4. Heartbeat: Send "ping" text frames to refresh your presence TTL (30 seconds).

Sender Echo Prevention

  • Per-conversation mode: The WS relay filters messages where sender_id matches the connected user, preventing echo.
  • Global mode: No filtering is needed โ€” the mutation's fan-out query (WHERE user_id != sender) already excludes the sender from the publish list.

Message Frame Format

Messages received over WebSocket are JSON payloads. Messages may originate from either Redis (direct relay) or Matrix (AS transaction relay):

{
	"type": "new_message",
	"id": "message-uuid",
	"conversation_id": "conv-uuid",
	"sender_id": "user-id",
	"encrypted_body": "<E2EE ciphertext>",
	"msg_type": "Text",
	"created_at": "2026-04-06T12:00:00Z",
	"file_url": null,
	"file_name": null,
	"file_size": null,
	"file_mime": null,
	"reply_to_message_id": null
}

Messages relayed from Matrix include a "source": "matrix" field and the Matrix event_id.

Event Types

In addition to new message payloads, the WebSocket relay delivers several event types via the type field:

Event Type Payload Description
new_message Full message object New message in a conversation
read_receipt conversation_id, user_id, message_id Participant read up to this message
message_deleted conversation_id, message_id Message was soft-deleted by sender
message_edited conversation_id, message_id, encrypted_body, edited_at Message was edited by sender
typing conversation_id, user_id, state Typing indicator (start/stop)
reaction_added conversation_id, message_id, reaction Reaction added/replaced on message
reaction_removed conversation_id, message_id, sender_id, emoji Reaction removed from message

Presence Tracking

User presence is stored in Redis with a 30-second TTL:

  • uchat:presence:{userId} โ†’ "online"
  • Refreshed on each "ping" heartbeat
  • Removed on WebSocket disconnect

Database Schema

The uchat subgraph uses a dedicated PostgreSQL database (ngwenya_uchat) for full microservice isolation.

Table Description
uchat_conversations Conversation metadata, Malet association, Matrix room ID
uchat_participants User membership, Matrix ID, display_name, username, last_read_message_id, last_read_at
uchat_messages E2EE ciphertext, idempotency keys, Matrix event IDs, deleted_at, deleted_by, edited_at, edit_history (JSONB), file attachment metadata (file_url, file_name, file_size, file_mime), reply_to_message_id (FK to self, ON DELETE SET NULL)
uchat_reactions Emoji reactions per message, UNIQUE (message_id, sender_id) constraint โ€” one reaction per user
uchat_matrix_txn_log AS transaction deduplication log (24h TTL)

Migrations

File Description
001_create_conversations.sql Core conversations table
002_create_participants.sql User-conversation membership
003_create_messages.sql E2EE message storage
004_add_social_conversation_type.sql SOCIAL conversation type
005_add_matrix_txn_log.sql Matrix AS transaction dedup
006_add_participant_display_name.sql Identity denormalization (display_name, username)
007_add_read_receipts.sql last_read_message_id, last_read_at on participants
008_add_message_deleted.sql deleted_at, deleted_by on messages
009_add_file_message_type.sql File message type + file_url, file_name, file_size, file_mime columns
010_add_message_reactions.sql uchat_reactions table for emoji reactions per message
011_add_message_editing.sql edited_at timestamp + edit_history JSONB column on messages
012_one_reaction_per_user.sql Enforce UNIQUE (message_id, sender_id) โ€” one reaction per user per message
013_add_reply_to_message_id.sql reply_to_message_id UUID REFERENCES uchat_messages(id) ON DELETE SET NULL โ€” threading support
014_create_link_previews.sql SHA-256 URL-keyed link preview cache table with 24h TTL
015_add_voice_message_type.sql VOICE message type
016_add_search_indexes.sql Composite indexes on (conversation_id, sender_id, created_at) and (conversation_id, msg_type, created_at) for search metadata filtering

Conduit Matrix Homeserver

The platform deploys Conduit โ€” a lightweight, memory-safe Matrix homeserver written in Rust โ€” as the sovereign relay backend.

Configuration

Setting Value Purpose
server_name localhost Domain part of Matrix IDs (production: chat.mallnline.com)
database_backend rocksdb Embedded DB โ€” no external dependency
allow_registration true Required for AS user provisioning
allow_federation false Single-instance dev setup
Port 8448 (host) โ†’ 6167 (container) Matrix Client-Server API

Application Service Registration

The uchat AS registration (uchat-appservice.yaml) is auto-discovered by Conduit on startup:

Field Value
id uchat
sender_localpart uchat_bot
namespaces.users @uchat_.* (exclusive)
namespaces.aliases #uchat_.* (exclusive)

Environment Variables

Variable Required Description
DATABASE_URL โœ… PostgreSQL connection string for ngwenya_uchat
REDIS_URL โœ… Redis connection for pub/sub and presence
UCHAT_SERVICE_PORT โœ… GraphQL federation port (default: 3017)
UCHAT_WS_PORT โœ… WebSocket relay port (default: 3018)
WS_TOKEN_SECRET โœ… Secret for signing WebSocket JWT tokens
MATRIX_HOMESERVER_URL โœ… Conduit URL (http://localhost:8448)
MATRIX_AS_TOKEN โœ… Application Service token for Matrix
MATRIX_HS_TOKEN โœ… Homeserver token for Matrix (used for inbound transaction auth)

Note: Setting MATRIX_HOMESERVER_URL to an empty string falls back to standalone mode (Redis-only relay). All Matrix bridge calls gracefully no-op.


File Reference

File Purpose
apps/uchat/src/main.rs Dual-server entrypoint (GraphQL + WebSocket) with AS routes
apps/uchat/src/config.rs Environment configuration with matrix_enabled() toggle
apps/uchat/src/graphql/query.rs Query resolvers with participant authorization
apps/uchat/src/graphql/mutation.rs Mutations for CRUD, messaging, Matrix bridge wiring, Redis fan-out
apps/uchat/src/ws/mod.rs WebSocket handler with JWT auth, Redis relay, sender-filter logic
apps/uchat/src/services/matrix_bridge.rs Ruma-typed Matrix AS bridge (provision, invite, join, send, health)
apps/uchat/src/services/appservice_handler.rs AS Transaction API โ€” inbound event processing from Conduit
apps/uchat/src/models/pagination.rs Relay-compatible ConnectionCursor + @shareable PageInfo
apps/uchat/migrations/ SQLx migrations (conversations, participants, messages, txn log)
scripts/bench-uchat-latency.ts E2E latency benchmark for auth, HTTP, and WS delivery paths
infra/conduit/conduit.toml Conduit homeserver configuration
infra/conduit/uchat-appservice.yaml Matrix AS registration (auto-discovered by Conduit)

Phased Roadmap

Phase Status Description
Phase 1: Orchestrator โœ… Complete Rust subgraph, schema, WebSocket relay, Redis pub/sub, Matrix bridge stubs
Phase 2: Conduit Deployment โœ… Complete Conduit homeserver, live Ruma bridge, identity provisioning, dual-relay, AS transaction handler
Phase 3: Real-Time Identity Resolution โœ… Complete Global per-user WS channels, participant identity denormalization, gateway user header parsing
Phase 4: Messaging Delivery & Receipts โœ… Complete DM dedup, read receipts, soft-delete messages, delivery fix, integration test suite
Phase 4b: WS Stabilization โœ… Complete Gateway auth retry, sender-filter fix, polling fallback, 42ms avg WS delivery
Phase 4c: File Attachments โœ… Complete E2EE file attachments via Crypto service, 25 MB limit, drag-drop UI, MallIcon file cards
Phase 4d: Reactions & Editing โœ… Complete One-per-user reactions (toggle, replace), dual-channel WS broadcast, optimistic UI, message editing with history. 7 Playwright E2E tests
Phase 4e: Link Previews โœ… Complete Server-side OG scraping via scraper crate, SHA-256 cached 24h, Redis rate limiting, preview cards below URL messages
Phase 4f: Message Search โœ… Complete Hybrid E2EE message search โ€” client-side content search via IndexedDB + in-memory Map, server-side metadata filtering via search_messages resolver
Phase 5: WASM Client ๐Ÿ“‹ Planned Bundle matrix-rust-sdk into uchat.js WASM, implement E2EE in browser
Phase 6: Standalone Apps ๐Ÿ“‹ Planned Dedicated mobile and desktop clients for uChat as a standalone product

Testing

Unit Tests

cd apps/uchat && cargo test

Runs 14 unit tests + 23 integration tests covering:

  • Matrix bridge standalone fallback (provision, create, invite, join, send, health)
  • Server name extraction (HTTP + HTTPS)
  • AS transaction body deserialization
  • Platform user ID extraction from Matrix sender IDs
  • Conversation lifecycle (create, DM dedup, list)
  • Message delivery (send, pagination, WebSocket relay)
  • Read receipts (markAsRead, last_read tracking)
  • Message deletion (soft-delete, content substitution, sender-only guard)

Manual Verification

# Health check
curl http://localhost:3017/health

# Matrix bridge health
curl http://localhost:3017/matrix/health

# Federation introspection
curl -s -X POST http://localhost:3017/graphql \
  -H 'Content-Type: application/json' \
  -d '{"query":"{_service{sdl}}"}'

# Conduit versions (direct)
curl http://localhost:8448/_matrix/client/versions

# WebSocket test (after obtaining a token via issueWsToken)
websocat ws://localhost:3018/ws?token=<jwt>&conversation_id=<uuid>