JSON in GraphQL Subscriptions: graphql-ws, AsyncIterator, SSE & Apollo Client

GraphQL subscriptions stream JSON events to clients over a persistent connection, and each event arrives as a JSON payload matching the subscription's selection set. The graphql-ws library's subprotocol encodes all messages as JSON: the client sends {"type":"subscribe","id":"1","payload":{"query":"subscription { messageAdded { id text } }"}} and the server responds with {"type":"next","id":"1","payload":{"data":{"messageAdded":{"id":"5","text":"hello"}}}} for each new event. Subscription resolvers return an AsyncIterator or AsyncGenerator, and each yield delivers one JSON payload to the subscribed client. Apollo Server adds WebSocket subscriptions with useServer() from the graphql-ws library; graphql-yoga serves subscriptions over Server-Sent Events by default and can use the same useServer() integration when WebSocket is required. graphql-sse is a standalone SSE transport that avoids WebSocket infrastructure and reconnects automatically. Apollo Client's useSubscription hook exposes the latest JSON payload as { data, loading, error }. This guide covers the graphql-ws protocol message format, subscription resolvers with AsyncGenerator, graphql-yoga setup, the SSE alternative, and Apollo Client subscription consumption.

graphql-ws Protocol: JSON Message Format

The graphql-ws library implements the graphql-transport-ws subprotocol — a structured JSON message protocol layered on top of WebSocket. Every message is a JSON object with a type field and an optional id (for per-subscription correlation) and payload. The protocol defines 8 message types across 2 directions, governing the full lifecycle from connection initialization to event delivery and graceful teardown.

// ── graphql-transport-ws message flow ─────────────────────────
//
//  Client → Server                Server → Client
//  ─────────────────────────────  ──────────────────────────────
//  ConnectionInit                 ConnectionAck
//  Subscribe (id + payload)       Next (id + payload)
//  Complete (id)                  Error (id + payload)
//  Ping / Pong                    Complete (id)
//                                 Ping / Pong

// ── 1. Client sends ConnectionInit ────────────────────────────
// Optional payload for authentication tokens
{
  "type": "connection_init",
  "payload": {
    "Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
  }
}

// ── 2. Server responds with ConnectionAck ─────────────────────
{ "type": "connection_ack" }

// ── 3. Client sends Subscribe ─────────────────────────────────
// id is client-generated, unique per subscription on this connection
{
  "type": "subscribe",
  "id": "1",
  "payload": {
    "query": "subscription OnMessageAdded($roomId: ID!) { messageAdded(roomId: $roomId) { id text author createdAt } }",
    "variables": { "roomId": "room-42" },
    "operationName": "OnMessageAdded"
  }
}

// ── 4. Server streams Next messages ───────────────────────────
// One JSON frame per subscription event — data matches the selection set
{
  "type": "next",
  "id": "1",
  "payload": {
    "data": {
      "messageAdded": {
        "id": "msg-5",
        "text": "Hello, world!",
        "author": "alice",
        "createdAt": "2026-05-28T10:00:00.000Z"
      }
    }
  }
}

// ── 5. Server sends Error (GraphQL errors, not transport errors) ─
{
  "type": "error",
  "id": "1",
  "payload": [
    {
      "message": "You do not have permission to subscribe to this room",
      "locations": [{ "line": 1, "column": 14 }],
      "path": ["messageAdded"],
      "extensions": { "code": "FORBIDDEN" }
    }
  ]
}

// ── 6. Client or server sends Complete ────────────────────────
// Client → server: unsubscribe from this id
// Server → client: subscription stream ended (resolver completed)
{ "type": "complete", "id": "1" }

// ── WebSocket close codes (server-initiated) ──────────────────
// 4400  Bad Request: invalid message received
// 4401  Unauthorized: Subscribe sent before the connection was acknowledged
// 4403  Forbidden: onConnect returned false
// 4408  Connection initialisation timeout: ConnectionInit not received in time
// 4409  Subscriber for <id> already exists
// 4429  Too many initialisation requests

The id field is client-generated and must be unique per active subscription on a single WebSocket connection. This allows multiplexing: a single connection can carry dozens of concurrent subscriptions, each identified by its id. When the client sends a complete message with a subscription's id, the server stops the corresponding AsyncIterator by calling iterator.return(). The server can also initiate a complete by sending its own complete message, signaling that the subscription stream has naturally ended. The protocol itself sets no limit on message size, but WebSocket servers do (the ws library's maxPayload option defaults to 100 MiB), and large payloads (over roughly 1 MB) are better paginated or split into multiple events.
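The multiplexing rule above can be sketched as a small TypeScript model. This is an illustrative, dependency-free sketch and not the graphql-ws library's own types: the names ClientMessage, parseClientMessage, and SubscriptionRegistry are assumptions introduced here. It parses an incoming JSON frame and enforces one active subscription per id on a connection, which is the condition the 4409 close code reports.

```typescript
// Hypothetical model of the client-to-server messages described above.
type ClientMessage =
  | { type: 'connection_init'; payload?: Record<string, unknown> }
  | { type: 'subscribe'; id: string; payload: { query: string; variables?: Record<string, unknown>; operationName?: string } }
  | { type: 'complete'; id: string }
  | { type: 'ping' }
  | { type: 'pong' }

// Parse a raw text frame. Returns null when the frame is not a valid message
// (a real server would close the socket with 4400 in that case).
function parseClientMessage(raw: string): ClientMessage | null {
  let value: unknown
  try {
    value = JSON.parse(raw)
  } catch {
    return null
  }
  if (typeof value !== 'object' || value === null) return null
  const msg = value as { type?: unknown; id?: unknown; payload?: unknown }
  switch (msg.type) {
    case 'connection_init':
    case 'ping':
    case 'pong':
      return value as ClientMessage
    case 'complete':
      return typeof msg.id === 'string' ? { type: 'complete', id: msg.id } : null
    case 'subscribe': {
      const payload = msg.payload as { query?: unknown } | undefined
      if (typeof msg.id !== 'string' || !payload || typeof payload.query !== 'string') return null
      return value as ClientMessage
    }
    default:
      return null
  }
}

// Tracks the active subscription ids on ONE connection.
class SubscriptionRegistry {
  private active = new Set<string>()

  // Returns false when the id is already in use (the 4409 case).
  add(id: string): boolean {
    if (this.active.has(id)) return false
    this.active.add(id)
    return true
  }

  // Called on a "complete" message; the id becomes reusable afterwards.
  remove(id: string): boolean {
    return this.active.delete(id)
  }

  get size(): number {
    return this.active.size
  }
}
```

A server loop would call parseClientMessage on every text frame, call add() for each subscribe message, and call remove() when either side sends complete for that id.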

Subscription Resolvers with AsyncGenerator

A GraphQL subscription field's resolver is a JavaScript object with a subscribe function (returns the AsyncIterator) and an optional resolve function (transforms each iterator value before JSON serialization). The subscribe function receives the same arguments as a query resolver: (parent, args, context, info). When resolve is omitted, each value yielded by the iterator must be an object whose key matches the subscription field name, because the default resolver reads that key.
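As a rough illustration of that contract, the sketch below drives an AsyncGenerator the way a subscription server conceptually does: each yielded object passes through a resolve step (defaulting to reading the field-name key) and is wrapped in a next message, and the natural end of the stream becomes a complete message. The names toProtocolMessages, tick, and collectMessages are assumptions made for this example; a real server delegates this work to graphql-js and graphql-ws.

```typescript
type NextMessage = { type: 'next'; id: string; payload: { data: Record<string, unknown> } }
type CompleteMessage = { type: 'complete'; id: string }

// Turn a resolver's event stream into protocol messages for one subscription id.
// fieldName is the subscription field; resolve defaults to reading that key.
async function* toProtocolMessages<T extends Record<string, unknown>>(
  id: string,
  fieldName: string,
  source: AsyncIterable<T>,
  resolve: (event: T) => unknown = (event) => event[fieldName],
): AsyncGenerator<NextMessage | CompleteMessage> {
  for await (const event of source) {
    // One yield from the resolver becomes exactly one "next" message.
    yield { type: 'next', id, payload: { data: { [fieldName]: resolve(event) } } }
  }
  // The source finished on its own, so the server tells the client.
  yield { type: 'complete', id }
}

// Example source: yields objects keyed by the field name "tick".
async function* tick(count: number): AsyncGenerator<{ tick: number }> {
  for (let i = 1; i <= count; i++) yield { tick: i }
}

// Collect the JSON frames that would be written to the socket.
async function collectMessages(): Promise<string[]> {
  const frames: string[] = []
  for await (const message of toProtocolMessages('1', 'tick', tick(2))) {
    frames.push(JSON.stringify(message))
  }
  return frames
}
```

With tick(2) as the source, collectMessages() resolves to two next frames followed by one complete frame, all carrying id "1".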

import { PubSub, withFilter } from 'graphql-subscriptions'

// ── In-memory PubSub for single-server deployments ─────────────
// For multi-server production, use graphql-redis-subscriptions
const pubsub = new PubSub()

// ── Subscription resolver with PubSub ─────────────────────────
export const subscriptionResolvers = {
  Subscription: {
    messageAdded: {
      // subscribe: returns an AsyncIterator
      // args.roomId comes from the client's subscription variables JSON
      subscribe: withFilter(
        () => pubsub.asyncIterableIterator(['MESSAGE_ADDED']),
        // Filter: only deliver events for the subscribed room
        (payload, variables) =>
          payload.messageAdded.roomId === variables.roomId,
      ),
      // resolve: optional — transform payload before sending JSON
      resolve: (payload: { messageAdded: Message }) => payload.messageAdded,
    },
  },
}

// ── Publishing from a mutation resolver ───────────────────────
export const mutationResolvers = {
  Mutation: {
    sendMessage: async (_: unknown, args: { roomId: string; text: string }, ctx: Context) => {
      const message = await ctx.db.message.create({
        data: {
          roomId: args.roomId,
          text:   args.text,
          author: ctx.user.name,
        },
      })

      // Publish — triggers all matching subscribers
      // The payload key must match the subscription field name
      await pubsub.publish('MESSAGE_ADDED', { messageAdded: message })

      return message
    },
  },
}

// ── Plain AsyncGenerator (no PubSub) ──────────────────────────
// Useful for server-push patterns: counters, polls, live data
async function* countdown(from: number) {
  for (let i = from; i >= 0; i--) {
    yield { countdown: i }
    await new Promise(resolve => setTimeout(resolve, 1000))
  }
}

export const countdownResolvers = {
  Subscription: {
    countdown: {
      subscribe: (_: unknown, args: { from: number }) => countdown(args.from),
      // No resolve needed: the default resolver reads the "countdown" key from each yielded object
    },
  },
}

// ── TypeScript: subscription payload types ────────────────────
type Message = {
  id:        string
  roomId:    string
  text:      string
  author:    string
  createdAt: string
}

// The yielded value must have the subscription field name as key
// Each yield → one "next" JSON message to the client
// return / throw → "complete" or "error" JSON message to the client

The resolve function runs after subscribe yields a value and before the payload is JSON-serialized and sent. This is where field-level transforms belong: converting database timestamps to ISO strings, removing internal fields, or applying field-level authorization. If resolve is omitted, the yielded value is serialized as-is — the object must already match the GraphQL type's expected shape. The withFilter() wrapper from graphql-subscriptions adds per-event filtering: returning false skips the event entirely (no JSON frame sent to that client), returning true allows it through to the resolve stage.
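The filter-then-resolve order described above can be sketched without any library. This is a minimal illustration of the idea, not the graphql-subscriptions implementation of withFilter(); the names filterEvents, sampleEvents, and deliveredTo are hypothetical and exist only for this example.

```typescript
// Skip events for which the predicate returns false; pass the rest through.
async function* filterEvents<T>(
  source: AsyncIterable<T>,
  predicate: (event: T) => boolean | Promise<boolean>,
): AsyncGenerator<T> {
  for await (const event of source) {
    if (await predicate(event)) yield event
  }
}

type RoomEvent = { messageAdded: { id: string; roomId: string; text: string } }

// Stand-in for a PubSub iterator that carries events from several rooms.
async function* sampleEvents(): AsyncGenerator<RoomEvent> {
  yield { messageAdded: { id: 'm1', roomId: 'room-1', text: 'hi' } }
  yield { messageAdded: { id: 'm2', roomId: 'room-2', text: 'skipped' } }
  yield { messageAdded: { id: 'm3', roomId: 'room-1', text: 'bye' } }
}

// Filter first, then "resolve": unwrap the field value and drop the internal roomId.
async function deliveredTo(roomId: string): Promise<{ id: string; text: string }[]> {
  const delivered: { id: string; text: string }[] = []
  const matching = filterEvents(sampleEvents(), (e) => e.messageAdded.roomId === roomId)
  for await (const event of matching) {
    const { id, text } = event.messageAdded
    delivered.push({ id, text })
  }
  return delivered
}
```

Calling deliveredTo('room-1') yields the two room-1 messages and never touches the resolve step for the room-2 event, which mirrors how a filtered-out event produces no frame for that client.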

graphql-yoga Server Setup for Subscriptions

graphql-yoga supports subscriptions out of the box over Server-Sent Events: the same HTTP endpoint that serves queries and mutations streams subscription events as text/event-stream, so no WebSocket server is needed for the setup below. To serve the graphql-transport-ws protocol instead, attach graphql-ws's useServer() to a ws WebSocketServer that shares the http.Server, as in the Apollo Server section. Define your schema with subscription types and provide resolvers that return AsyncGenerators (or any AsyncIterable). Yoga's built-in createPubSub() is a TypeScript-typed, in-memory pub/sub primitive suitable for single-server deployments.

import { createServer } from 'node:http'
import { createYoga, createPubSub } from 'graphql-yoga'
import { makeExecutableSchema } from '@graphql-tools/schema'

// ── Type-safe PubSub — maps topic names to payload types ───────
const pubsub = createPubSub<{
  MESSAGE_ADDED: [{ messageAdded: { id: string; text: string; author: string } }]
  USER_ONLINE:   [{ userOnline:   { userId: string; online: boolean } }]
}>()

// ── Schema with subscription type ─────────────────────────────
const typeDefs = /* GraphQL */ `
  type Message { id: ID! text: String! author: String! createdAt: String! }
  type UserStatus { userId: ID! online: Boolean! }

  type Query { messages: [Message!]! }

  type Mutation {
    sendMessage(roomId: ID!, text: String!): Message!
  }

  type Subscription {
    messageAdded(roomId: ID!): Message!
    userOnline(userId: ID!): UserStatus!
  }
`

const resolvers = {
  Query: {
    messages: () => [],
  },
  Mutation: {
    sendMessage: async (_: unknown, args: { roomId: string; text: string }, ctx: any) => {
      const message = { id: Date.now().toString(), text: args.text, author: ctx.user ?? 'anon', createdAt: new Date().toISOString() }
      // Publish triggers all matching subscribers immediately
      pubsub.publish('MESSAGE_ADDED', { messageAdded: message })
      return message
    },
  },
  Subscription: {
    messageAdded: {
      // NOTE: args.roomId is not applied here, so every subscriber receives
      // messages from all rooms; filter by room before relying on this
      subscribe: (_: unknown, args: { roomId: string }) =>
        pubsub.subscribe('MESSAGE_ADDED'),
      resolve: (payload: { messageAdded: { id: string; text: string; author: string; createdAt?: string } }) =>
        payload.messageAdded,
    },
    userOnline: {
      subscribe: () => pubsub.subscribe('USER_ONLINE'),
      resolve: (payload: { userOnline: { userId: string; online: boolean } }) =>
        payload.userOnline,
    },
  },
}

const schema = makeExecutableSchema({ typeDefs, resolvers })

// ── graphql-yoga instance ──────────────────────────────────────
const yoga = createYoga({
  schema,
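  // As written, this server delivers subscriptions over SSE on the same
  // HTTP endpoint; WebSocket support needs graphql-ws's useServer() and a ws server
  context: (req) => ({
    // Read the user from the x-user request header (demo only; verify a
    // real credential such as an Authorization token in production)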
    user: req.request.headers.get('x-user') ?? null,
  }),
})

// ── Attach to http.Server ─────────────────────────────────────
const server = createServer(yoga)
server.listen(4000, () => {
  console.log('GraphQL server running at http://localhost:4000/graphql')
  console.log('Subscriptions: http://localhost:4000/graphql (SSE, Accept: text/event-stream)')
})

// ── graphql-ws message flow (for reference; applies once useServer() is added, not enabled by the code above) ──
// → {"type":"connection_init"}
// ← {"type":"connection_ack"}
// → {"type":"subscribe","id":"1","payload":{"query":"subscription { messageAdded(roomId: \"room-1\") { id text author } }"}}
// ← {"type":"next","id":"1","payload":{"data":{"messageAdded":{"id":"1748...","text":"hello","author":"alice"}}}}
// → {"type":"complete","id":"1"}  ← client unsubscribes

For Next.js App Router deployments, graphql-yoga's HTTP handler exports as GET and POST route handlers directly. WebSocket upgrades require a custom server (server.js) or a separate WebSocket server running alongside the Next.js dev server — Next.js App Router does not support WebSocket upgrades natively. The recommended pattern for Next.js with real-time subscriptions is to run a separate graphql-yoga or Apollo Server process on a different port and configure Apollo Client's split link to route subscriptions there.

Apollo Server Subscriptions with useServer

Apollo Server 4 does not include built-in WebSocket handling — subscriptions require the graphql-ws library's useServer() function applied to a ws.WebSocketServer instance. This explicit wiring gives full control over the WebSocket server configuration: max connections, perMessageDeflate compression, path routing, and the onConnect/onDisconnect lifecycle callbacks used for authentication and cleanup.

import { ApolloServer } from '@apollo/server'
import { expressMiddleware } from '@apollo/server/express4'
import { ApolloServerPluginDrainHttpServer } from '@apollo/server/plugin/drainHttpServer'
import { makeExecutableSchema } from '@graphql-tools/schema'
import { WebSocketServer } from 'ws'
// graphql-ws v5 import path; v6 and later export this as 'graphql-ws/use/ws'
import { useServer } from 'graphql-ws/lib/use/ws'
import express from 'express'
import { createServer } from 'node:http'
import cors from 'cors'
import bodyParser from 'body-parser'

const schema = makeExecutableSchema({ typeDefs, resolvers }) // your schema

const app = express()
const httpServer = createServer(app)

// ── 1. Create WebSocket server ─────────────────────────────────
const wsServer = new WebSocketServer({
  server: httpServer,   // share the same port as HTTP
  path: '/graphql',     // subscriptions endpoint
})

// ── 2. Apply graphql-ws handler ────────────────────────────────
// Returns a cleanup function to gracefully drain connections on shutdown
const serverCleanup = useServer(
  {
    schema,

    // onConnect: runs on ConnectionInit — return false to reject
    onConnect: async (ctx) => {
      const token = (ctx.connectionParams as { Authorization?: string })?.Authorization
        ?.replace('Bearer ', '')
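      if (!token) {
        // Returning false closes the socket with code 4403 (Forbidden)
        return false
      }
      const user = await verifyJwt(token)
      if (!user) return false
      // Store the user id on ctx.extra for the context function below.
      // An object returned from onConnect would instead be sent to the
      // client as the ConnectionAck payload, so return true here.
      ;(ctx.extra as { userId?: string }).userId = user.id
      return true
    },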

    // context: runs on every Subscribe message — build per-operation context
    context: async (ctx) => ({
      user: (ctx.extra as { userId?: string }).userId
        ? await db.user.findUnique({ where: { id: (ctx.extra as { userId?: string }).userId } })
        : null,
      pubsub,
      db,
    }),

    onDisconnect: (ctx, code, reason) => {
      console.log(`Client disconnected: ${code} ${reason}`)
    },
  },
  wsServer,
)

// ── 3. Create Apollo Server ────────────────────────────────────
const apolloServer = new ApolloServer({
  schema,
  plugins: [
    ApolloServerPluginDrainHttpServer({ httpServer }),
    // Gracefully shut down the WebSocket server when Apollo shuts down
    {
      async serverWillStart() {
        return {
          async drainServer() {
            await serverCleanup.dispose()
          },
        }
      },
    },
  ],
})

await apolloServer.start()

// ── 4. Apply HTTP middleware ────────────────────────────────────
app.use(
  '/graphql',
  cors<cors.CorsRequest>(),
  bodyParser.json(),
  expressMiddleware(apolloServer, {
    context: async ({ req }) => ({
      user: await getUserFromRequest(req),
      db,
      pubsub,
    }),
  }),
)

// ── 5. Start server ────────────────────────────────────────────
httpServer.listen(4000, () => {
  console.log('HTTP:  http://localhost:4000/graphql')
  console.log('WS:    ws://localhost:4000/graphql  (graphql-transport-ws)')
})

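// ── Declaring verifyJwt and other helpers ─────────────────────
// Assumes the jose package; JWT_SECRET is a symmetric (HS256) key read
// from a hypothetical JWT_SECRET environment variable
import * as jose from 'jose'
const JWT_SECRET = new TextEncoder().encode(process.env.JWT_SECRET)

async function verifyJwt(token: string): Promise<{ id: string } | null> {
  try {
    const { payload } = await jose.jwtVerify(token, JWT_SECRET)
    return payload.sub ? { id: payload.sub } : null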
  } catch {
    return null
  }
}

async function getUserFromRequest(req: express.Request) {
  const auth = req.headers.authorization?.replace('Bearer ', '')
  if (!auth) return null
  return verifyJwt(auth)
}

The key difference from graphql-yoga's out-of-the-box subscriptions is that Apollo Server 4 requires explicit WebSocket server creation and useServer() wiring. The upside is full control: you can apply rate limiting at the WebSocket server level, configure perMessageDeflate for payload compression (repetitive text JSON usually compresses well, at some CPU and memory cost per connection), set maxPayload to reject oversized messages, and run multiple WebSocket servers on different paths. The onConnect authentication runs once per connection (on ConnectionInit), not once per subscription, so store the verified user in ctx.extra there and read it back in the context function instead of re-verifying the token on every Subscribe.

SSE Alternative with graphql-sse

graphql-sse implements GraphQL subscriptions over Server-Sent Events (SSE) rather than WebSocket. The client opens a standard HTTP POST request; the server streams a text/event-stream response. Each event is a JSON object containing the familiar GraphQL { data, errors } envelope. In its default distinct connections mode, graphql-sse uses one HTTP request per subscription (no id-based multiplexing), but it eliminates WebSocket upgrade complexity and works over HTTP/2, standard load balancers, and corporate firewalls that block WebSocket traffic.

// npm install graphql-sse
// graphql-sse v2 ships its Node http adapter under lib/use/http
import { createHandler } from 'graphql-sse/lib/use/http'
import { makeExecutableSchema } from '@graphql-tools/schema'
import { createServer } from 'node:http'

const schema = makeExecutableSchema({ typeDefs, resolvers })

// ── graphql-sse handler ────────────────────────────────────────
const handler = createHandler({
  schema,
  context: async (req) => ({
    // Parse Authorization header for HTTP-based auth (cookies work too)
    user: await getUserFromRequest(req.raw),
    pubsub,
    db,
  }),
})

// ── Attach to http.Server ─────────────────────────────────────
const server = createServer((req, res) => {
  if (req.url?.startsWith('/graphql/stream')) {
    return handler(req, res)
  }
  // Handle regular GraphQL queries/mutations separately
  res.writeHead(404)
  res.end()
})

server.listen(4000)

// ── SSE wire format (text/event-stream) ───────────────────────
// Each event is separated by a blank line; data is JSON
//
// event: next
// data: {"data":{"messageAdded":{"id":"5","text":"hello"}}}
//
// event: complete
// data:
//
// ── Client: graphql-sse client (fetch-based) ──────────────────
// EventSource does not support POST or custom headers — use fetch
// graphql-sse ships a client that uses fetch for POST-based SSE

import { createClient } from 'graphql-sse'

const client = createClient({
  url: 'http://localhost:4000/graphql/stream',
  headers: () => ({
    Authorization: `Bearer ${localStorage.getItem('token')}`,
  }),
})

// Subscribe — returns an unsubscribe function
const unsubscribe = client.subscribe(
  {
    query: `subscription OnMessageAdded($roomId: ID!) {
      messageAdded(roomId: $roomId) { id text author }
    }`,
    variables: { roomId: 'room-42' },
  },
  {
    // next: called for each JSON event payload
    next: ({ data }) => console.log('New message:', data?.messageAdded),
    error: (err)    => console.error('Subscription error:', err),
    complete: ()    => console.log('Subscription complete'),
  },
)

// Unsubscribe after 60 seconds
setTimeout(unsubscribe, 60_000)

// ── SSE vs WebSocket: when to choose ─────────────────────────
// SSE (graphql-sse):
//   ✓ No WebSocket proxy config — works through nginx, CDN, AWS ALB
//   ✓ Automatic reconnect via browser EventSource or graphql-sse client
//   ✓ HTTP/2 multiplexing in supported environments
//   ✗ One HTTP connection per subscription (no id-based muxing)
//   ✗ Server → client only (heartbeats must use polling or app-level ping)
//
// WebSocket (graphql-ws):
//   ✓ Full-duplex — client can send Ping, Complete, multiple Subscribe
//   ✓ Multiplex N subscriptions over 1 connection with id field
//   ✓ Lower overhead per event (no HTTP headers per event)
//   ✗ Requires WebSocket-aware proxy/load balancer config
//   ✗ Browser EventSource not applicable — needs ws/graphql-ws client

For production SSE deployments, make sure nothing between client and server buffers or times out the stream on the /graphql/stream path: nginx needs proxy_buffering off and a proxy_read_timeout longer than the quiet periods between events, AWS ALB needs its idle timeout (60 seconds by default) raised or regular keep-alive traffic on the stream, and Cloudflare needs the endpoint excluded from caching. SSE connections hold an open HTTP response, so each active subscription counts against the server's open file descriptor limit (ulimit -n). The default soft limit on many Linux distributions is 1,024, so raise it before expecting tens of thousands of concurrent streams.
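The text/event-stream framing used in this section (an event line, a data line, then a blank line) can be sketched as a pair of helper functions. This is a simplified illustration rather than graphql-sse's internal code: formatSseEvent and parseSseEvents are hypothetical names, and the parser assumes "\n" line endings and ignores the id, retry, and comment lines that the full SSE format also allows.

```typescript
type SseEvent = { event: string; data: string }

// Serialize one event in text/event-stream framing: fields, then a blank line.
function formatSseEvent(eventName: string, payload?: unknown): string {
  const data = payload === undefined ? '' : JSON.stringify(payload)
  return `event: ${eventName}\ndata: ${data}\n\n`
}

// Parse a buffer that contains only complete events.
// Multiple data lines within one event are joined with "\n".
function parseSseEvents(buffer: string): SseEvent[] {
  const events: SseEvent[] = []
  for (const block of buffer.split('\n\n')) {
    if (block.trim() === '') continue
    let eventName = 'message' // default event type when no event line is present
    const dataLines: string[] = []
    for (const line of block.split('\n')) {
      if (line.startsWith('event:')) eventName = line.slice(6).trim()
      else if (line.startsWith('data:')) dataLines.push(line.slice(5).replace(/^ /, ''))
    }
    events.push({ event: eventName, data: dataLines.join('\n') })
  }
  return events
}
```

Because JSON.stringify escapes newlines inside strings, each GraphQL result fits on a single data line, so a consumer can call JSON.parse on the data of every next event and treat an empty data field on complete as the end of the stream.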

Apollo Client useSubscription Hook

Apollo Client consumes GraphQL subscriptions in React via the useSubscription hook. The hook requires a GraphQLWsLink configured in the Apollo Client instance and a split link that routes subscription operations to the WebSocket transport. Each new subscription event triggers a re-render with the updated data value — it is the latest payload, not a running array of all events.

// npm install @apollo/client graphql-ws
import {
  ApolloClient, InMemoryCache, HttpLink, split,
  ApolloProvider, gql, useSubscription, useQuery,
} from '@apollo/client'
import { GraphQLWsLink } from '@apollo/client/link/subscriptions'
import { getMainDefinition } from '@apollo/client/utilities'
import { createClient } from 'graphql-ws'

// ── 1. Create the WebSocket link ───────────────────────────────
const wsLink = new GraphQLWsLink(
  createClient({
    url: 'ws://localhost:4000/graphql',
    connectionParams: () => ({
      // Sent as the ConnectionInit payload — server reads this for auth
      Authorization: `Bearer ${localStorage.getItem('token')}`,
    }),
    // Reconnect on disconnect with exponential backoff
    retryAttempts: 5,
    on: {
      connected: ()    => console.log('WS connected'),
      closed:    ()    => console.log('WS closed'),
      error:     (err) => console.error('WS error', err),
    },
  })
)

// ── 2. Create the HTTP link for queries and mutations ──────────
const httpLink = new HttpLink({ uri: 'http://localhost:4000/graphql' })

// ── 3. Split: subscriptions → WS, everything else → HTTP ──────
const splitLink = split(
  ({ query }) => {
    const definition = getMainDefinition(query)
    return (
      definition.kind === 'OperationDefinition' &&
      definition.operation === 'subscription'
    )
  },
  wsLink,
  httpLink,
)

// ── 4. Apollo Client instance ──────────────────────────────────
const client = new ApolloClient({
  link: splitLink,
  cache: new InMemoryCache(),
})

// ── 5. Subscription document ──────────────────────────────────
const MESSAGE_ADDED_SUB = gql`
  subscription OnMessageAdded($roomId: ID!) {
    messageAdded(roomId: $roomId) {
      id
      text
      author
      createdAt
    }
  }
`

// ── 6. useSubscription hook ────────────────────────────────────
// Re-renders on every new JSON event from the server
function LiveChat({ roomId }: { roomId: string }) {
  const { data, loading, error } = useSubscription(MESSAGE_ADDED_SUB, {
    variables: { roomId },
    // onData: side-effect callback invoked for each event (Apollo Client 3.7+)
    onData: ({ client, data: { data: subscriptionData } }) => {
      // Write the new message directly into Apollo's cache
      // so that useQuery('GET_MESSAGES') sees it immediately
      client.cache.modify({
        fields: {
          messages(existing = []) {
            const newRef = client.cache.writeFragment({
              data: subscriptionData?.messageAdded,
              fragment: gql`fragment NewMsg on Message { id text author createdAt }`,
            })
            return [...existing, newRef]
          },
        },
      })
    },
  })

  if (loading) return <p>Connecting to live chat...</p>
  if (error)   return <p>Error: {error.message}</p>

  // data.messageAdded is the LATEST message — not all messages
  // Use onData + cache.modify above to accumulate messages
  return (
    <div>
      <p>Latest: {data?.messageAdded?.text} — {data?.messageAdded?.author}</p>
    </div>
  )
}

// ── 7. subscribeToMore — add live updates to an existing useQuery ─
// Assumes React is imported, and that a GET_MESSAGES query document and a
// MessageList component are defined elsewhere in the app
function ChatRoom({ roomId }: { roomId: string }) {
  const { data, subscribeToMore } = useQuery(GET_MESSAGES, { variables: { roomId } })

  React.useEffect(() => {
    // subscribeToMore returns an unsubscribe function
    const unsubscribe = subscribeToMore({
      document: MESSAGE_ADDED_SUB,
      variables: { roomId },
      updateQuery: (prev, { subscriptionData }) => {
        if (!subscriptionData.data) return prev
        const newMsg = subscriptionData.data.messageAdded
        // Spread prev so other fields of the query result are preserved
        return { ...prev, messages: [...(prev.messages ?? []), newMsg] }
      },
    })
    return unsubscribe
  }, [roomId, subscribeToMore])

  return <MessageList messages={data?.messages ?? []} />
}

export default function App() {
  return (
    <ApolloProvider client={client}>
      <ChatRoom roomId="room-42" />
    </ApolloProvider>
  )
}

The onData callback (Apollo Client 3.7+) fires on every subscription event and is the place for side effects such as writing events into the Apollo cache so that related useQuery results update reactively; the hook itself still re-renders with the latest data. subscribeToMore is the classic pattern for combining an initial query with a live subscription: the query loads historical data, and subscribeToMore appends new events as they arrive. Apollo's InMemoryCache normalizes subscription payloads by __typename + id: if the server returns a Message whose id is already cached, every cached query result that references that message updates automatically with zero extra code. A brand-new Message is not added to any list automatically, which is why the examples above append it with cache.modify or updateQuery.
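Since the hook only exposes the latest payload, accumulating a message list is the application's job. One possible helper for that, sketched under the assumption that messages carry a unique id, is a pure append-if-new function; the names ChatMessage and appendUnique are hypothetical and not part of Apollo Client.

```typescript
type ChatMessage = { id: string; text: string; author: string }

// Append an incoming message unless one with the same id is already present.
// Returns the same array reference when nothing changes, and a new array otherwise.
function appendUnique(
  existing: readonly ChatMessage[],
  incoming: ChatMessage,
): readonly ChatMessage[] {
  if (existing.some((m) => m.id === incoming.id)) return existing
  return [...existing, incoming]
}
```

Inside an updateQuery callback this could be used as `return { ...prev, messages: appendUnique(prev.messages ?? [], newMsg) }`. The de-duplication guards against the same message arriving twice, for example once from a mutation response and once from the subscription.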

Authentication and Filtering JSON Subscription Events

Securing GraphQL subscriptions requires two layers: connection-level authentication (verify the user can open a subscription at all) and field-level authorization (filter or redact events based on the user's permissions). Connection-level auth runs in the onConnect callback and completes before any Subscribe messages are processed. Field-level auth runs in the withFilter() predicate or inside the subscribe resolver itself.
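For the connection-level layer, the token has to be pulled out of the ConnectionInit payload before it can be verified. The sketch below shows one defensive way to do that; extractBearerToken is a hypothetical helper, and the Authorization key simply follows the connectionParams convention used in this guide. Unlike a plain replace('Bearer ', ''), it rejects values that do not use the Bearer scheme at all.

```typescript
// Pull a bearer token out of the ConnectionInit payload (connectionParams).
// Returns null when the field is missing, not a string, or not a Bearer credential.
function extractBearerToken(connectionParams: unknown): string | null {
  if (typeof connectionParams !== 'object' || connectionParams === null) return null
  const header = (connectionParams as Record<string, unknown>)['Authorization']
  if (typeof header !== 'string') return null
  // Scheme is matched case-insensitively; the token must be a single non-space run.
  const match = /^Bearer\s+(\S+)$/i.exec(header.trim())
  return match?.[1] ?? null
}
```

An onConnect callback could call this helper with ctx.connectionParams and return false when it yields null, leaving signature verification to a function such as verifyJwt.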

import { withFilter } from 'graphql-subscriptions'
// graphql-ws v5 import path; v6 and later export this as 'graphql-ws/use/ws'
import { useServer } from 'graphql-ws/lib/use/ws'

// ── useServer: onConnect for connection-level auth ─────────────
const serverCleanup = useServer(
  {
    schema,

    // onConnect: runs on ConnectionInit JSON message
    // connectionParams comes from the client's ConnectionInit payload
    onConnect: async (ctx) => {
      const params = ctx.connectionParams as { Authorization?: string }
      const token  = params?.Authorization?.replace('Bearer ', '')

      if (!token) {
        console.warn('No token — rejecting WS connection')
        return false          // closes the socket with code 4403 (Forbidden)
      }

      try {
        const user = await verifyJwt(token)
        if (!user) return false
        // Store user in ctx.extra — accessible in context() per Subscribe
        ;(ctx.extra as { user?: { id: string; role: string } }).user = user
        return true
      } catch {
        return false
      }
    },

    // context: runs per Subscribe operation — has access to ctx.extra
    context: async (ctx) => {
      const extraUser = (ctx.extra as { user?: { id: string; role: string } }).user
      return {
        user: extraUser,
        db,
        pubsub,
      }
    },
  },
  wsServer,
)

// ── withFilter: per-event field-level filtering ────────────────
export const subscriptionResolvers = {
  Subscription: {
    // Deliver order updates only to the order's owner
    orderUpdated: {
      subscribe: withFilter(
        () => pubsub.asyncIterableIterator(['ORDER_UPDATED']),
        async (
          payload: { orderUpdated: { id: string; userId: string; status: string } },
          variables: { orderId: string },
          context: { user: { id: string; role: string } | null }
        ) => {
          if (!context.user) return false
          // Admin sees all order updates
          if (context.user.role === 'ADMIN') return true
          // Regular users see only their own orders
          return payload.orderUpdated.userId === context.user.id
                 && payload.orderUpdated.id   === variables.orderId
        },
      ),
    },

    // Field-level redaction: remove sensitive fields for non-admins
    auditEvent: {
      subscribe: withFilter(
        () => pubsub.asyncIterableIterator(['AUDIT_EVENT']),
        (_: unknown, __: unknown, ctx: { user: { role: string } | null }) =>
          ctx.user?.role === 'ADMIN',  // Only admins receive audit events
      ),
      resolve: (
        payload: { auditEvent: AuditEvent },
        _: unknown,
        ctx: { user: { role: string } | null }
      ) => {
        const event = payload.auditEvent
        // Strip IP and user agent for non-super-admin users
        if (ctx.user?.role !== 'SUPER_ADMIN') {
          const { ipAddress, userAgent, ...safeEvent } = event
          return safeEvent
        }
        return event
      },
    },
  },
}

// ── Per-room subscription channels (Redis pattern) ─────────────
// Instead of withFilter on a global channel, publish to specific channels
// to avoid per-event CPU overhead at high subscriber counts
import { RedisPubSub } from 'graphql-redis-subscriptions'
import Redis from 'ioredis'

const redisPubSub = new RedisPubSub({
  publisher:  new Redis({ host: 'redis', port: 6379 }),
  subscriber: new Redis({ host: 'redis', port: 6379 }),
})

export const roomResolvers = {
  Subscription: {
    messageAdded: {
      // Subscribe to room-specific Redis channel — no withFilter needed
      // Channel: "message:room:{roomId}"
      subscribe: (_: unknown, args: { roomId: string }) =>
        redisPubSub.asyncIterableIterator(`message:room:${args.roomId}`),
    },
  },
  Mutation: {
    sendMessage: async (_: unknown, args: { roomId: string; text: string }, ctx: any) => {
      const message = await ctx.db.message.create({ data: { roomId: args.roomId, text: args.text, author: ctx.user.name } })
      // Publish to the room's specific channel
      await redisPubSub.publish(`message:room:${args.roomId}`, { messageAdded: message })
      return message
    },
  },
}

The Redis pub/sub channel-per-room pattern scales better than a single global channel: instead of publishing all messages to one MESSAGE_ADDED channel and filtering each event on the server with withFilter, publish to message:room:{roomId}. Each subscriber listens only on its room's channel, so there is no per-event filter cost no matter how many rooms or subscribers exist in total. Because Redis fans each published message out to every server instance subscribed to that channel, the pattern also works across multiple server instances, which suits real-time chat, live dashboards, and collaborative tools. Actual throughput limits depend on your Redis deployment and message sizes, so load-test before relying on a specific subscriber count.
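The routing difference can be seen in a small in-memory model. This is a sketch only: ChannelBus and roomChannel are hypothetical stand-ins for a channel-based broker such as Redis, and the model ignores networking, serialization, and multi-instance fan-out.

```typescript
type Handler<T> = (payload: T) => void

// Minimal in-memory stand-in for a channel-based pub/sub.
class ChannelBus<T> {
  private channels = new Map<string, Set<Handler<T>>>()

  // Register a handler on one channel; returns an unsubscribe function.
  subscribe(channel: string, handler: Handler<T>): () => void {
    const handlers = this.channels.get(channel) ?? new Set<Handler<T>>()
    handlers.add(handler)
    this.channels.set(channel, handlers)
    return () => {
      handlers.delete(handler)
      if (handlers.size === 0) this.channels.delete(channel)
    }
  }

  // Deliver to the handlers of this channel only; returns how many were invoked.
  publish(channel: string, payload: T): number {
    const handlers = this.channels.get(channel)
    if (!handlers) return 0
    handlers.forEach((handler) => handler(payload))
    return handlers.size
  }
}

// Channel naming helper matching the message:room:{roomId} convention.
const roomChannel = (roomId: string): string => `message:room:${roomId}`
```

Publishing to roomChannel('room-1') invokes only the handlers registered for that room, so subscribers in other rooms cost nothing per event, whereas a single global channel with a filter would evaluate the predicate once per subscriber for every message.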

Key Terms

graphql-ws
An npm library that implements the graphql-transport-ws WebSocket subprotocol for GraphQL subscriptions. Despite sharing its name with the graphql-ws subprotocol used by the older subscriptions-transport-ws library, the two protocols are different and incompatible. The library provides both a client (createClient()) and server adapters (useServer()) for ws, uWebSockets, Bun, Deno, and other runtimes. Every message in the protocol is a JSON object with a type field; eight message types govern the full connection lifecycle from ConnectionInit to Complete. Apollo Server 4 and graphql-yoga both rely on this library for WebSocket subscriptions, wired in explicitly through useServer().
AsyncIterator
An object with a next() method that returns a Promise<{ value, done }>; an AsyncIterable is an object whose Symbol.asyncIterator method returns such an iterator. GraphQL subscription resolvers must return an AsyncIterator (or AsyncGenerator, which automatically implements both protocols). The GraphQL execution engine calls next() repeatedly; each resolved value becomes one subscription event, JSON-serialized into a graphql-ws next message. The stream ends when done is true, and when the client sends a complete message before that point, the engine calls iterator.return() to clean up resources.
PubSub
A publish-subscribe system that decouples event producers (mutations) from event consumers (subscription resolvers). The graphql-subscriptions package provides an in-memory PubSub backed by Node.js EventEmitter — suitable for single-server development but unable to span multiple server instances. Production deployments use graphql-redis-subscriptions (backed by Redis pub/sub), graphql-postgres-subscriptions (backed by PostgreSQL LISTEN/NOTIFY), or managed services like AWS SNS. The PubSub pattern separates the event source (a mutation calling pubsub.publish()) from the subscription resolver (calling pubsub.asyncIterableIterator()), keeping resolvers stateless.
withFilter
A higher-order function from graphql-subscriptions that wraps an AsyncIterator to add per-event filtering. It takes two arguments: a subscribe function that returns the base iterator, and a filter function that receives each event payload and the client's subscription variables. If the filter returns false (or Promise<false>), the event is silently discarded — no JSON frame is sent to that client. If it returns true, the event proceeds to the resolve function and then to the client. withFilter() is the standard mechanism for room-scoped or user-scoped subscriptions in single-server PubSub setups.
graphql-sse
A library implementing GraphQL subscriptions over Server-Sent Events (SSE), a standard HTTP mechanism where the server streams a text/event-stream response. Unlike WebSocket, SSE is unidirectional (server to client only), uses standard HTTP, and reconnects automatically via the browser's EventSource API or the graphql-sse client. Each GraphQL subscription event is sent as an SSE event with a JSON data payload. graphql-sse supports two modes: "distinct connections mode" (one HTTP request per subscription, simpler auth) and "single connection mode" (one HTTP request for all subscriptions, similar to WebSocket multiplexing). SSE works through HTTP-aware load balancers and proxies without WebSocket-specific configuration, provided they do not buffer the streamed response.
useSubscription
The React hook in Apollo Client for consuming GraphQL subscriptions. It accepts a subscription document, optional variables, and options including skip (pause the subscription), onData (side-effect callback), and fetchPolicy. The hook returns { data, loading, error }; data is always the most recent event payload, not an accumulated history. Apollo Client opens the WebSocket connection and sends the subscribe message when the component mounts; it sends a complete message when the component unmounts, and the server stops the corresponding AsyncIterator. The hook re-renders the component on every new JSON event received from the server.
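To make the AsyncIterator entry above concrete, here is a minimal sketch that drives an AsyncGenerator by hand, the way an execution engine would: repeated next() calls followed by return() for cleanup. The ticker and drive names are illustrative, not part of any library.

```typescript
// Records lifecycle steps so the cleanup order is visible.
const log: string[] = [];

// An endless event source; the finally block stands in for resource cleanup.
async function* ticker(): AsyncGenerator<{ tick: number }, void, undefined> {
  try {
    let n = 0;
    while (true) {
      yield { tick: n++ };
    }
  } finally {
    // Runs when the consumer calls return(), e.g. after a client "complete".
    log.push("cleanup");
  }
}

async function drive(): Promise<string[]> {
  const iterator = ticker();
  const first = await iterator.next(); // resolves to { value: { tick: 0 }, done: false }
  const second = await iterator.next(); // resolves to { value: { tick: 1 }, done: false }
  log.push(JSON.stringify(first.value), JSON.stringify(second.value));
  const closed = await iterator.return(undefined); // triggers the finally block
  log.push(`done=${closed.done}`);
  return log;
}
```

Each resolved value corresponds to one subscription event; the finally block is where a real resolver would unsubscribe from its event source.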

FAQ

How do GraphQL subscriptions send JSON data?

GraphQL subscriptions send JSON over a persistent WebSocket connection using the graphql-transport-ws subprotocol implemented by the graphql-ws library (or the older, incompatible protocol from subscriptions-transport-ws). Every message (connection init, subscribe, next, complete, and error) is a JSON object sent as a WebSocket text frame. When the server emits a new subscription event, it sends a message with type: "next", an id matching the original subscribe message, and a payload containing the familiar GraphQL { data, errors } envelope: {"type":"next","id":"1","payload":{"data":{"messageAdded":{"id":"42","text":"hello"}}}}. The client parses this JSON text frame and delivers the data object to the application layer. Each event arrives independently; there is no batching by default. Delivery adds little beyond network latency and JSON serialization, which makes subscriptions suitable for real-time dashboards, chat, live scores, and collaborative editing.

What is the graphql-ws protocol?

graphql-ws is a community-maintained library whose WebSocket subprotocol for GraphQL subscriptions is identified by the subprotocol string "graphql-transport-ws". It replaced the older subscriptions-transport-ws library, whose subprotocol string was, confusingly, "graphql-ws". The protocol defines 8 JSON message types: ConnectionInit (client sends optional auth payload), ConnectionAck (server confirms), Ping / Pong (keepalive), Subscribe (client sends the GraphQL query as a JSON payload), Next (server sends one event), Error (server sends a GraphQL errors array), and Complete (either side signals end). The server closes the WebSocket with code 4400 for invalid messages, 4401 when a client subscribes before the connection has been acknowledged, 4403 when the connection is rejected (onConnect returns false), and 4429 if a client sends more than one ConnectionInit. Apollo Server 4 and graphql-yoga can serve the graphql-transport-ws subprotocol via useServer() from the graphql-ws library, and Mercurius supports the same subprotocol.
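The eight message types can be modeled as a discriminated union. The sketch below is an illustration with simplified payload types; WsMessage and parseMessage are hypothetical names, not exports of graphql-ws.

```typescript
// Simplified payload shape for illustration.
type ExecutionResult = { data?: unknown; errors?: unknown[] };

// The eight message shapes, keyed by their wire-level "type" string.
type WsMessage =
  | { type: "connection_init"; payload?: Record<string, unknown> }
  | { type: "connection_ack"; payload?: Record<string, unknown> }
  | { type: "ping"; payload?: Record<string, unknown> }
  | { type: "pong"; payload?: Record<string, unknown> }
  | { type: "subscribe"; id: string; payload: { query: string; variables?: Record<string, unknown> } }
  | { type: "next"; id: string; payload: ExecutionResult }
  | { type: "error"; id: string; payload: unknown[] }
  | { type: "complete"; id: string };

const TYPES_WITH_ID = ["subscribe", "next", "error", "complete"];
const TYPES_WITHOUT_ID = ["connection_init", "connection_ack", "ping", "pong"];

// Parse one text frame; throws on anything a server would treat as invalid.
// Only the envelope (type and id) is checked here, not the payload contents.
function parseMessage(raw: string): WsMessage {
  const value: unknown = JSON.parse(raw);
  if (typeof value !== "object" || value === null) {
    throw new Error("message must be a JSON object");
  }
  const msg = value as { type?: unknown; id?: unknown };
  if (typeof msg.type !== "string") throw new Error("missing type");
  const needsId = TYPES_WITH_ID.indexOf(msg.type) !== -1;
  if (!needsId && TYPES_WITHOUT_ID.indexOf(msg.type) === -1) {
    throw new Error(`unknown message type: ${msg.type}`);
  }
  if (needsId && typeof msg.id !== "string") {
    throw new Error(`${msg.type} requires a string id`);
  }
  return value as WsMessage;
}
```

Switching on the type field then narrows the union, so a handler for "next" gets a typed id and payload without further checks.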

How do I write a subscription resolver in TypeScript?

A subscription resolver returns an AsyncIterator or AsyncGenerator from its subscribe function. Each value yielded by the generator becomes one JSON payload delivered to the subscribed client. The resolver object has two properties: subscribe (mandatory, returns the AsyncIterator) and resolve (optional, transforms each iterator value before serialization). With PubSub: subscribe: withFilter(() => pubsub.asyncIterableIterator(["MESSAGE_ADDED"]), (payload, variables) => payload.messageAdded.roomId === variables.roomId). With a plain AsyncGenerator: async function* subscribe() { while (true) { yield { countdown: count-- }; await delay(1000); } }. TypeScript types the subscribe return as AsyncIterator<TPayload> where TPayload must have the subscription field name as a key. The resolve function receives each yielded value and can transform or redact fields — only the resolve return value is JSON-serialized and sent to the client.
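A minimal sketch of the plain AsyncGenerator variant, written without any GraphQL library so the shape is easy to see. Unlike the endless loop quoted above, this version stops at zero; countdownFrom, the from argument, and the 1000 ms interval are illustrative assumptions.

```typescript
const delay = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// subscribe side: yields objects keyed by the subscription field name ("countdown").
async function* countdownFrom(
  from: number,
  intervalMs: number,
): AsyncGenerator<{ countdown: number }> {
  for (let count = from; count >= 0; count--) {
    yield { countdown: count };
    if (count > 0) await delay(intervalMs);
  }
}

// Resolver map shape: subscribe returns the iterator, resolve transforms each event.
const resolvers = {
  Subscription: {
    countdown: {
      subscribe: (_root: unknown, args: { from: number }) =>
        countdownFrom(args.from, 1000),
      resolve: (event: { countdown: number }) => event.countdown,
    },
  },
};

// Helper for trying the generator outside a server: collect every yielded value.
async function collect(from: number): Promise<number[]> {
  const seen: number[] = [];
  for await (const event of countdownFrom(from, 1)) {
    seen.push(resolvers.Subscription.countdown.resolve(event));
  }
  return seen;
}
```

When the generator finishes, the iterator reports done and the server can send a complete message for that subscription id.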

What is the difference between GraphQL subscriptions and SSE?

GraphQL subscriptions over WebSocket (graphql-ws) and over Server-Sent Events (graphql-sse) both stream JSON events from server to client, but differ in protocol and infrastructure requirements. WebSocket is full-duplex over a single TCP connection: the client can send messages after the handshake, enabling the graphql-ws ping/pong heartbeat and allowing multiple subscriptions (each with a unique id) to multiplex over one connection. SSE is one-directional over plain HTTP: the server streams a text/event-stream response. SSE connections reconnect automatically using the browser's built-in EventSource API, require no special WebSocket support from proxies or load balancers, and work through HTTP/2 multiplexing. In its distinct connections mode graphql-sse uses one HTTP request per subscription (no multiplexing at the GraphQL layer), while its single connection mode shares one request; either way it avoids WebSocket upgrade complexity and corporate firewall issues. Choose WebSocket for chat, collaborative editing, or bidirectional protocols; choose SSE for read-only dashboards, notifications, or environments with WebSocket restrictions. Both transport identical JSON payloads at the GraphQL layer.
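For comparison with the WebSocket frames, this sketch shows generic text/event-stream framing for one subscription event. It assumes the event name next and a payload that is the GraphQL result itself; formatSseEvent and parseSseEvent are hypothetical helpers, not graphql-sse exports.

```typescript
// Serialize one event: an "event:" line, a "data:" line, then a blank line.
function formatSseEvent(event: string, data: unknown): string {
  return `event: ${event}\ndata: ${JSON.stringify(data)}\n\n`;
}

// Parse a single frame back into its event name and JSON payload.
function parseSseEvent(frame: string): { event: string; data: unknown } {
  let event = "message"; // SSE default when no "event:" line is present
  const dataLines: string[] = [];
  for (const line of frame.split("\n")) {
    if (line.indexOf("event:") === 0) {
      event = line.slice(6).trim();
    } else if (line.indexOf("data:") === 0) {
      // One optional space after the colon is not part of the value.
      dataLines.push(line.slice(5).replace(/^ /, ""));
    }
  }
  // A frame without data lines (for example an end-of-stream event) yields null.
  const data = dataLines.length > 0 ? JSON.parse(dataLines.join("\n")) : null;
  return { event, data };
}

const frame = formatSseEvent("next", {
  data: { messageAdded: { id: "42", text: "hello" } },
});
const roundTrip = parseSseEvent(frame);
```

The JSON inside the data line is the same payload a WebSocket next message would carry; only the framing around it changes.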

How do I set up GraphQL subscriptions with graphql-yoga?

graphql-yoga supports subscriptions out of the box over Server-Sent Events: no separate WebSocket server or useServer() call is needed for that transport. Define a subscription field whose resolver returns an AsyncGenerator, create the yoga server with createYoga({ schema }), pass it to Node.js createServer(yoga), and call server.listen(4000). To serve the graphql-transport-ws subprotocol as well, attach a ws WebSocketServer to the same HTTP server and wire it to yoga with useServer() from graphql-ws. For the PubSub pattern, use yoga's built-in createPubSub(): const pubsub = createPubSub(); then subscribe: () => pubsub.subscribe("MESSAGE_ADDED") in the resolver and pubsub.publish("MESSAGE_ADDED", { messageAdded: msg }) in mutations. The same resolvers serve both transports, so choosing between SSE and WebSocket requires no resolver changes. For Next.js App Router, yoga exports as a standard Route Handler; WebSocket upgrades require a custom server.

How do I consume GraphQL subscriptions in Apollo Client?

Configure Apollo Client with a split link that routes subscription operations to a GraphQLWsLink and queries/mutations to an HttpLink. Install @apollo/client and graphql-ws. Create the WebSocket link: const wsLink = new GraphQLWsLink(createClient({ url: "ws://localhost:4000/graphql", connectionParams: () => ({ Authorization: `Bearer ${token}` }) })). Route operations: split(({ query }) => { const def = getMainDefinition(query); return def.kind === "OperationDefinition" && def.operation === "subscription"; }, wsLink, httpLink). Then use the hook: const { data, loading, error } = useSubscription(MY_SUB, { variables }). The hook re-renders on every new JSON event. data is the latest event, not an accumulated list. To accumulate events, use the onData callback to write to Apollo cache, or use subscribeToMore inside a useQuery to append events to a historical data set. Apollo automatically normalizes subscription payloads into InMemoryCache when the returned objects have __typename and id.
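The routing test inside split() comes down to one question: is the document's operation a subscription? A hand-rolled sketch of that check follows, using minimal structural types instead of the real graphql AST; DocumentLike, isSubscription, and pickLink are hypothetical names that only approximate what getMainDefinition does.

```typescript
// Minimal structural types mirroring the parts of a parsed document that routing needs.
type Definition =
  | { kind: "OperationDefinition"; operation: "query" | "mutation" | "subscription" }
  | { kind: "FragmentDefinition" };

type DocumentLike = { definitions: Definition[] };

// True when the first operation definition in the document is a subscription.
function isSubscription(doc: DocumentLike): boolean {
  for (const def of doc.definitions) {
    if (def.kind === "OperationDefinition") {
      return def.operation === "subscription";
    }
  }
  return false; // fragments only: nothing to send over the WebSocket link
}

// Mirrors the two branches handed to split(): WebSocket for subscriptions, HTTP otherwise.
function pickLink(doc: DocumentLike): "ws" | "http" {
  return isSubscription(doc) ? "ws" : "http";
}
```

In a real client the predicate receives the parsed query from the operation and the two branches are the wsLink and httpLink instances.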

How do I filter subscription events for specific clients?

Use the withFilter() helper from graphql-subscriptions to add per-event filtering to a subscription resolver. withFilter() takes the subscribe function and a filter predicate: subscribe: withFilter(() => pubsub.asyncIterableIterator("MESSAGE_ADDED"), (payload, variables, context) => payload.messageAdded.roomId === variables.roomId). The predicate receives the event payload, the client's subscription variables, and the resolver context (containing the authenticated user), followed by the GraphQL resolve info. Return true to deliver the event, false to skip it (no JSON frame sent). The predicate can be async: return a Promise<boolean> for database permission checks. For high-throughput scenarios with many subscribers, use Redis pub/sub with per-room channels (message:room:${roomId}) and subscribe each client to its specific channel; this removes the per-event filter call from the GraphQL server, so filter cost no longer grows with the total number of subscriptions.
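The filtering idea itself is small enough to sketch by hand. The generator below illustrates the technique (wrap a source iterator and drop events the predicate rejects); it is not the graphql-subscriptions implementation, and filterEvents, fromArray, and RoomEvent are hypothetical names.

```typescript
type Predicate<T> = (payload: T) => boolean | Promise<boolean>;

// Wrap a source of events and re-yield only those the predicate accepts.
async function* filterEvents<T>(
  source: AsyncIterable<T>,
  predicate: Predicate<T>,
): AsyncGenerator<T> {
  for await (const payload of source) {
    // Awaiting lets the predicate be sync or async (e.g. a permission lookup).
    if (await predicate(payload)) yield payload;
  }
}

interface RoomEvent {
  messageAdded: { roomId: string; text: string };
}

// Stand-in for a PubSub iterator: replays a fixed list of events.
async function* fromArray<T>(items: T[]): AsyncGenerator<T> {
  for (const item of items) yield item;
}

async function textsForRoom(roomId: string): Promise<string[]> {
  const events: RoomEvent[] = [
    { messageAdded: { roomId: "a", text: "one" } },
    { messageAdded: { roomId: "b", text: "two" } },
    { messageAdded: { roomId: "a", text: "three" } },
  ];
  const texts: string[] = [];
  const filtered = filterEvents(fromArray(events), (e) => e.messageAdded.roomId === roomId);
  for await (const event of filtered) texts.push(event.messageAdded.text);
  return texts;
}
```

Note that the predicate still runs once per event per subscriber, which is the cost the per-room channel approach avoids.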

How do I authenticate GraphQL subscription connections?

Authenticate GraphQL subscription connections in the onConnect callback passed to useServer() from graphql-ws, which is the same hook whether the WebSocket server sits next to Apollo Server or graphql-yoga. The callback runs once per WebSocket connection when the client sends its ConnectionInit message. The client includes auth data in the ConnectionInit payload: connectionParams: () => ({ Authorization: `Bearer ${token}` }). On the server: onConnect: async (ctx) => { const token = ctx.connectionParams?.Authorization?.replace("Bearer ", ""); const user = await verifyJwt(token); if (!user) return false; }. Returning false closes the WebSocket with code 4403 (Forbidden). An object returned from onConnect is sent back to the client as the ConnectionAck payload, so it is not the place for server-side state; to expose the user to resolvers, read ctx.connectionParams in the context() function, which runs per Subscribe operation. Note: browsers cannot send custom HTTP headers during WebSocket upgrade handshakes, so the ConnectionInit payload is the correct mechanism for JWT-based auth. Cookie-based auth needs no extra client code, because browsers include the relevant cookies in the WebSocket upgrade request.
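The token handling inside onConnect is worth isolating so it can be tested without a server. A minimal sketch follows, assuming the client sends an Authorization entry in connectionParams as shown above; extractBearerToken and allowConnection are hypothetical helpers, and the verify function is injected rather than tied to any JWT library.

```typescript
type ConnectionParams = Readonly<Record<string, unknown>> | undefined;

// Pull the token out of "Bearer <token>"; returns null for anything malformed.
function extractBearerToken(params: ConnectionParams): string | null {
  const header = params?.Authorization;
  if (typeof header !== "string") return null;
  const match = /^Bearer\s+(\S+)$/.exec(header);
  return match ? match[1] : null;
}

// Decide whether to accept the connection; verify is a caller-supplied check
// (hypothetical here) that resolves to a user object or null.
async function allowConnection(
  params: ConnectionParams,
  verify: (token: string) => Promise<{ id: string } | null>,
): Promise<boolean> {
  const token = extractBearerToken(params);
  if (token === null) return false;
  return (await verify(token)) !== null;
}
```

An onConnect callback could return the result of allowConnection(ctx.connectionParams, verify) directly, since returning false is what rejects the connection. Checking the header type first also avoids calling replace() on a value that is not a string.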

Further reading and primary sources