JSON in GraphQL Subscriptions: graphql-ws, AsyncIterator, SSE & Apollo Client
Last updated:
GraphQL subscriptions stream JSON events to clients over a persistent connection — each event arrives as a JSON payload matching the subscription's selection set. The graphql-ws subprotocol encodes all messages as JSON: the client sends {"type":"subscribe","id":"1","payload":{"query":"subscription { messageAdded { id text } }"}} and the server responds with {"type":"next","id":"1","payload":{"data":{"messageAdded":{"id":"5","text":"hello"}}}} for each new event. Subscription resolvers return an AsyncIterator or AsyncGenerator — each yield delivers one JSON payload to the subscribed client. graphql-yoga and Apollo Server support subscriptions with useServer() from the graphql-ws library. For read-only subscriptions, graphql-sse uses Server-Sent Events instead of WebSocket — simpler infrastructure with automatic reconnection. Apollo Client's useSubscription hook receives each JSON payload as { data, loading, error }. This guide covers the graphql-ws protocol message format, subscription resolver with AsyncGenerator, graphql-yoga setup, SSE alternative, and Apollo Client subscription consumption.
graphql-ws Protocol: JSON Message Format
The graphql-ws library implements the graphql-transport-ws subprotocol — a structured JSON message protocol layered on top of WebSocket. Every message is a JSON object with a type field and an optional id (for per-subscription correlation) and payload. The protocol defines 8 message types across 2 directions, governing the full lifecycle from connection initialization to event delivery and graceful teardown.
// ── graphql-transport-ws message flow ─────────────────────────
//
// Client → Server Server → Client
// ───────────────────────────── ──────────────────────────────
// ConnectionInit ConnectionAck
// Subscribe (id + payload) Next (id + payload)
// Complete (id) Error (id + payload)
// Ping Complete (id)
// Ping / Pong
// ── 1. Client sends ConnectionInit ────────────────────────────
// Optional payload for authentication tokens
{
"type": "connection_init",
"payload": {
"Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
}
}
// ── 2. Server responds with ConnectionAck ─────────────────────
{ "type": "connection_ack" }
// ── 3. Client sends Subscribe ─────────────────────────────────
// id is client-generated, unique per subscription on this connection
{
"type": "subscribe",
"id": "1",
"payload": {
"query": "subscription OnMessageAdded($roomId: ID!) { messageAdded(roomId: $roomId) { id text author createdAt } }",
"variables": { "roomId": "room-42" },
"operationName": "OnMessageAdded"
}
}
// ── 4. Server streams Next messages ───────────────────────────
// One JSON frame per subscription event — data matches the selection set
{
"type": "next",
"id": "1",
"payload": {
"data": {
"messageAdded": {
"id": "msg-5",
"text": "Hello, world!",
"author": "alice",
"createdAt": "2026-05-28T10:00:00.000Z"
}
}
}
}
// ── 5. Server sends Error (GraphQL errors, not transport errors) ─
{
"type": "error",
"id": "1",
"payload": [
{
"message": "You do not have permission to subscribe to this room",
"locations": [{ "line": 1, "column": 14 }],
"path": ["messageAdded"],
"extensions": { "code": "FORBIDDEN" }
}
]
}
// ── 6. Client or server sends Complete ────────────────────────
// Client → server: unsubscribe from this id
// Server → client: subscription stream ended (resolver completed)
{ "type": "complete", "id": "1" }
// ── WebSocket close codes (server-initiated) ──────────────────
// 4400 Bad Request — invalid message received
// 4401 Unauthorized — onConnect returned false
// 4408 Request Timeout — ConnectionInit not received within timeout
// 4409 Subscriber for <id> already exists
// 4429 Too Many Initialisation RequestsThe id field is client-generated and must be unique per active subscription on a single WebSocket connection. This allows multiplexing — a single connection can carry dozens of concurrent subscriptions, each identified by its id. When the client sends a complete message with a subscription's id, the server stops the corresponding AsyncIterator by calling iterator.return(). The server can also initiate a complete by sending its own complete message, signaling that the subscription stream has naturally ended. Unlike HTTP responses, WebSocket frames have no inherent size limit — a single subscription event can be any size, though large payloads (over 1 MB) should be paginated or broken into multiple events.
Subscription Resolvers with AsyncGenerator
A GraphQL subscription resolver is a JavaScript object with a subscribe function (returns the AsyncIterator) and an optional resolve function (transforms each iterator value before JSON serialization). The subscribe function receives the same arguments as a query resolver: (parent, args, context, info). Each value yielded by the iterator must be an object whose key matches the subscription field name.
import { PubSub, withFilter } from 'graphql-subscriptions'
// ── In-memory PubSub for single-server deployments ─────────────
// For multi-server production, use graphql-redis-subscriptions
const pubsub = new PubSub()
// ── Subscription resolver with PubSub ─────────────────────────
export const subscriptionResolvers = {
Subscription: {
messageAdded: {
// subscribe: returns an AsyncIterator
// args.roomId comes from the client's subscription variables JSON
subscribe: withFilter(
() => pubsub.asyncIterableIterator(['MESSAGE_ADDED']),
// Filter: only deliver events for the subscribed room
(payload, variables) =>
payload.messageAdded.roomId === variables.roomId,
),
// resolve: optional — transform payload before sending JSON
resolve: (payload: { messageAdded: Message }) => payload.messageAdded,
},
},
}
// ── Publishing from a mutation resolver ───────────────────────
export const mutationResolvers = {
Mutation: {
sendMessage: async (_: unknown, args: { roomId: string; text: string }, ctx: Context) => {
const message = await ctx.db.message.create({
data: {
roomId: args.roomId,
text: args.text,
author: ctx.user.name,
},
})
// Publish — triggers all matching subscribers
// The payload key must match the subscription field name
await pubsub.publish('MESSAGE_ADDED', { messageAdded: message })
return message
},
},
}
// ── Plain AsyncGenerator (no PubSub) ──────────────────────────
// Useful for server-push patterns: counters, polls, live data
async function* countdown(from: number) {
for (let i = from; i >= 0; i--) {
yield { countdown: i }
await new Promise(resolve => setTimeout(resolve, 1000))
}
}
export const countdownResolvers = {
Subscription: {
countdown: {
subscribe: (_: unknown, args: { from: number }) => countdown(args.from),
// No resolve needed — yielded shape matches the field type directly
},
},
}
// ── TypeScript: subscription payload types ────────────────────
type Message = {
id: string
roomId: string
text: string
author: string
createdAt: string
}
// The yielded value must have the subscription field name as key
// Each yield → one "next" JSON message to the client
// return / throw → "complete" or "error" JSON message to the clientThe resolve function runs after subscribe yields a value and before the payload is JSON-serialized and sent. This is where field-level transforms belong: converting database timestamps to ISO strings, removing internal fields, or applying field-level authorization. If resolve is omitted, the yielded value is serialized as-is — the object must already match the GraphQL type's expected shape. The withFilter() wrapper from graphql-subscriptions adds per-event filtering: returning false skips the event entirely (no JSON frame sent to that client), returning true allows it through to the resolve stage.
graphql-yoga Server Setup for Subscriptions
graphql-yoga supports subscriptions out of the box — no separate useServer() call or WebSocket server configuration is required for Node.js HTTP servers. The yoga instance handles WebSocket upgrades internally by attaching to the http.Server's upgrade event. Define your schema with subscription types and provide resolvers that return AsyncGenerators. Yoga's built-in createPubSub() is a TypeScript-typed, in-memory pub/sub primitive suitable for single-server deployments.
import { createServer } from 'node:http'
import { createYoga, createPubSub } from 'graphql-yoga'
import { makeExecutableSchema } from '@graphql-tools/schema'
// ── Type-safe PubSub — maps topic names to payload types ───────
const pubsub = createPubSub<{
MESSAGE_ADDED: [{ messageAdded: { id: string; text: string; author: string } }]
USER_ONLINE: [{ userOnline: { userId: string; online: boolean } }]
}>()
// ── Schema with subscription type ─────────────────────────────
const typeDefs = /* GraphQL */ `
type Message { id: ID! text: String! author: String! createdAt: String! }
type UserStatus { userId: ID! online: Boolean! }
type Query { messages: [Message!]! }
type Mutation {
sendMessage(roomId: ID!, text: String!): Message!
}
type Subscription {
messageAdded(roomId: ID!): Message!
userOnline(userId: ID!): UserStatus!
}
`
const resolvers = {
Query: {
messages: () => [],
},
Mutation: {
sendMessage: async (_: unknown, args: { roomId: string; text: string }, ctx: any) => {
const message = { id: Date.now().toString(), text: args.text, author: ctx.user ?? 'anon', createdAt: new Date().toISOString() }
// Publish triggers all matching subscribers immediately
pubsub.publish('MESSAGE_ADDED', { messageAdded: message })
return message
},
},
Subscription: {
messageAdded: {
subscribe: (_: unknown, args: { roomId: string }) =>
pubsub.subscribe('MESSAGE_ADDED'),
resolve: (payload: { messageAdded: { id: string; text: string; author: string; createdAt?: string } }) =>
payload.messageAdded,
},
userOnline: {
subscribe: () => pubsub.subscribe('USER_ONLINE'),
resolve: (payload: { userOnline: { userId: string; online: boolean } }) =>
payload.userOnline,
},
},
}
const schema = makeExecutableSchema({ typeDefs, resolvers })
// ── graphql-yoga instance ──────────────────────────────────────
const yoga = createYoga({
schema,
// graphql-yoga adds WebSocket upgrade handling automatically
// in Node.js — no useServer() or ws.Server needed
context: (req) => ({
// Extract user from Authorization header for both HTTP and WS
user: req.request.headers.get('x-user') ?? null,
}),
})
// ── Attach to http.Server ─────────────────────────────────────
const server = createServer(yoga)
server.listen(4000, () => {
console.log('GraphQL server running at http://localhost:4000/graphql')
console.log('Subscriptions: ws://localhost:4000/graphql (graphql-transport-ws)')
})
// ── Client WebSocket message flow (for reference) ─────────────
// → {"type":"connection_init"}
// ← {"type":"connection_ack"}
// → {"type":"subscribe","id":"1","payload":{"query":"subscription { messageAdded(roomId: \"room-1\") { id text author } }"}}
// ← {"type":"next","id":"1","payload":{"data":{"messageAdded":{"id":"1748...","text":"hello","author":"alice"}}}}
// → {"type":"complete","id":"1"} ← client unsubscribesFor Next.js App Router deployments, graphql-yoga's HTTP handler exports as GET and POST route handlers directly. WebSocket upgrades require a custom server (server.js) or a separate WebSocket server running alongside the Next.js dev server — Next.js App Router does not support WebSocket upgrades natively. The recommended pattern for Next.js with real-time subscriptions is to run a separate graphql-yoga or Apollo Server process on a different port and configure Apollo Client's split link to route subscriptions there.
Apollo Server Subscriptions with useServer
Apollo Server 4 does not include built-in WebSocket handling — subscriptions require the graphql-ws library's useServer() function applied to a ws.WebSocketServer instance. This explicit wiring gives full control over the WebSocket server configuration: max connections, perMessageDeflate compression, path routing, and the onConnect/onDisconnect lifecycle callbacks used for authentication and cleanup.
import { ApolloServer } from '@apollo/server'
import { expressMiddleware } from '@apollo/server/express4'
import { ApolloServerPluginDrainHttpServer } from '@apollo/server/plugin/drainHttpServer'
import { makeExecutableSchema } from '@graphql-tools/schema'
import { WebSocketServer } from 'ws'
import { useServer } from 'graphql-ws/lib/use/ws'
import express from 'express'
import { createServer } from 'node:http'
import cors from 'cors'
import bodyParser from 'body-parser'
const schema = makeExecutableSchema({ typeDefs, resolvers }) // your schema
const app = express()
const httpServer = createServer(app)
// ── 1. Create WebSocket server ─────────────────────────────────
const wsServer = new WebSocketServer({
server: httpServer, // share the same port as HTTP
path: '/graphql', // subscriptions endpoint
})
// ── 2. Apply graphql-ws handler ────────────────────────────────
// Returns a cleanup function to gracefully drain connections on shutdown
const serverCleanup = useServer(
{
schema,
// onConnect: runs on ConnectionInit — return false to reject
onConnect: async (ctx) => {
const token = (ctx.connectionParams as { Authorization?: string })?.Authorization
?.replace('Bearer ', '')
if (!token) {
// Returning false sends ws close code 4401 to the client
return false
}
const user = await verifyJwt(token)
if (!user) return false
// Returned object is merged into ctx.extra.user (not context)
return { userId: user.id }
},
// context: runs on every Subscribe message — build per-operation context
context: async (ctx) => ({
user: (ctx.extra as { userId?: string }).userId
? await db.user.findUnique({ where: { id: (ctx.extra as { userId?: string }).userId } })
: null,
pubsub,
db,
}),
onDisconnect: (ctx, code, reason) => {
console.log(`Client disconnected: ${code} ${reason}`)
},
},
wsServer,
)
// ── 3. Create Apollo Server ────────────────────────────────────
const apolloServer = new ApolloServer({
schema,
plugins: [
ApolloServerPluginDrainHttpServer({ httpServer }),
// Gracefully shut down the WebSocket server when Apollo shuts down
{
async serverWillStart() {
return {
async drainServer() {
await serverCleanup.dispose()
},
}
},
},
],
})
await apolloServer.start()
// ── 4. Apply HTTP middleware ────────────────────────────────────
app.use(
'/graphql',
cors<cors.CorsRequest>(),
bodyParser.json(),
expressMiddleware(apolloServer, {
context: async ({ req }) => ({
user: await getUserFromRequest(req),
db,
pubsub,
}),
}),
)
// ── 5. Start server ────────────────────────────────────────────
httpServer.listen(4000, () => {
console.log('HTTP: http://localhost:4000/graphql')
console.log('WS: ws://localhost:4000/graphql (graphql-transport-ws)')
})
// ── Declaring verifyJwt and other helpers ─────────────────────
async function verifyJwt(token: string): Promise<{ id: string } | null> {
try {
const payload = await jose.jwtVerify(token, JWT_SECRET)
return { id: payload.payload.sub as string }
} catch {
return null
}
}
async function getUserFromRequest(req: express.Request) {
const auth = req.headers.authorization?.replace('Bearer ', '')
if (!auth) return null
return verifyJwt(auth)
}The key difference from graphql-yoga's integrated approach is that Apollo Server 4 requires explicit WebSocket server creation and useServer() wiring. The upside is full control: you can apply rate limiting at the WebSocket server level, configure perMessageDeflate for payload compression (30–70% reduction on text JSON), set maxPayload to reject oversized messages, and run multiple WebSocket servers on different paths. The onConnect authentication runs once per connection (ConnectionInit), not once per subscription — make it fast by caching the verified user object in ctx.extra and looking it up in the context function.
SSE Alternative with graphql-sse
graphql-sse implements GraphQL subscriptions over Server-Sent Events (SSE) rather than WebSocket. The client opens a standard HTTP POST request; the server streams text/event-stream responses. Each event is a JSON object containing the familiar GraphQL { data, errors } envelope. SSE events use one HTTP connection per subscription (no multiplexing), but eliminate WebSocket upgrade complexity and work through HTTP/2, standard load balancers, and corporate firewalls that block WebSocket traffic.
// npm install graphql-sse
import { createHandler } from 'graphql-sse/lib/adapters/node'
import { makeExecutableSchema } from '@graphql-tools/schema'
import { createServer } from 'node:http'
const schema = makeExecutableSchema({ typeDefs, resolvers })
// ── graphql-sse handler ────────────────────────────────────────
const handler = createHandler({
schema,
context: async (req) => ({
// Parse Authorization header for HTTP-based auth (cookies work too)
user: await getUserFromRequest(req.raw),
pubsub,
db,
}),
})
// ── Attach to http.Server ─────────────────────────────────────
const server = createServer((req, res) => {
if (req.url?.startsWith('/graphql/stream')) {
return handler(req, res)
}
// Handle regular GraphQL queries/mutations separately
res.writeHead(404)
res.end()
})
server.listen(4000)
// ── SSE wire format (text/event-stream) ───────────────────────
// Each event is separated by a blank line; data is JSON
//
// event: next
// data: {"data":{"messageAdded":{"id":"5","text":"hello"}}}
//
// event: complete
// data:
//
// ── Client: native EventSource API (browser) ──────────────────
// EventSource does not support POST or custom headers — use fetch
// graphql-sse ships a client that uses fetch for POST-based SSE
import { createClient } from 'graphql-sse'
const client = createClient({
url: 'http://localhost:4000/graphql/stream',
headers: () => ({
Authorization: `Bearer ${localStorage.getItem('token')}`,
}),
})
// Subscribe — returns an unsubscribe function
const unsubscribe = client.subscribe(
{
query: `subscription OnMessageAdded($roomId: ID!) {
messageAdded(roomId: $roomId) { id text author }
}`,
variables: { roomId: 'room-42' },
},
{
// next: called for each JSON event payload
next: ({ data }) => console.log('New message:', data?.messageAdded),
error: (err) => console.error('Subscription error:', err),
complete: () => console.log('Subscription complete'),
},
)
// Unsubscribe after 60 seconds
setTimeout(unsubscribe, 60_000)
// ── SSE vs WebSocket: when to choose ─────────────────────────
// SSE (graphql-sse):
// ✓ No WebSocket proxy config — works through nginx, CDN, AWS ALB
// ✓ Automatic reconnect via browser EventSource or graphql-sse client
// ✓ HTTP/2 multiplexing in supported environments
// ✗ One HTTP connection per subscription (no id-based muxing)
// ✗ Server → client only (heartbeats must use polling or app-level ping)
//
// WebSocket (graphql-ws):
// ✓ Full-duplex — client can send Ping, Complete, multiple Subscribe
// ✓ Multiplex N subscriptions over 1 connection with id field
// ✓ Lower overhead per event (no HTTP headers per event)
// ✗ Requires WebSocket-aware proxy/load balancer config
// ✗ Browser EventSource not applicable — needs ws/graphql-ws clientFor production SSE deployments, configure your load balancer to disable response buffering for the /graphql/stream path — nginx needs proxy_buffering off, AWS ALB needs proxy_read_timeout set above the subscription lifetime, and Cloudflare needs the endpoint excluded from caching. SSE connections hold an open HTTP response — each active subscription counts against the server's open file descriptor limit (ulimit -n), typically 65,535 on Linux by default.
Apollo Client useSubscription Hook
Apollo Client consumes GraphQL subscriptions in React via the useSubscription hook. The hook requires a GraphQLWsLink configured in the Apollo Client instance and a split link that routes subscription operations to the WebSocket transport. Each new subscription event triggers a re-render with the updated data value — it is the latest payload, not a running array of all events.
// npm install @apollo/client graphql-ws
import {
ApolloClient, InMemoryCache, HttpLink, split,
ApolloProvider, gql, useSubscription, useQuery,
} from '@apollo/client'
import { GraphQLWsLink } from '@apollo/client/link/subscriptions'
import { getMainDefinition } from '@apollo/client/utilities'
import { createClient } from 'graphql-ws'
// ── 1. Create the WebSocket link ───────────────────────────────
const wsLink = new GraphQLWsLink(
createClient({
url: 'ws://localhost:4000/graphql',
connectionParams: () => ({
// Sent as the ConnectionInit payload — server reads this for auth
Authorization: `Bearer ${localStorage.getItem('token')}`,
}),
// Reconnect on disconnect with exponential backoff
retryAttempts: 5,
on: {
connected: () => console.log('WS connected'),
closed: () => console.log('WS closed'),
error: (err) => console.error('WS error', err),
},
})
)
// ── 2. Create the HTTP link for queries and mutations ──────────
const httpLink = new HttpLink({ uri: 'http://localhost:4000/graphql' })
// ── 3. Split: subscriptions → WS, everything else → HTTP ──────
const splitLink = split(
({ query }) => {
const definition = getMainDefinition(query)
return (
definition.kind === 'OperationDefinition' &&
definition.operation === 'subscription'
)
},
wsLink,
httpLink,
)
// ── 4. Apollo Client instance ──────────────────────────────────
const client = new ApolloClient({
link: splitLink,
cache: new InMemoryCache(),
})
// ── 5. Subscription document ──────────────────────────────────
const MESSAGE_ADDED_SUB = gql`
subscription OnMessageAdded($roomId: ID!) {
messageAdded(roomId: $roomId) {
id
text
author
createdAt
}
}
`
// ── 6. useSubscription hook ────────────────────────────────────
// Re-renders on every new JSON event from the server
function LiveChat({ roomId }: { roomId: string }) {
const { data, loading, error } = useSubscription(MESSAGE_ADDED_SUB, {
variables: { roomId },
// onData: side-effect callback without re-render (Apollo Client 3.7+)
onData: ({ client, data: { data: subscriptionData } }) => {
// Write the new message directly into Apollo's cache
// so that useQuery('GET_MESSAGES') sees it immediately
client.cache.modify({
fields: {
messages(existing = []) {
const newRef = client.cache.writeFragment({
data: subscriptionData?.messageAdded,
fragment: gql`fragment NewMsg on Message { id text author createdAt }`,
})
return [...existing, newRef]
},
},
})
},
})
if (loading) return <p>Connecting to live chat...</p>
if (error) return <p>Error: {error.message}</p>
// data.messageAdded is the LATEST message — not all messages
// Use onData + cache.modify above to accumulate messages
return (
<div>
<p>Latest: {data?.messageAdded?.text} — {data?.messageAdded?.author}</p>
</div>
)
}
// ── 7. subscribeToMore — add live updates to an existing useQuery ─
function ChatRoom({ roomId }: { roomId: string }) {
const { data, subscribeToMore } = useQuery(GET_MESSAGES, { variables: { roomId } })
React.useEffect(() => {
// subscribeToMore returns an unsubscribe function
const unsubscribe = subscribeToMore({
document: MESSAGE_ADDED_SUB,
variables: { roomId },
updateQuery: (prev, { subscriptionData }) => {
if (!subscriptionData.data) return prev
const newMsg = subscriptionData.data.messageAdded
return { messages: [...(prev.messages ?? []), newMsg] }
},
})
return unsubscribe
}, [roomId])
return <MessageList messages={data?.messages ?? []} />
}
export default function App() {
return (
<ApolloProvider client={client}>
<ChatRoom roomId="room-42" />
</ApolloProvider>
)
}The onData callback (Apollo Client 3.7+) fires on every subscription event without triggering an additional re-render from the hook itself — useful for writing events into the Apollo cache so that related useQuery results update reactively. subscribeToMore is the classic pattern for combining an initial query with a live subscription: the query loads historical data, and subscribeToMore appends new events as they arrive. Apollo's InMemoryCache normalizes subscription payloads by __typename + id — if the server returns a Message type with a unique id, the cache automatically updates any cached query result that references the same message, keeping the UI consistent with zero extra code.
Authentication and Filtering JSON Subscription Events
Securing GraphQL subscriptions requires two layers: connection-level authentication (verify the user can open a subscription at all) and field-level authorization (filter or redact events based on the user's permissions). Connection-level auth runs in the onConnect callback and completes before any Subscribe messages are processed. Field-level auth runs in the withFilter() predicate or inside the subscribe resolver itself.
import { withFilter } from 'graphql-subscriptions'
import { useServer } from 'graphql-ws/lib/use/ws'
// ── useServer: onConnect for connection-level auth ─────────────
const serverCleanup = useServer(
{
schema,
// onConnect: runs on ConnectionInit JSON message
// connectionParams comes from the client's ConnectionInit payload
onConnect: async (ctx) => {
const params = ctx.connectionParams as { Authorization?: string }
const token = params?.Authorization?.replace('Bearer ', '')
if (!token) {
console.warn('No token — rejecting WS connection')
return false // sends close code 4401 to client
}
try {
const user = await verifyJwt(token)
if (!user) return false
// Store user in ctx.extra — accessible in context() per Subscribe
;(ctx.extra as { user?: { id: string; role: string } }).user = user
return true
} catch {
return false
}
},
// context: runs per Subscribe operation — has access to ctx.extra
context: async (ctx) => {
const extraUser = (ctx.extra as { user?: { id: string; role: string } }).user
return {
user: extraUser,
db,
pubsub,
}
},
},
wsServer,
)
// ── withFilter: per-event field-level filtering ────────────────
export const subscriptionResolvers = {
Subscription: {
// Deliver order updates only to the order's owner
orderUpdated: {
subscribe: withFilter(
() => pubsub.asyncIterableIterator(['ORDER_UPDATED']),
async (
payload: { orderUpdated: { id: string; userId: string; status: string } },
variables: { orderId: string },
context: { user: { id: string; role: string } | null }
) => {
if (!context.user) return false
// Admin sees all order updates
if (context.user.role === 'ADMIN') return true
// Regular users see only their own orders
return payload.orderUpdated.userId === context.user.id
&& payload.orderUpdated.id === variables.orderId
},
),
},
// Field-level redaction: remove sensitive fields for non-admins
auditEvent: {
subscribe: withFilter(
() => pubsub.asyncIterableIterator(['AUDIT_EVENT']),
(_: unknown, __: unknown, ctx: { user: { role: string } | null }) =>
ctx.user?.role === 'ADMIN', // Only admins receive audit events
),
resolve: (
payload: { auditEvent: AuditEvent },
_: unknown,
ctx: { user: { role: string } | null }
) => {
const event = payload.auditEvent
// Strip IP and user agent for non-super-admin users
if (ctx.user?.role !== 'SUPER_ADMIN') {
const { ipAddress, userAgent, ...safeEvent } = event
return safeEvent
}
return event
},
},
},
}
// ── Per-room subscription channels (Redis pattern) ─────────────
// Instead of withFilter on a global channel, publish to specific channels
// to avoid per-event CPU overhead at high subscriber counts
import { RedisPubSub } from 'graphql-redis-subscriptions'
import Redis from 'ioredis'
const redisPubSub = new RedisPubSub({
publisher: new Redis({ host: 'redis', port: 6379 }),
subscriber: new Redis({ host: 'redis', port: 6379 }),
})
export const roomResolvers = {
Subscription: {
messageAdded: {
// Subscribe to room-specific Redis channel — no withFilter needed
// Channel: "message:room-{roomId}"
subscribe: (_: unknown, args: { roomId: string }) =>
redisPubSub.asyncIterableIterator(`message:room:${args.roomId}`),
},
},
Mutation: {
sendMessage: async (_: unknown, args: { roomId: string; text: string }, ctx: any) => {
const message = await ctx.db.message.create({ data: { roomId: args.roomId, text: args.text, author: ctx.user.name } })
// Publish to the room's specific channel
await redisPubSub.publish(`message:room:${args.roomId}`, { messageAdded: message })
return message
},
},
}The Redis pub/sub channel-per-room pattern is the most scalable approach: instead of publishing all messages to one MESSAGE_ADDED channel and filtering client-side, publish to message:room:${'{roomId}'}. Each subscriber connects only to their specific channel — zero per-event filter CPU cost regardless of the number of total subscribers or rooms. Redis pub/sub supports up to 10,000 concurrent subscribers per channel with no throughput degradation, making this pattern production-ready for real-time chat, live dashboards, and collaborative tools with millions of concurrent users across multiple server instances.
Key Terms
- graphql-ws
- An npm library that implements the graphql-transport-ws WebSocket subprotocol for GraphQL subscriptions. Despite sharing a name with the older subscriptions-transport-ws protocol, the two are different and incompatible. The library provides both a client (
createClient()) and server adapters (useServer()) for ws, uWebSockets, Bun, Deno, and other runtimes. Every message in the protocol is a JSON object with atypefield; eight message types govern the full connection lifecycle from ConnectionInit to Complete. Apollo Server 4, graphql-yoga, and Mercurius all use this library internally for their subscription support. - AsyncIterator
- A JavaScript protocol object with a
Symbol.asyncIteratormethod that returns an object with anext()method. Callingnext()returns aPromise<{ value, done }>. GraphQL subscription resolvers must return anAsyncIterator(orAsyncGenerator, which automatically implements the protocol). The GraphQL execution engine callsnext()repeatedly; each resolved value becomes one subscription event, JSON-serialized into a graphql-wsnextmessage. Whendoneistrueor the client sends acompletemessage, the engine callsiterator.return()to clean up resources. - PubSub
- A publish-subscribe system that decouples event producers (mutations) from event consumers (subscription resolvers). The
graphql-subscriptionspackage provides an in-memoryPubSubbacked by Node.jsEventEmitter— suitable for single-server development but unable to span multiple server instances. Production deployments usegraphql-redis-subscriptions(backed by Redis pub/sub),graphql-postgres-subscriptions(backed by PostgreSQL LISTEN/NOTIFY), or managed services like AWS SNS. The PubSub pattern separates the event source (a mutation callingpubsub.publish()) from the subscription resolver (callingpubsub.asyncIterableIterator()), keeping resolvers stateless. - withFilter
- A higher-order function from
graphql-subscriptionsthat wraps anAsyncIteratorto add per-event filtering. It takes two arguments: a subscribe function that returns the base iterator, and a filter function that receives each event payload and the client's subscription variables. If the filter returnsfalse(orPromise<false>), the event is silently discarded — no JSON frame is sent to that client. If it returnstrue, the event proceeds to theresolvefunction and then to the client.withFilter()is the standard mechanism for room-scoped or user-scoped subscriptions in single-server PubSub setups. - graphql-sse
- A library implementing GraphQL subscriptions over Server-Sent Events (SSE), an HTTP/1.1 feature where the server streams
text/event-streamresponses. Unlike WebSocket, SSE is unidirectional (server to client only), uses standard HTTP, and reconnects automatically via the browser'sEventSourceAPI or the graphql-sse client. Each GraphQL subscription event is sent as an SSE event with a JSON data payload. graphql-sse supports two modes: "distinct connections mode" (one HTTP request per subscription, simpler auth) and "single connection mode" (one HTTP request for all subscriptions, similar to WebSocket multiplexing). SSE works through all HTTP-aware load balancers and proxies without special configuration. - useSubscription
- The React hook in Apollo Client for consuming GraphQL subscriptions. It accepts a subscription document, optional variables, and options including
skip(pause the subscription),onData(side-effect callback), andfetchPolicy. The hook returns{ data, loading, error }—datais always the most recent event payload, not an accumulated history. Apollo Client opens the WebSocket connection and sends the subscribe message when the component mounts; it sends a complete message when the component unmounts, and the server stops the correspondingAsyncIterator. The hook re-renders the component on every new JSON event received from the server.
FAQ
How do GraphQL subscriptions send JSON data?
GraphQL subscriptions send JSON over a persistent WebSocket connection using the graphql-ws subprotocol (or the older subscriptions-transport-ws protocol). Every message — connection init, subscribe, next, complete, and error — is a JSON object sent as a WebSocket text frame. When the server emits a new subscription event, it sends a message with type: "next", an id matching the original subscribe message, and a payload containing the familiar GraphQL { data, errors } envelope: {"type":"next","id":"1","payload":{"data":{"messageAdded":{"id":"42","text":"hello"}}}}. The client parses this JSON text frame and delivers the data object to the application layer. Each event arrives independently — there is no batching by default. The round-trip latency from server event to client data is typically under 5 ms on a local network, making subscriptions suitable for real-time dashboards, chat, live scores, and collaborative editing.
What is the graphql-ws protocol?
graphql-ws is a community-maintained WebSocket subprotocol for GraphQL subscriptions, identified by the subprotocol string "graphql-transport-ws". It replaced the older "subscriptions-transport-ws" subprotocol (confusingly, the npm package names are swapped). The protocol defines 8 JSON message types: ConnectionInit (client sends optional auth payload), ConnectionAck (server confirms), Ping / Pong (keepalive), Subscribe (client sends the GraphQL query as a JSON payload), Next (server sends one event), Error (server sends a GraphQL errors array), and Complete (either side signals end). The server closes the WebSocket with code 4400 for invalid messages, 4401 for unauthorized connections (when onConnect returns false), and 4429 if too many initialization requests arrive within 1 second. Apollo Server 4, graphql-yoga, and Mercurius all support the graphql-transport-ws subprotocol via useServer() from the graphql-ws library.
How do I write a subscription resolver in TypeScript?
A subscription resolver returns an AsyncIterator or AsyncGenerator from its subscribe function. Each value yielded by the generator becomes one JSON payload delivered to the subscribed client. The resolver object has two properties: subscribe (mandatory, returns the AsyncIterator) and resolve (optional, transforms each iterator value before serialization). With PubSub: subscribe: withFilter(() => pubsub.asyncIterableIterator(["MESSAGE_ADDED"]), (payload, variables) => payload.messageAdded.roomId === variables.roomId). With a plain AsyncGenerator: async function* subscribe() { while (true) { yield { countdown: count-- }; await delay(1000); } }. TypeScript types the subscribe return as AsyncIterator<TPayload> where TPayload must have the subscription field name as a key. The resolve function receives each yielded value and can transform or redact fields — only the resolve return value is JSON-serialized and sent to the client.
What is the difference between GraphQL subscriptions and SSE?
GraphQL subscriptions over WebSocket (graphql-ws) and over Server-Sent Events (graphql-sse) both stream JSON events from server to client, but differ in protocol and infrastructure requirements. WebSocket is full-duplex TCP — the client can send messages after the handshake, enabling the graphql-ws ping/pong heartbeat and allowing multiple subscriptions (each with a unique id) to multiplex over a single connection. SSE is one-directional HTTP/1.1 — the server streams text/event-stream responses. SSE connections reconnect automatically using the browser's built-in EventSource API, require no special WebSocket support from proxies or load balancers, and work through HTTP/2 multiplexing. graphql-sse uses one HTTP request per subscription (no muxing) but eliminates WebSocket upgrade complexity and corporate firewall issues. Choose WebSocket for chat, collaborative editing, or bidirectional protocols; choose SSE for read-only dashboards, notifications, or environments with WebSocket restrictions. Both transport identical JSON payloads at the GraphQL layer.
How do I set up GraphQL subscriptions with graphql-yoga?
graphql-yoga supports subscriptions out of the box — no separate WebSocket server or useServer() call is needed for Node.js HTTP servers. Define a subscription field whose resolver returns an AsyncGenerator, create the yoga server with createYoga({'{ schema }'}), attach it to a Node.js createServer(yoga), and call server.listen(4000). Yoga automatically handles the graphql-transport-ws upgrade when a WebSocket client connects. For the PubSub pattern, use yoga's built-in createPubSub(): const pubsub = createPubSub(); then subscribe: () => pubsub.subscribe("MESSAGE_ADDED") in the resolver and pubsub.publish("MESSAGE_ADDED", {'{ messageAdded: msg }'}) in mutations. Yoga also handles SSE subscriptions on the same endpoint — the transport is negotiated by the client's Accept header or subprotocol, requiring no resolver changes. For Next.js App Router, yoga exports as a standard Route Handler; WebSocket upgrades require a custom server.
How do I consume GraphQL subscriptions in Apollo Client?
Configure Apollo Client with a split link that routes subscription operations to a GraphQLWsLink and queries/mutations to an HttpLink. Install @apollo/client and graphql-ws. Create the WebSocket link: const wsLink = new GraphQLWsLink(createClient({'{ url: "ws://localhost:4000/graphql", connectionParams: () => ({ Authorization: `Bearer ${token}` }) }'})). Route operations: split(({'{ query }')) => { const def = getMainDefinition(query); return def.kind === "OperationDefinition" && def.operation === "subscription"; }, wsLink, httpLink). Then use the hook: const { data, loading, error } = useSubscription(MY_SUB, { variables }). The hook re-renders on every new JSON event. data is the latest event — not accumulated. To accumulate events, use the onData callback to write to Apollo cache, or use subscribeToMore inside a useQuery to append events to a historical data set. Apollo automatically normalizes subscription payloads into InMemoryCache when the returned objects have __typename and id.
How do I filter subscription events for specific clients?
Use the withFilter() helper from graphql-subscriptions to add per-event filtering to a subscription resolver. withFilter() takes the subscribe function and a filter predicate: subscribe: withFilter(() => pubsub.asyncIterableIterator("MESSAGE_ADDED"), (payload, variables, context) => payload.messageAdded.roomId === variables.roomId). The predicate receives 3 arguments: the event payload, the client's subscription variables, and the resolver context (containing the authenticated user). Return true to deliver the event, false to skip it (no JSON frame sent). The predicate can be async — return a Promise<boolean> for database permission checks. For high-throughput scenarios with many subscribers, use Redis pub/sub with per-room channels (message:room:${'{roomId}'}) and subscribe each client to their specific channel — this eliminates per-event filter CPU overhead entirely and scales to thousands of concurrent subscriptions with no throughput degradation.
How do I authenticate GraphQL subscription connections?
Authenticate GraphQL subscription connections in the onConnect callback of useServer() (Apollo Server) or createYoga()'s onConnect option. The callback runs once per WebSocket connection when the client sends its ConnectionInit message. The client includes auth data in the ConnectionInit payload: connectionParams: () => ({ Authorization: `Bearer ${token}` }). On the server: onConnect: async (ctx) => {'{ const token = ctx.connectionParams?.Authorization?.replace("Bearer ", ""); const user = await verifyJwt(token); if (!user) return false; return { userId: user.id }; }'}. Returning false closes the WebSocket with code 4401. The returned object is stored in ctx.extra and is accessible in the context() function, which runs per Subscribe operation. Note: browsers cannot send custom HTTP headers during WebSocket upgrade handshakes — the ConnectionInit payload is the correct mechanism for JWT-based auth. Cookie-based auth works automatically because browsers include cookies in the WebSocket upgrade request.
Further reading and primary sources
- graphql-ws Library Documentation — Official graphql-ws docs: protocol spec, server adapters (ws, uWebSockets, Bun, Deno), client API, and authentication patterns
- graphql-sse: GraphQL over Server-Sent Events — graphql-sse library docs: SSE transport, distinct vs single connection mode, Node.js and edge adapter setup
- graphql-yoga Subscriptions — graphql-yoga built-in subscription support: createPubSub, SSE and WS transport, context, filtering, and Next.js integration
- Apollo Server Subscriptions — Apollo Server 4 subscription setup: useServer, WebSocketServer wiring, onConnect auth, and drain plugin
- Apollo Client useSubscription — Apollo Client subscription hooks: useSubscription, GraphQLWsLink, split link setup, subscribeToMore, and cache integration
- graphql-subscriptions: PubSub and withFilter — In-memory PubSub with asyncIterableIterator and withFilter for event filtering in subscription resolvers