JSON in Redis Streams: XADD, XREAD, Consumer Groups, Pub/Sub & RedisJSON
Last updated:
Redis handles JSON events through two mechanisms: Redis Streams for durable message queues and Pub/Sub for ephemeral broadcasts. XADD mystream * event '{"type":"order","id":123}' appends a JSON-encoded message to a stream with an auto-generated timestamp ID. XREAD COUNT 10 STREAMS mystream $ reads new messages as they arrive. Consumer groups (XGROUP CREATE + XREADGROUP) distribute stream messages across multiple worker processes with at-least-once delivery — XACK acknowledges processing and removes messages from the Pending Entries List. Redis Pub/Sub (PUBLISH channel '{"event":"update"}') broadcasts JSON to all current subscribers but loses messages if no subscriber is connected — unlike Streams which persist messages for replay. RedisJSON (JSON.SET, JSON.GET) stores structured JSON natively with JSONPath queries, avoiding serialization overhead for complex objects. This guide covers XADD/XREAD for streaming JSON, consumer groups for parallel processing, Pub/Sub vs Streams tradeoffs, RedisJSON, and Node.js/ioredis implementation patterns.
XADD and XREAD: Appending and Reading JSON Stream Messages
Redis Streams store messages as field-value pairs. The most common pattern is a single event or payload field whose value is a JSON string. XADD is O(1) and returns the auto-generated ID immediately. XREAD is used for simple consumers without group coordination — it tracks its own position using the last received ID.
# ── XADD: append a JSON message to a stream ─────────────────────────────
# Syntax: XADD key [MAXLEN [~] count] id field value [field value ...]
# * = auto-generate ID (millisecond timestamp + sequence number)
XADD orders * event '{"type":"order_created","id":1001,"total":49.99,"currency":"USD"}'
# Returns: "1716900000000-0"
XADD orders * event '{"type":"order_shipped","id":1001,"carrier":"FedEx","trackingCode":"FX123"}'
# Returns: "1716900001000-0"
# Cap stream at approximately 10,000 messages (radix-tree trimming, O(1))
XADD orders MAXLEN ~ 10000 * event '{"type":"order_delivered","id":1001}'
# ── XLEN: count messages in a stream ────────────────────────────────────
XLEN orders
# Returns: 3
# ── XRANGE: read messages between two IDs (inclusive) ───────────────────
# - = minimum ID, + = maximum ID (reads entire stream)
XRANGE orders - + COUNT 10
# Returns list of [id, [field, value, ...]] pairs:
# 1) 1) "1716900000000-0"
# 2) 1) "event"
# 2) "{"type":"order_created","id":1001,"total":49.99,"currency":"USD"}"
# ── XREAD: read new messages after a given ID ────────────────────────────
# Read up to 10 messages starting from ID 0 (the very beginning)
XREAD COUNT 10 STREAMS orders 0
# Read up to 5 messages starting after the last known ID
XREAD COUNT 5 STREAMS orders 1716900000000-0
# Read from multiple streams simultaneously
XREAD COUNT 10 STREAMS orders notifications 1716900000000-0 1716900000000-0
# ── XREAD with BLOCK: wait for new messages ──────────────────────────────
# $ means "only messages newer than right now"
# BLOCK 5000 = wait up to 5000ms (5 seconds) for a new message
XREAD COUNT 10 BLOCK 5000 STREAMS orders $
# Returns null if no message arrives within the timeout
# Returns message immediately when one is published
# After the first XREAD with $, use the last received ID for subsequent calls
# to avoid missing messages that arrive between polls:
XREAD COUNT 10 BLOCK 5000 STREAMS orders 1716900001000-0The ID format milliseconds-sequence guarantees ordering within a millisecond. IDs are lexicographically sortable, so XRANGE and XREAD always return messages in insertion order. When using XREAD BLOCK $ STREAMS mystream, always replace $ with the last received ID on the second and subsequent calls — $only means "latest at connection time" and will silently skip messages that arrived while your code was processing the previous batch.
Consumer Groups: Parallel JSON Processing with At-Least-Once Delivery
Consumer groups allow multiple worker processes to each receive a different subset of messages, scaling throughput horizontally. Each message is delivered to exactly one consumer in the group. Redis tracks which messages each consumer has received but not yet acknowledged via the Pending Entries List (PEL).
# ── XGROUP CREATE: create a consumer group ──────────────────────────────
# $ = start from the latest message (don't replay history)
# 0 = start from the beginning (replay all existing messages)
# MKSTREAM = create the stream if it doesn't exist
XGROUP CREATE orders processors $ MKSTREAM
# Create a second group for a different use case (analytics)
XGROUP CREATE orders analytics 0
# ── XREADGROUP: read as a named consumer within a group ─────────────────
# > = "give me messages not yet delivered to any consumer in this group"
# Each call delivers different messages to different workers
XREADGROUP GROUP processors worker-1 COUNT 5 STREAMS orders >
# Returns messages exclusively reserved for worker-1
XREADGROUP GROUP processors worker-2 COUNT 5 STREAMS orders >
# Returns a different 5 messages, exclusively for worker-2
# ── XREADGROUP with BLOCK: long-poll for new JSON events ─────────────────
XREADGROUP GROUP processors worker-1 COUNT 10 BLOCK 5000 STREAMS orders >
# ── Response format: [stream_name, [[id, [field, value, ...]], ...]] ─────
# Example response (Python-style for readability):
# [
# ["orders", [
# ["1716900000000-0", ["event", '{"type":"order_created","id":1001}']],
# ["1716900001000-0", ["event", '{"type":"order_shipped","id":1001}']],
# ]]
# ]
# ── XINFO GROUPS: inspect consumer group state ───────────────────────────
XINFO GROUPS orders
# Shows: name, consumers, pending count, last-delivered-id, entries-read, lag
# ── XINFO CONSUMERS: list consumers in a group ───────────────────────────
XINFO CONSUMERS orders processorsUp to 128 consumer groups per stream are supported. Each group maintains an independent delivery cursor — the last-delivered-id — so multiple groups can process the same stream at different speeds. A consumer group named analytics starting from 0 will replay all historical events while the processors group starting from $ only handles new messages. Worker names (worker-1, worker-2) are arbitrary strings; Redis creates them implicitly on the first XREADGROUP call.
XACK and the Pending Entries List: Reliable JSON Processing
The Pending Entries List (PEL) records every message delivered by XREADGROUP but not yet acknowledged. XACK removes a message from the PEL after successful processing. Messages that stay in the PEL too long can be reclaimed and redelivered to another worker, implementing automatic retry.
# ── XACK: acknowledge successful processing ─────────────────────────────
# Remove message from the PEL — confirms it was processed successfully
XACK orders processors 1716900000000-0
# Returns: 1 (number of messages acknowledged)
# Acknowledge multiple messages at once (batch ack after processing batch)
XACK orders processors 1716900000000-0 1716900001000-0 1716900002000-0
# ── XPENDING: inspect the Pending Entries List ───────────────────────────
# Summary: total pending count, min/max ID, consumers with pending messages
XPENDING orders processors
# Detailed: list up to 10 pending messages for all consumers
XPENDING orders processors - + 10
# Returns: [id, consumer-name, idle-ms, delivery-count]
# 1) 1) "1716900003000-0"
# 2) "worker-2"
# 3) (integer) 35000 ← idle for 35 seconds
# 4) (integer) 1 ← delivered once, not yet acked
# ── XAUTOCLAIM: reclaim messages idle for too long ───────────────────────
# Transfer messages idle > 60,000ms (60s) from any consumer to worker-3
# Starting from the earliest pending message (0-0)
XAUTOCLAIM orders processors worker-3 60000 0-0 COUNT 10
# Returns: [next-start-id, [[id, fields], ...], [deleted-ids]]
# next-start-id: cursor for the next XAUTOCLAIM call
# ── Dead-letter stream pattern: route poison-pill messages ───────────────
# After delivery-count exceeds 3, send to a dead-letter stream
# (application-level logic — not built into Redis)
# XADD orders-dlq * original_id 1716900003000-0 event '{"type":"order_created",...}' error "max retries exceeded"
# XACK orders processors 1716900003000-0
# ── XDEL: remove a specific message from the stream ─────────────────────
# Does NOT automatically remove from PEL — call XACK first
XDEL orders 1716900000000-0
# ── XTRIM: trim stream to a maximum length ───────────────────────────────
XTRIM orders MAXLEN ~ 50000The delivery count in XPENDING output is critical for detecting poison-pill messages — a message that always fails processing will have a high delivery count while remaining in the PEL. A robust consumer checks the delivery count before calling XACK and routes messages that exceed a threshold (e.g., 5 attempts) to a dead-letter stream rather than continuing to retry them. This prevents a single malformed JSON event from blocking the entire consumer group indefinitely.
Redis Pub/Sub: Ephemeral JSON Broadcasting
Redis Pub/Sub delivers JSON messages to all currently-connected subscribers simultaneously. It is appropriate for scenarios where delivery to offline subscribers is not required — presence events, live dashboard updates, and chat room messages. Unlike Streams, there is no persistence, no replay, and no acknowledgement.
# ── SUBSCRIBE: listen to a channel ─────────────────────────────────────
# (Run in a separate terminal / connection)
SUBSCRIBE notifications
# Pattern subscribe: receive from any channel matching a glob
PSUBSCRIBE orders.*
PSUBSCRIBE user.*
# ── PUBLISH: broadcast a JSON message ───────────────────────────────────
PUBLISH notifications '{"event":"price_update","item":"AAPL","price":182.50}'
# Returns: 2 (number of subscribers who received the message)
# Returns: 0 (no subscribers connected — message is permanently lost)
PUBLISH orders.created '{"id":1002,"total":79.99,"userId":"u-456"}'
# ── Keyspace notifications: auto-publish on Redis key events ─────────────
# Enable in redis.conf: notify-keyspace-events KEA
# Or at runtime:
CONFIG SET notify-keyspace-events KEA
# Now subscribe to keyspace events for a specific key:
SUBSCRIBE __keyevent@0__:set # fires when any key is SET
SUBSCRIBE __keyspace@0__:user:1 # fires for any operation on user:1
# Notification payload is the command name (e.g., "set", "del", "expire")
# NOT the value — use GET to fetch the current value after notification
# ── PUBSUB commands: inspect channels and subscribers ───────────────────
PUBSUB CHANNELS * # list all active channels
PUBSUB NUMSUB notifications # subscriber count for 'notifications'
PUBSUB NUMPAT # number of active pattern subscriptionsA connection in SUBSCRIBE or PSUBSCRIBE mode can only send subscribe/unsubscribe commands — it cannot execute regular Redis commands like GET or SET. Use a dedicated connection for subscriptions. In Node.js with ioredis, call client.duplicate()to create a subscriber connection that doesn't block your main client.
Redis Streams vs Pub/Sub: When to Use Each
Streams and Pub/Sub solve different problems. Choose based on whether message durability, replay, and acknowledgement are required for your JSON event pipeline.
| Dimension | Redis Streams | Redis Pub/Sub |
|---|---|---|
| Persistence | Yes — messages survive restarts (AOF/RDB) | No — messages are lost if no subscriber is connected |
| Replay | Yes — read from any past ID with XRANGE/XREAD | No — only live delivery |
| Parallel consumers | Yes — consumer groups divide messages across workers | Fan-out — all subscribers receive every message |
| Acknowledgement | XACK + PEL retry for at-least-once delivery | None — fire-and-forget |
| Backpressure | Yes — consumers can fall behind; MAXLEN caps growth | No — slow subscribers are disconnected by Redis |
| Best for | Order processing, audit logs, ETL pipelines, task queues | Live scores, chat messages, presence updates, cache invalidation |
| Max subscribers/consumers | 128 consumer groups; unlimited consumers per group | Unlimited subscribers per channel |
A common hybrid pattern: use Streams as the durable source of truth and Pub/Sub for real-time UI notifications. When an order event is appended to a Stream, a worker processes it and then PUBLISHes a lighter notification (just an order ID) to a Pub/Sub channel. Connected browser clients receive the notification via WebSocket and fetch fresh data from the API. Offline clients miss the Pub/Sub notification but will catch up by polling or reconnecting — they don't miss the actual data because it's in a database or cache, not in Pub/Sub.
RedisJSON: Storing and Querying Structured JSON Natively
RedisJSON (the ReJSON module, bundled in Redis Stack) stores JSON documents as a parsed binary tree rather than flat strings. This enables atomic field-level operations and JSONPath queries without deserializing the entire document on the client. Available in Redis Cloud, Redis Stack, and self-hosted Redis with the module loaded.
# ── JSON.SET: store a JSON document ────────────────────────────────────
# Syntax: JSON.SET key path value [NX | XX]
# $ = root path
JSON.SET user:1 $ '{"name":"Alice","score":0,"tags":["premium"],"address":{"city":"Portland"}}'
# Returns: "OK"
# Set only if key does NOT exist (NX = not exists)
JSON.SET user:2 $ '{"name":"Bob","score":10}' NX
# Update a nested field — no full document fetch needed
JSON.SET user:1 $.address.city $ '"Seattle"'
# ── JSON.GET: retrieve all or part of a document ────────────────────────
# Return the full document
JSON.GET user:1 $
# Returns: [{"name":"Alice","score":0,"tags":["premium"],"address":{"city":"Seattle"}}]
# Return only the name field (JSONPath)
JSON.GET user:1 $.name
# Returns: ["Alice"]
# Return a nested field
JSON.GET user:1 $.address.city
# Returns: ["Seattle"]
# Return multiple paths at once
JSON.GET user:1 $.name $.score
# Returns: {"$.name":["Alice"],"$.score":[0]}
# ── JSON.NUMINCRBY: atomically increment a number field ─────────────────
JSON.NUMINCRBY user:1 $.score 10
# Returns: [10] (new value)
JSON.NUMINCRBY user:1 $.score -3
# Returns: [7]
# ── JSON.ARRAPPEND: append to a JSON array without fetching it ───────────
JSON.ARRAPPEND user:1 $.tags '"vip"' '"beta-tester"'
# Returns: [3] (new array length)
JSON.GET user:1 $.tags
# Returns: [["premium","vip","beta-tester"]]
# ── JSON.DEL: delete a field or the entire document ──────────────────────
JSON.DEL user:1 $.address # delete the address field
JSON.DEL user:1 $ # delete the entire document
# ── JSONPath filter expressions ──────────────────────────────────────────
# Find all users with score > 5 (requires RediSearch or KEYS + JSON.GET loop)
JSON.GET user:1 '$.tags[?(@=="vip")]'
# Returns: ["vip"] (matched elements)
# Wildcard: get all values in an array
JSON.GET user:1 '$.tags[*]'
# Returns: ["premium","vip","beta-tester"]
# ── JSON.MGET: get a path from multiple keys at once ────────────────────
JSON.MGET user:1 user:2 $.name
# Returns: [["Alice"], ["Bob"]]
# ── JSON.TYPE: get the JSON type of a value ──────────────────────────────
JSON.TYPE user:1 $.score # Returns: ["integer"]
JSON.TYPE user:1 $.tags # Returns: ["array"]
JSON.TYPE user:1 $ # Returns: ["object"]RedisJSON supports JSONPath v2 (RFC 9535) syntax. The $ root operator, . child access, [*] wildcards, [?(@.field>value)] filter expressions, and .. recursive descent are all available. When combined with RediSearch, you can define a search index over JSON fields and run full-text queries, range searches, and aggregations across millions of documents in milliseconds — bypassing the need to iterate keys with SCAN.
Node.js Implementation with ioredis
The ioredis library provides a Promise-based API for all Redis Stream and Pub/Sub commands. Use separate client instances for blocking reads (which hold the connection) and regular commands. The ioredis cluster mode and Sentinel mode work transparently with Stream commands.
import Redis from 'ioredis'
// ── Connection setup ─────────────────────────────────────────────────────
const producer = new Redis({ host: 'localhost', port: 6379 })
// Dedicated connection for blocking reads — XREADGROUP holds the connection
const consumer = producer.duplicate()
// ── Producer: append JSON events to a stream ────────────────────────────
async function publishEvent(streamKey: string, event: Record<string, unknown>) {
const payload = JSON.stringify(event)
// XADD streamKey * event <json>
const id = await producer.xadd(streamKey, '*', 'event', payload)
console.log('Published event:', id, event.type)
return id
}
// Usage
await publishEvent('orders', { type: 'order_created', id: 1001, total: 49.99 })
await publishEvent('orders', { type: 'order_shipped', id: 1001, carrier: 'FedEx' })
// ── Consumer group setup ─────────────────────────────────────────────────
async function setupConsumerGroup(streamKey: string, groupName: string) {
try {
// Create group starting from the latest message ($)
await producer.xgroup('CREATE', streamKey, groupName, '$', 'MKSTREAM')
console.log('Consumer group created:', groupName)
} catch (err: unknown) {
// BUSYGROUP: group already exists — safe to ignore
if (err instanceof Error && err.message.includes('BUSYGROUP')) {
console.log('Consumer group already exists:', groupName)
} else {
throw err
}
}
}
await setupConsumerGroup('orders', 'processors')
// ── Consumer: read and process JSON messages ─────────────────────────────
async function runConsumer(
streamKey: string,
groupName: string,
consumerName: string,
processEvent: (id: string, event: Record<string, unknown>) => Promise<void>
) {
console.log(`Consumer ${consumerName} started`)
while (true) {
try {
// XREADGROUP GROUP groupName consumerName COUNT 10 BLOCK 5000 STREAMS streamKey >
const results = await consumer.xreadgroup(
'GROUP', groupName, consumerName,
'COUNT', '10',
'BLOCK', '5000',
'STREAMS', streamKey,
'>' // > = undelivered messages only
) as Array<[string, Array<[string, string[]]>]> | null
if (!results) continue // timeout — no new messages, loop again
for (const [, messages] of results) {
for (const [id, fields] of messages) {
// fields = ['event', '{"type":"order_created",...}']
const fieldMap = new Map<string, string>()
for (let i = 0; i < fields.length; i += 2) {
fieldMap.set(fields[i], fields[i + 1])
}
const rawJson = fieldMap.get('event')
if (!rawJson) continue
let event: Record<string, unknown>
try {
event = JSON.parse(rawJson) as Record<string, unknown>
} catch {
console.error('Invalid JSON in message', id, rawJson)
// Ack to remove from PEL — don't retry malformed JSON
await producer.xack(streamKey, groupName, id)
continue
}
try {
await processEvent(id, event)
// XACK after successful processing
await producer.xack(streamKey, groupName, id)
} catch (err) {
console.error('Failed to process event', id, err)
// Don't ack — message stays in PEL for retry via XAUTOCLAIM
}
}
}
} catch (err) {
if (err instanceof Error && err.message.includes('ECONNRESET')) {
console.log('Connection reset, reconnecting...')
await new Promise(res => setTimeout(res, 1000))
} else {
throw err
}
}
}
}
// ── Claim stale messages from crashed workers ────────────────────────────
async function claimStalePending(
streamKey: string,
groupName: string,
consumerName: string,
idleMs = 60_000,
maxRetries = 5
) {
let startId = '0-0'
while (true) {
// XAUTOCLAIM: transfer messages idle > idleMs to this consumer
const [nextId, messages, deletedIds] = await producer.xautoclaim(
streamKey, groupName, consumerName,
idleMs.toString(), startId, 'COUNT', '10'
) as [string, Array<[string, string[]]>, string[]]
for (const [id, fields] of messages) {
// Check delivery count via XPENDING to detect poison pills
const pending = await producer.xpending(streamKey, groupName, id, id, '1') as Array<[string, string, number, number]>
const deliveryCount = pending[0]?.[3] ?? 0
if (deliveryCount > maxRetries) {
// Route to dead-letter stream and acknowledge
const rawJson = fields[1] ?? '{}'
await producer.xadd('orders-dlq', '*', 'original_id', id, 'event', rawJson, 'reason', 'max_retries_exceeded')
await producer.xack(streamKey, groupName, id)
console.warn('Moved to DLQ:', id, 'deliveries:', deliveryCount)
}
// Otherwise the message is now pending for this consumer — process normally
}
if (nextId === '0-0') break // no more pending messages
startId = nextId
}
if (deletedIds.length > 0) {
console.log('Deleted entries (no longer in stream):', deletedIds)
}
}
// ── Pub/Sub: ephemeral JSON broadcasting ────────────────────────────────
const subscriber = producer.duplicate()
const notifier = producer.duplicate()
// Subscribe before publishing to avoid missing messages
await subscriber.subscribe('order-events')
subscriber.on('message', (channel: string, message: string) => {
const event = JSON.parse(message) as Record<string, unknown>
console.log('Received on channel', channel, ':', event)
})
// Publish a JSON notification
await notifier.publish('order-events', JSON.stringify({ orderId: 1001, status: 'shipped' }))
// Pattern subscribe — receive from 'orders.*' channels
const patternSubscriber = producer.duplicate()
await patternSubscriber.psubscribe('orders.*')
patternSubscriber.on('pmessage', (pattern: string, channel: string, message: string) => {
console.log('Pattern:', pattern, 'Channel:', channel, 'Message:', JSON.parse(message))
})
// ── RedisJSON with ioredis ───────────────────────────────────────────────
// RedisJSON commands are available via redis.call() in older ioredis versions
// or natively via ioredis 5.x+
// Store a document
await producer.call('JSON.SET', 'user:1', '$', JSON.stringify({
name: 'Alice',
score: 0,
tags: ['premium'],
}))
// Get a specific field
const name = await producer.call('JSON.GET', 'user:1', '$.name')
// Returns: '["Alice"]' — parse the result
const parsed = JSON.parse(name as string) as string[]
console.log('Name:', parsed[0]) // "Alice"
// Atomically increment a field
await producer.call('JSON.NUMINCRBY', 'user:1', '$.score', '10')
// Append to an array
await producer.call('JSON.ARRAPPEND', 'user:1', '$.tags', '"vip"')
// Graceful shutdown
process.on('SIGTERM', async () => {
await producer.quit()
await consumer.quit()
await subscriber.quit()
})Use producer.duplicate() to create sibling connections that share the same configuration (host, port, auth) without sharing state. A blocking XREADGROUP with BLOCK 5000 holds the TCP connection open for up to 5 seconds — this is why it needs a dedicated connection. The main producer client handles XADD, XACK, and other non-blocking commands concurrently using ioredis's command queue. Always call client.quit() on shutdown to gracefully close connections and flush pending commands.
Key Terms
- Redis Stream
- A Redis data structure that functions as an append-only log of messages. Each message has a unique ID of the format
milliseconds-sequence(e.g.,1716900000000-0) and one or more field-value pairs. Streams persist messages across restarts (via AOF or RDB), support consumer groups for parallel processing with acknowledgement, and allow random-access reads usingXRANGEwith any start/end ID. Stream memory is bounded usingXADD MAXLENorXTRIM. Unlike lists, streams do not delete messages on read — messages stay until explicitly deleted or trimmed. - Consumer Group
- A named group of consumers that cooperatively process a single stream. Created with
XGROUP CREATE, a consumer group maintains alast-delivered-idcursor and a Pending Entries List (PEL). EachXREADGROUPcall delivers a non-overlapping subset of messages to the calling consumer — different workers receive different messages. Consumer names are arbitrary strings created implicitly on first use. Up to 128 consumer groups per stream are supported, each with an independent cursor so multiple groups can process the same stream at different paces. - Pending Entries List (PEL)
- A per-consumer-group data structure that tracks messages delivered by
XREADGROUPbut not yet acknowledged withXACK. Each PEL entry records the message ID, the consumer name, the delivery timestamp, and the delivery count.XPENDINGdisplays PEL contents.XAUTOCLAIMtransfers PEL entries from one consumer to another after they exceed an idle timeout — enabling automatic recovery from crashed workers. Messages in the PEL are not redelivered automatically; applications must callXAUTOCLAIMorXCLAIMto recover them. The PEL is also stored in Redis memory, so unacknowledged messages consume memory beyond the stream itself. - Pub/Sub
- Redis Publish/Subscribe is a message-passing paradigm where publishers send messages to named channels and subscribers receive messages from channels they are subscribed to.
PUBLISHreturns the number of subscribers who received the message — if 0, the message is permanently lost. Subscribers must be connected whenPUBLISHis called to receive the message; there is no persistence and no replay. Connections inSUBSCRIBEmode can only issue subscribe/unsubscribe commands. Pattern subscriptions usingPSUBSCRIBEwith glob patterns (orders.*) match multiple channels. Pub/Sub is appropriate for real-time notifications where message loss is acceptable. - RedisJSON
- A Redis module that adds native JSON storage and manipulation commands to Redis. Documents are stored as binary-encoded tree structures rather than flat strings, enabling O(1) field access using JSONPath expressions without client-side deserialization. Key commands:
JSON.SET(store/update),JSON.GET(retrieve with JSONPath),JSON.NUMINCRBY(atomic numeric increment),JSON.ARRAPPEND(append to array),JSON.DEL(delete field or document). RedisJSON is included in Redis Stack and Redis Cloud; self-hosted Redis requires loading thelibrejson.somodule. When combined with RediSearch, JSON fields can be indexed for full-text search and range queries. - XAUTOCLAIM
- A Redis command (available since Redis 6.2) that atomically transfers ownership of pending messages from one consumer to another based on idle time.
XAUTOCLAIM mystream mygroup new-worker 60000 0-0 COUNT 10claims up to 10 messages that have been idle (unacknowledged) for more than 60,000 ms and assigns them tonew-worker. Returns a cursor for paginating through the full PEL, the claimed messages, and a list of IDs that no longer exist in the stream (stale PEL entries).XAUTOCLAIMis the preferred replacement for the olderXCLAIMcommand, which required knowing specific message IDs in advance.
FAQ
How do I add JSON messages to a Redis Stream?
Use XADD with a field-value pair where the value is a JSON string. XADD mystream * event '{"type":"order","id":123,"total":49.99}' appends a message and returns an auto-generated ID like 1716900000000-0 — a millisecond timestamp plus a sequence number. The asterisk (*) tells Redis to generate the ID automatically; you can supply your own ID if you need deterministic IDs. Each stream message is a map of fields to string values, so you typically use a single field (event, payload, or data) holding the full JSON string. XADD is O(1) and persists the message to the AOF or RDB snapshot according to your Redis persistence config. Stream length is unbounded by default — use XADD MAXLEN ~ 10000 mystream * ... to cap the stream at approximately 10,000 messages with a radix-tree compaction.
How do Redis consumer groups process JSON messages?
Consumer groups let multiple worker processes each receive a different subset of messages, enabling horizontal scaling with at-least-once delivery. Create a group with XGROUP CREATE mystream mygroup $ MKSTREAM — the $ means start from the latest message (use 0 to replay all history). Each worker then calls XREADGROUP GROUP mygroup worker-1 COUNT 10 STREAMS mystream > — the > cursor means "give me messages not yet delivered to any consumer in this group." Redis marks each delivered message as pending in the Pending Entries List (PEL). After processing, call XACK mystream mygroup messageId to remove it from the PEL. Unacknowledged messages stay in the PEL indefinitely and can be reclaimed by another worker after a timeout using XAUTOCLAIM. Up to 128 consumer groups per stream are supported, each maintaining independent delivery cursors.
What is the difference between Redis Streams and Pub/Sub for JSON?
Redis Pub/Sub (PUBLISH/SUBSCRIBE) is fire-and-forget: messages are delivered only to subscribers currently connected at publish time. If no subscriber is listening, the message is permanently lost — there is no persistence and no replay. Redis Streams persist every message in a log structure and allow any number of consumers to read from any position. A consumer joining 5 minutes after messages were published can replay them from ID 0. Streams support consumer groups for parallel processing with acknowledgement, while Pub/Sub delivers to all subscribers simultaneously (fan-out). Choose Pub/Sub for ephemeral broadcasts where loss is acceptable (chat typing indicators, live score updates). Choose Streams for durable event queues where every JSON event must be processed at least once, such as order processing, audit logs, or ETL pipelines. Redis 7.0+ allows up to 4 billion messages per stream before ID space exhaustion.
How do I acknowledge a JSON message in a Redis consumer group?
After successfully processing a message delivered by XREADGROUP, call XACK mystream mygroup messageId. The messageId is the string like 1716900000000-0 returned from XADD or included in the XREADGROUP response. XACK removes the message from the Pending Entries List (PEL) for that consumer group, confirming successful delivery. If you do not call XACK, the message stays in the PEL and will appear in XPENDING output. To inspect pending messages, run XPENDING mystream mygroup - + 10 which returns up to 10 pending message IDs along with the consumer name, idle time in milliseconds, and delivery count. Messages delivered more than N times without acknowledgement typically indicate a poison-pill message — handle these by routing to a dead-letter stream or logging the raw JSON for manual inspection. XAUTOCLAIM mystream mygroup newconsumer 60000 0-0 COUNT 10 atomically transfers messages idle for more than 60 seconds to a different consumer.
What is RedisJSON and how does it differ from storing JSON strings?
RedisJSON is a Redis module (included in Redis Stack and Redis Cloud) that stores JSON natively as a parsed binary structure rather than as a flat string. JSON.SET user:1 $ '{"name":"Alice","score":42}' stores the document in a tree. JSON.GET user:1 $.name returns ["Alice"] using JSONPath syntax — you query a specific field without fetching and parsing the entire document on the client. With plain string storage (SET + GET + JSON.parse), every read fetches the full serialized string. JSON.NUMINCRBY user:1 $.score 1 atomically increments a numeric field in-place without a read-modify-write cycle. JSON.ARRAPPEND user:1 $.tags '"vip"' appends to an array. RedisJSON supports JSONPath expressions including recursive descent ($..), wildcards ($.items[*].price), and filters ($.orders[?(@.total > 100)]). RediSearch can index RedisJSON fields for full-text and range queries across millions of documents in under 1 ms.
How do I replay JSON messages from a Redis Stream?
Pass the start ID 0 to XREAD to replay from the beginning of the stream. XREAD COUNT 100 STREAMS mystream 0 returns up to 100 messages starting from ID 0-0 (the earliest entry). For a consumer group, XREADGROUP GROUP mygroup new-consumer COUNT 100 STREAMS mystream 0 delivers all messages that were never acknowledged by that group — useful for catching up after a crash. To replay from a specific timestamp, convert the Unix millisecond timestamp to a stream ID prefix: XREAD COUNT 100 STREAMS mystream 1716900000000-0. To replay everything between two IDs, use XRANGE mystream startId endId COUNT 100. XRANGE mystream - + returns all messages (- is the minimum possible ID, + is the maximum). For very large streams, iterate with XRANGE using the last received ID as the new start to avoid loading millions of messages in one response. Stream messages are retained until explicitly deleted with XDEL or trimmed with XTRIM.
How do I read new JSON messages as they arrive in Redis Streams?
Use the BLOCK option with XREAD to wait for new messages. XREAD COUNT 10 BLOCK 5000 STREAMS mystream $ blocks for up to 5000 milliseconds (5 seconds) and returns as soon as up to 10 new messages arrive. The $ cursor means "only messages newer than the last one I received." Replace $ with the last received ID on subsequent calls to avoid missing messages between polls — $ only works for the very first call. For consumer groups: XREADGROUP GROUP mygroup worker-1 COUNT 10 BLOCK 5000 STREAMS mystream > blocks waiting for undelivered messages. A BLOCK value of 0 waits indefinitely. This blocking pattern eliminates polling loops and allows a single connection to efficiently consume a stream with sub-millisecond latency after message arrival. In ioredis, use a loop with await client.xread('COUNT', 10, 'BLOCK', 5000, 'STREAMS', 'mystream', lastId) and update lastId after each successful read.
How do I implement a Redis Stream consumer in Node.js?
Use the ioredis library which has first-class Stream support. Install with npm install ioredis. Create a client: const client = new Redis(). To read with a consumer group, first create the group: await client.xgroup('CREATE', 'mystream', 'mygroup', '$', 'MKSTREAM'). Then in a loop: const results = await client.xreadgroup('GROUP', 'mygroup', 'worker-1', 'COUNT', 10, 'BLOCK', 5000, 'STREAMS', 'mystream', '>'). If results is non-null, iterate through the messages array. Each message is an array of [id, fields] where fields alternates field names and values — fields[1] is the JSON string for a single-field message. Always call await client.xack('mystream', 'mygroup', id) after successful processing. Use a separate connection (via client.duplicate()) for blocking reads to avoid blocking other commands. Handle ECONNRESET errors with a retry loop and BUSYGROUP errors when creating consumer groups that already exist.
Validate your Redis Stream JSON payloads
Use Jsonic's JSON validator to check your stream event schemas before publishing — catch malformed JSON before it reaches your consumers.
Open JSON ValidatorFurther reading and primary sources
- Redis Streams Introduction — Official Redis documentation covering XADD, XREAD, consumer groups, and the PEL
- Redis Pub/Sub — PUBLISH, SUBSCRIBE, PSUBSCRIBE, and keyspace notifications reference
- RedisJSON Documentation — JSON.SET, JSON.GET, JSONPath syntax, and RediSearch integration
- ioredis GitHub Repository — Node.js Redis client with Stream, Pub/Sub, and Cluster support
- Redis Stream Tutorial — Hands-on walkthrough of Redis Streams with consumer groups and XAUTOCLAIM