JSON in Event-Driven Architecture: CloudEvents, Kafka, and SQS

JSON event messages in event-driven architectures carry a typed payload — a type field identifies the event, data carries the domain payload, and metadata fields like eventId, timestamp, and correlationId enable tracing and deduplication. The CloudEvents specification standardizes JSON event format across cloud providers: specversion, type (reverse-DNS like com.example.user.created), source, id, time, and data — with a content type of application/cloudevents+json distinguishing it from plain JSON payloads. AWS SQS delivers JSON events as a Records array — each record has body as a JSON string requiring a second JSON.parse(); SNS fan-out to SQS adds a third level of nesting where JSON.parse(record.body).Message is itself a JSON string. This guide covers CloudEvents JSON format, Kafka JSON producer and consumer patterns, AWS SQS and SNS JSON envelopes, event schema registry setup, idempotency via eventId, and dead letter queue handling. Use Jsonic's JSON formatter to inspect and debug event payloads while developing.

JSON Event Envelope Design

Every event in an event-driven system should conform to a shared envelope schema. The envelope separates routing metadata from domain payload, enabling consumers to filter, trace, and deduplicate events without parsing the inner data field. A well-designed envelope makes debugging distributed failures tractable — you can trace a chain of events by following correlationId across services, and identify the exact triggering event via causationId.

A minimal production event envelope needs: type (reverse-DNS event name), eventId (UUID v4 for deduplication), timestamp (ISO 8601), source (the producing service), correlationId (links all events in a user-initiated flow), causationId (ID of the event that caused this one), schemaVersion (semver for evolution), and data (the domain payload). Never put operational metadata inside data — keep it flat on the envelope so consumers can act on it without deserializing the payload.

// Minimal event envelope
{
  "type": "com.example.user.created",
  "eventId": "550e8400-e29b-41d4-a716-446655440000",
  "timestamp": "2025-05-19T10:00:00.000Z",
  "source": "/services/user-service",
  "data": {
    "userId": "u123",
    "email": "alice@example.com",
    "plan": "pro"
  }
}

// Full production envelope with tracing fields
{
  "type": "com.example.order.placed",
  "eventId": "7f3a9b2c-1234-4abc-8def-000000000001",
  "timestamp": "2025-05-19T10:01:00.000Z",
  "source": "/services/order-service",
  "schemaVersion": "1.0.0",
  "correlationId": "corr-abc123",   // ties all events in this user flow
  "causationId": "550e8400-e29b-41d4-a716-446655440000",  // caused by user.created
  "data": {
    "orderId": "ord-456",
    "userId": "u123",
    "items": [{ "sku": "PRO-PLAN", "qty": 1 }],
    "totalCents": 2999
  }
}

Use reverse-DNS naming for type: com.example.user.created, com.example.order.placed, com.example.payment.failed. This prevents collisions in multi-tenant event buses (AWS EventBridge, Azure Event Grid) where multiple services publish to a shared bus. Generate eventId with crypto.randomUUID(), available in Node.js 14.17+ and 15.6+, or with the uuid package on older runtimes. Propagate correlationId from inbound HTTP request headers (e.g., X-Correlation-ID) into every outbound event; this links the entire distributed trace. See the JSON audit logging guide for complementary tracing patterns.
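As a rough illustration of the envelope rules above, the following sketch builds envelopes with a generated eventId, a propagated correlationId, and a causationId for follow-up events. The EventEnvelope interface and the createEnvelope and createChildEnvelope functions are hypothetical names, not part of any library, and the header lookup assumes the lowercase x-correlation-id key that Node.js exposes for incoming requests.

```typescript
import { randomUUID } from 'node:crypto'

// Hypothetical envelope shape mirroring the fields described above
interface EventEnvelope<T> {
  type: string
  eventId: string
  timestamp: string
  source: string
  schemaVersion: string
  correlationId: string
  causationId?: string
  data: T
}

// Build a root envelope; reuse the inbound correlation ID when one is present
function createEnvelope<T>(
  type: string,
  source: string,
  data: T,
  headers: Record<string, string | undefined> = {},
): EventEnvelope<T> {
  return {
    type,
    eventId: randomUUID(),
    timestamp: new Date().toISOString(),
    source,
    schemaVersion: '1.0.0',
    // Node.js lowercases incoming HTTP header names
    correlationId: headers['x-correlation-id'] ?? randomUUID(),
    data,
  }
}

// Build an event caused by `parent`: same correlationId, causationId = parent's eventId
function createChildEnvelope<T>(
  parent: EventEnvelope<unknown>,
  type: string,
  source: string,
  data: T,
): EventEnvelope<T> {
  return {
    ...createEnvelope(type, source, data, { 'x-correlation-id': parent.correlationId }),
    causationId: parent.eventId,
  }
}

const userCreated = createEnvelope(
  'com.example.user.created',
  '/services/user-service',
  { userId: 'u123' },
  { 'x-correlation-id': 'corr-abc123' },
)
const orderPlaced = createChildEnvelope(
  userCreated,
  'com.example.order.placed',
  '/services/order-service',
  { orderId: 'ord-456' },
)
```

Keeping the child constructor separate makes it hard to forget causationId when one handler emits an event in response to another.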

CloudEvents JSON Format

CloudEvents is a CNCF specification that standardizes the event envelope structure across cloud providers. AWS EventBridge, Azure Event Grid, GCP Eventarc, Knative, and Dapr all support CloudEvents natively, so adopting it makes your event format portable across infrastructure. CloudEvents 1.0 requires four attributes (specversion, type, source, and id); time is optional, although the examples in this guide always set it. The JSON format uses application/cloudevents+json as the content type to distinguish it from arbitrary JSON.

// Minimal CloudEvents 1.0 JSON message
{
  "specversion": "1.0",
  "type": "com.example.user.created",
  "source": "https://example.com/users/service",
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "time": "2025-05-19T10:00:00.000Z",
  "data": {
    "userId": "u123",
    "email": "alice@example.com"
  }
}

// CloudEvents with optional attributes
{
  "specversion": "1.0",
  "type": "com.example.order.placed",
  "source": "https://example.com/orders/service",
  "id": "7f3a9b2c-1234-4abc-8def-000000000001",
  "time": "2025-05-19T10:01:00.000Z",
  "datacontenttype": "application/json",
  "dataschema": "https://example.com/schemas/order-placed/1.0.0",
  // Extension attributes — lowercase, no hyphens
  "correlationid": "corr-abc123",
  "causationid": "550e8400-e29b-41d4-a716-446655440000",
  "data": {
    "orderId": "ord-456",
    "userId": "u123",
    "totalCents": 2999
  }
}

// Producing a CloudEvents message with cloudevents SDK (Node.js)
import { CloudEvent, emitterFor, httpTransport } from 'cloudevents'

const event = new CloudEvent({
  type: 'com.example.user.created',
  source: 'https://example.com/users/service',
  data: { userId: 'u123', email: 'alice@example.com' },
  // Extension attributes
  correlationid: 'corr-abc123',
})

// Emit over HTTP
const emit = emitterFor(httpTransport('https://receiver.example.com/events'))
await emit(event)

// Or serialize to JSON string for Kafka/SQS
const jsonString = JSON.stringify(event)

// Consuming a CloudEvents message
import { HTTP } from 'cloudevents'
const received = HTTP.toEvent({ headers: req.headers, body: req.body })
console.log(received.type)   // 'com.example.user.created'
console.log(received.data)   // { userId: 'u123', email: 'alice@example.com' }

CloudEvents extension attributes must be lowercase alphanumeric — no hyphens or underscores — which is why correlationId becomes correlationid in a CloudEvents envelope. The source attribute should be a URI that uniquely identifies the producer context: use the service URL (e.g., https://example.com/orders/service) or a URN (e.g., urn:example:orders:service). The datacontenttype attribute (application/json) tells consumers how to interpret data; omitting it implies the data is JSON-encoded. See the JSON data validation guide for validating CloudEvents payloads with Zod or JSON Schema.
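The naming rule for extension attributes can be checked mechanically. The sketch below is a minimal illustration, not part of the cloudevents SDK: isValidExtensionName and toExtensionName are hypothetical helpers that test a name against the lowercase-alphanumeric rule and derive a compliant name from a camelCase or hyphenated key.

```typescript
// CloudEvents attribute names: lowercase ASCII letters and digits only
const EXTENSION_NAME = /^[a-z0-9]+$/

function isValidExtensionName(name: string): boolean {
  return EXTENSION_NAME.test(name)
}

// Hypothetical helper: derive a compliant name from a camelCase or hyphenated key
function toExtensionName(key: string): string {
  const name = key.toLowerCase().replace(/[^a-z0-9]/g, '')
  if (name.length === 0) {
    throw new Error(`Cannot derive an extension attribute name from "${key}"`)
  }
  return name
}

const normalized = toExtensionName('correlationId')
const fromHyphenated = toExtensionName('causation-id')
```

Running such a check where envelopes are constructed catches an invalid attribute name before the event leaves the producer.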

Kafka JSON Producer and Consumer

Kafka does not enforce message format — it treats message values as opaque byte arrays. JSON is the most common format for Kafka messages because it requires no schema definition upfront and is human-readable in consumer group lag tools and log viewers. The tradeoff is that JSON is larger than Avro or Protobuf — typically 3–5x larger for numeric-heavy payloads — and provides no compile-time schema enforcement without an additional schema registry.

// Kafka JSON producer with kafkajs
import { Kafka } from 'kafkajs'

const kafka = new Kafka({
  clientId: 'user-service',
  brokers: ['kafka-broker:9092'],
})

const producer = kafka.producer()
await producer.connect()

async function publishEvent(event: {
  type: string
  eventId: string
  timestamp: string
  data: { userId: string; [field: string]: unknown }
}) {
  await producer.send({
    topic: 'user-events',
    messages: [
      {
        // Key by entity ID (userId), not eventId: every eventId is unique,
        // so only an entity key sends same-user events to the same partition
        key: event.data.userId,
        value: JSON.stringify(event),
        headers: {
          'content-type': 'application/json',
          'event-type': event.type,
          'event-id': event.eventId,
        },
      },
    ],
  })
}

await publishEvent({
  type: 'com.example.user.created',
  eventId: crypto.randomUUID(),
  timestamp: new Date().toISOString(),
  data: { userId: 'u123', email: 'alice@example.com' },
})

// Kafka JSON consumer with kafkajs
const consumer = kafka.consumer({ groupId: 'email-service-group' })
await consumer.connect()
await consumer.subscribe({ topic: 'user-events', fromBeginning: false })

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    // message.value is a Buffer — convert to string first
    if (!message.value) return

    let event: {
      type: string
      eventId: string
      timestamp: string
      data: unknown
    }

    try {
      event = JSON.parse(message.value.toString())
    } catch (err) {
      console.error('Failed to parse Kafka message as JSON', {
        topic,
        partition,
        offset: message.offset,
        raw: message.value.toString().slice(0, 200),
      })
      // Do NOT throw — throwing causes kafkajs to retry the same message
      return
    }

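    // Idempotency check before processing
    // (`redis` is assumed to be a connected node-redis v4 client)
    const setResult = await redis.set(
      `eventId:${event.eventId}`,
      '1',
      { EX: 86400, NX: true }
    )
    // SET ... NX returns null when the key already exists, i.e. a duplicate
    if (setResult === null) {
      console.log('Skipping duplicate event', event.eventId)
      return
    }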

    switch (event.type) {
      case 'com.example.user.created':
        await sendWelcomeEmail(event.data as { userId: string; email: string })
        break
      default:
        console.warn('Unknown event type', event.type)
    }
  },
})

Use the entity ID (for example, userId) as the Kafka message key when all events for the same entity must be processed in order. Kafka guarantees ordering only within a partition, and the key determines which partition a message goes to. A unique eventId makes a poor key for this purpose because it spreads one entity's events across partitions; carry it in the payload or a header for deduplication instead. For schema enforcement, configure the Confluent Schema Registry serializer so invalid JSON structure is caught at the producer before reaching any consumer. See the JSON in Kafka guide for advanced producer and consumer configurations.
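The consumer shown earlier trusts whatever JSON.parse returns. Where no schema registry is in place, a small runtime guard can reject malformed envelopes before any business logic runs. The sketch below is one possible shape, assuming the envelope fields used in this guide; parseEnvelope and ParsedEnvelope are hypothetical names.

```typescript
interface ParsedEnvelope {
  type: string
  eventId: string
  timestamp: string
  data: unknown
}

// Returns the envelope, or null when the input is not valid JSON or lacks required fields
function parseEnvelope(raw: string): ParsedEnvelope | null {
  let value: unknown
  try {
    value = JSON.parse(raw)
  } catch {
    return null
  }
  if (typeof value !== 'object' || value === null) return null
  const candidate = value as Record<string, unknown>
  if (
    typeof candidate.type !== 'string' ||
    typeof candidate.eventId !== 'string' ||
    typeof candidate.timestamp !== 'string' ||
    !('data' in candidate)
  ) {
    return null
  }
  return {
    type: candidate.type,
    eventId: candidate.eventId,
    timestamp: candidate.timestamp,
    data: candidate.data,
  }
}

const good = parseEnvelope(
  '{"type":"com.example.user.created","eventId":"e1","timestamp":"2025-05-19T10:00:00.000Z","data":{"userId":"u123"}}',
)
const bad = parseEnvelope('{"type":"com.example.user.created"}')
```

In a kafkajs eachMessage handler this would be called as parseEnvelope(message.value.toString()), with a null result logged and skipped rather than thrown.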

AWS SQS JSON Message Structure

AWS SQS wraps each message in a Records array when delivering to a Lambda function. The critical detail: record.body is a string, not a parsed object, so every SQS Lambda handler must call JSON.parse(record.body) explicitly. SQS supports batch processing: a single Lambda invocation receives up to 10 messages by default, and the batch size can be raised (up to 10,000 for standard queues), so always iterate over all records rather than assuming a single message.

// AWS Lambda handler for SQS events (TypeScript)
import { SQSHandler, SQSRecord } from 'aws-lambda'

interface UserCreatedEvent {
  type: string
  eventId: string
  timestamp: string
  data: {
    userId: string
    email: string
  }
}

// SQS message structure delivered to Lambda:
// {
//   "Records": [
//     {
//       "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
//       "receiptHandle": "AQEBwJ...long string...",
//       "body": "{\"type\":\"com.example.user.created\",\"eventId\":\"...\"}",
//       "attributes": {
//         "ApproximateReceiveCount": "1",
//         "SentTimestamp": "1716120000000",
//         "SenderId": "AROAEXAMPLE",
//         "ApproximateFirstReceiveTimestamp": "1716120001000"
//       },
//       "messageAttributes": {},
//       "md5OfBody": "...",
//       "eventSource": "aws:sqs",
//       "eventSourceARN": "arn:aws:sqs:us-east-1:123456789:user-events",
//       "awsRegion": "us-east-1"
//     }
//   ]
// }

export const handler: SQSHandler = async (event) => {
  const failures: { itemIdentifier: string }[] = []

  for (const record of event.Records) {
    try {
      await processRecord(record)
    } catch (err) {
      console.error('Failed to process record', { messageId: record.messageId, err })
      // Return failed messageIds for partial batch failure
      failures.push({ itemIdentifier: record.messageId })
    }
  }

  // Partial batch response: only the failed messages are returned to the queue for retry
  return { batchItemFailures: failures }
}

async function processRecord(record: SQSRecord) {
  // body is always a JSON string — always JSON.parse
  const event: UserCreatedEvent = JSON.parse(record.body)

  // Check the receive count to flag poison-pill messages
  const receiveCount = parseInt(record.attributes.ApproximateReceiveCount, 10)
  if (receiveCount > 3) {
    console.error('Exceeded retry limit, failing the message so SQS can redrive it to the DLQ', { event, receiveCount })
    // Throwing marks the record as failed; the redrive policy moves it to the DLQ
    // once maxReceiveCount is reached
    throw new Error(`Max retries exceeded for eventId ${event.eventId}`)
  }

  // Process the event
  await sendWelcomeEmail(event.data)
}

Use SQS partial batch response (returning batchItemFailures) so only failed messages are retried — without it, a single failure causes the entire batch to be reprocessed. Set the SQS visibility timeout to at least 6x the Lambda timeout to prevent a message from becoming visible again while the Lambda is still processing it. Use MessageAttributes for routing metadata (event type, schema version) that consumers can filter on without parsing the body. See the JSON audit logging guide for structured logging patterns that complement SQS event processing.
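To illustrate routing on MessageAttributes without parsing the body, here is a minimal sketch. It assumes the producer sets a String message attribute named eventType (an assumption of this example, not something SQS adds for you) and uses a trimmed-down record type in place of the full aws-lambda SQSRecord; selectBodies is a hypothetical helper.

```typescript
// Minimal subset of the Lambda SQS record shape used here
interface SqsRecordLike {
  messageId: string
  body: string
  messageAttributes: Record<string, { dataType: string; stringValue?: string }>
}

// Hypothetical router: pick records by the eventType attribute, parse only those
function selectBodies(records: SqsRecordLike[], wantedType: string): unknown[] {
  return records
    .filter((r) => r.messageAttributes.eventType?.stringValue === wantedType)
    .map((r) => JSON.parse(r.body))
}

const sampleRecords: SqsRecordLike[] = [
  {
    messageId: 'm1',
    body: '{"type":"com.example.user.created","eventId":"e1"}',
    messageAttributes: {
      eventType: { dataType: 'String', stringValue: 'com.example.user.created' },
    },
  },
  {
    // Body is deliberately not JSON: it is never parsed because the attribute does not match
    messageId: 'm2',
    body: 'not json',
    messageAttributes: {
      eventType: { dataType: 'String', stringValue: 'com.example.user.deleted' },
    },
  },
  { messageId: 'm3', body: '{}', messageAttributes: {} },
]

const created = selectBodies(sampleRecords, 'com.example.user.created')
```

Because the filter runs before JSON.parse, records of other types cost nothing to skip, and a malformed body on an unrelated event type cannot fail the handler.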

AWS SNS Fan-Out with JSON

AWS SNS delivers JSON messages to multiple SQS queues, Lambda functions, and HTTP endpoints simultaneously — the fan-out pattern. When SNS delivers to an SQS queue, it wraps your message in an SNS envelope so the SQS consumer receives a nested JSON structure: the SQS body is the SNS notification JSON, and the SNS Message field inside it is your original JSON string. This double-parse requirement surprises many developers and causes silent data loss when only the outer parse is done.

// SNS envelope received in SQS body (raw string):
// {
//   "Type": "Notification",
//   "MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e",
//   "TopicArn": "arn:aws:sns:us-east-1:123456789:user-events",
//   "Subject": "user.created",
//   "Message": "{\"type\":\"com.example.user.created\",\"eventId\":\"...\"}",
//   "Timestamp": "2025-05-19T10:00:00.000Z",
//   "SignatureVersion": "1",
//   "Signature": "...",
//   "MessageAttributes": {
//     "eventType": {
//       "Type": "String",
//       "Value": "com.example.user.created"
//     }
//   }
// }

// Lambda handler — SNS fan-out to SQS
export const handler: SQSHandler = async (event) => {
  for (const record of event.Records) {
    // Step 1: Parse the SQS body to get the SNS envelope
    const snsEnvelope = JSON.parse(record.body) as {
      Type: string
      MessageId: string
      Message: string
      MessageAttributes: Record<string, { Type: string; Value: string }>
    }

    // Step 2: Parse the SNS Message field to get your domain event
    const domainEvent = JSON.parse(snsEnvelope.Message)

    console.log('Processing event', {
      type: domainEvent.type,
      eventId: domainEvent.eventId,
      snsMessageId: snsEnvelope.MessageId,
    })

    await processEvent(domainEvent)
  }
}

// SNS subscription filter policy: only deliver matching events
// Set on the SQS subscription, not in code
// Attach via AWS Console, CDK, or Terraform:
const filterPolicy = {
  eventType: ['com.example.user.created', 'com.example.user.updated'],
  // Only deliver events where plan = 'pro'; every key in the policy must match,
  // so the publisher also has to send a `plan` message attribute
  plan: ['pro'],
}

// Publishing to SNS with MessageAttributes for filtering
import { SNSClient, PublishCommand } from '@aws-sdk/client-sns'
const sns = new SNSClient({ region: 'us-east-1' })

await sns.send(new PublishCommand({
  TopicArn: 'arn:aws:sns:us-east-1:123456789:user-events',
  Message: JSON.stringify(domainEvent),
  MessageAttributes: {
    eventType: { DataType: 'String', StringValue: domainEvent.type },
    schemaVersion: { DataType: 'String', StringValue: '1.0.0' },
  },
}))

SNS message filtering with MessageAttributes lets subscribers receive only the event types they care about, without writing filter logic in consumer code. The filter policy is evaluated by SNS before delivery, so filtered-out messages never reach the SQS queue, which reduces queue depth and Lambda invocations. Use MessageAttributes for envelope-level routing keys such as event type and schema version; for content-based routing on domain payload fields, use payload-based filtering by setting the subscription's FilterPolicyScope to MessageBody.
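Returning to the nested envelope described at the start of this section: when one queue receives both direct sends and SNS deliveries, the double parse can be wrapped in a helper that detects the SNS envelope. This is a sketch under that assumption; unwrapSqsBody and isSnsNotification are hypothetical names, and the detection relies only on the Type, MessageId, and Message fields shown in the envelope above.

```typescript
interface SnsNotification {
  Type: 'Notification'
  MessageId: string
  Message: string
}

function isSnsNotification(value: unknown): value is SnsNotification {
  if (typeof value !== 'object' || value === null) return false
  const v = value as Record<string, unknown>
  return (
    v.Type === 'Notification' &&
    typeof v.MessageId === 'string' &&
    typeof v.Message === 'string'
  )
}

// Hypothetical helper: returns the domain event whether or not the body is SNS-wrapped
function unwrapSqsBody(body: string): unknown {
  const outer: unknown = JSON.parse(body)
  return isSnsNotification(outer) ? JSON.parse(outer.Message) : outer
}

const domainEventJson = JSON.stringify({ type: 'com.example.user.created', eventId: 'e1' })
const wrappedBody = JSON.stringify({
  Type: 'Notification',
  MessageId: '95df01b4-ee98-5cb9-9903-4c221d41eb5e',
  TopicArn: 'arn:aws:sns:us-east-1:123456789:user-events',
  Message: domainEventJson,
})

const fromSns = unwrapSqsBody(wrappedBody)
const fromDirect = unwrapSqsBody(domainEventJson)
```

Both calls yield the same domain event, so downstream handlers do not need to know which path a message took.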

Event Schema Registry

A schema registry centralizes event schema definitions and enforces compatibility rules so producers cannot publish breaking changes without explicit version bumps. Without a schema registry, JSON event structure drifts silently as services evolve — a producer renames a field or changes a type, and consumers break at runtime with no compile-time warning. Both Confluent Schema Registry (Kafka-native) and AWS Glue Schema Registry (AWS-native) support JSON Schema alongside Avro and Protobuf.

// JSON Schema for the user.created event — stored in schema registry
{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "$id": "https://schemas.example.com/user-created/1.0.0",
  "title": "com.example.user.created",
  "type": "object",
  "required": ["type", "eventId", "timestamp", "source", "data"],
  "properties": {
    "type": { "type": "string", "const": "com.example.user.created" },
    "eventId": { "type": "string", "format": "uuid" },
    "timestamp": { "type": "string", "format": "date-time" },
    "source": { "type": "string", "format": "uri" },
    "correlationId": { "type": "string" },
    "causationId": { "type": "string" },
    "data": {
      "type": "object",
      "required": ["userId", "email"],
      "properties": {
        "userId": { "type": "string" },
        "email": { "type": "string", "format": "email" },
        "plan": { "type": "string", "enum": ["free", "pro", "enterprise"] }
      },
      "additionalProperties": false
    }
  }
}

// AWS Glue Schema Registry: create a schema and register versions
import { GlueClient, CreateSchemaCommand, RegisterSchemaVersionCommand } from '@aws-sdk/client-glue'

const glue = new GlueClient({ region: 'us-east-1' })

// Create the schema together with its first version
await glue.send(new CreateSchemaCommand({
  RegistryId: { RegistryName: 'event-schemas' },
  SchemaName: 'com.example.user.created',
  DataFormat: 'JSON',
  Compatibility: 'BACKWARD',  // new schema can read old messages
  SchemaDefinition: JSON.stringify(userCreatedJsonSchema),
}))

// Register a new schema version (evolution)
await glue.send(new RegisterSchemaVersionCommand({
  SchemaId: {
    RegistryName: 'event-schemas',
    SchemaName: 'com.example.user.created',
  },
  SchemaDefinition: JSON.stringify(userCreatedJsonSchemaV2),
}))

// Compatibility modes:
// BACKWARD — consumers using new schema can read old messages (safe to add optional fields)
// FORWARD  — consumers using old schema can read new messages (safe to remove optional fields)
// FULL     — both BACKWARD and FORWARD
// NONE     — no compatibility checks (dangerous in production)

Use BACKWARD compatibility for most JSON event schemas: consumers that upgrade to the new schema can still read messages produced under the old one, so adding optional fields is safe. Consumers still validating against the old schema accept the new fields only if that schema permits unknown properties, so upgrade consumers before producers when it sets additionalProperties: false. Avoid renaming or removing required fields without a major version bump and a consumer migration. Register schemas in CI as part of your build pipeline, and fail the build if a proposed schema change is incompatible with the registered version. See the JSON data validation guide for runtime validation using Zod or Ajv against the same JSON Schema definitions.
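For a feel of what a CI compatibility gate checks, the sketch below flags two changes that break BACKWARD compatibility: a newly required field and a changed property type. It is a deliberately simplified illustration over flat object schemas, not the algorithm used by Confluent or AWS Glue; backwardViolations and ObjectSchema are hypothetical names.

```typescript
interface ObjectSchema {
  required?: string[]
  properties?: Record<string, { type?: string }>
}

// Returns human-readable violations; an empty array means the change passes this simplified check
function backwardViolations(oldSchema: ObjectSchema, newSchema: ObjectSchema): string[] {
  const violations: string[] = []

  // Old messages lack any field that only the new schema requires
  const oldRequired = new Set(oldSchema.required ?? [])
  for (const field of newSchema.required ?? []) {
    if (!oldRequired.has(field)) violations.push(`new required field: ${field}`)
  }

  // Old messages carry the old type, which the new schema would reject
  const newProps = newSchema.properties ?? {}
  for (const [name, oldProp] of Object.entries(oldSchema.properties ?? {})) {
    const newProp = newProps[name]
    if (newProp && newProp.type !== oldProp.type) {
      violations.push(`type changed: ${name}`)
    }
  }
  return violations
}

const v1: ObjectSchema = {
  required: ['userId', 'email'],
  properties: { userId: { type: 'string' }, email: { type: 'string' } },
}
// Adds an optional field: compatible
const v2: ObjectSchema = {
  required: ['userId', 'email'],
  properties: { ...v1.properties, plan: { type: 'string' } },
}
// Makes the new field required: incompatible with old messages
const v3: ObjectSchema = { ...v2, required: ['userId', 'email', 'plan'] }

const okChange = backwardViolations(v1, v2)
const breakingChange = backwardViolations(v1, v3)
```

A build step could fail whenever the returned array is non-empty, while the registry remains the authoritative check.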

Idempotency and Dead Letter Queues

SQS and Kafka both provide at-least-once delivery — the same message can arrive more than once due to network retries, consumer crashes, or visibility timeout expiry. Idempotent consumers handle duplicate events safely by checking whether an eventId was already processed before executing business logic. Dead letter queues (DLQ) capture messages that failed after the maximum retry count, preserving them for investigation and replay without blocking the main queue.

// Idempotency with Redis NX pattern
import { createClient } from 'redis'

const redis = createClient({ url: process.env.REDIS_URL })
await redis.connect()

async function processEventIdempotently(event: {
  eventId: string
  type: string
  data: unknown
}) {
  // SET eventId 1 EX 86400 NX — atomic check-and-set
  // Returns 'OK' if key was set (first time), null if already exists
  const result = await redis.set(
    `processed:eventId:${event.eventId}`,
    '1',
    { EX: 86400, NX: true }  // 24-hour TTL, only set if Not eXists
  )

  if (result === null) {
    // Already processed — skip safely
    console.log('Duplicate event skipped', { eventId: event.eventId, type: event.type })
    return { status: 'skipped', reason: 'duplicate' }
  }

  // First time — process the event
  try {
    await handleEvent(event)
    return { status: 'processed' }
  } catch (err) {
    // Remove the Redis key so the event can be retried
    await redis.del(`processed:eventId:${event.eventId}`)
    throw err
  }
}

// Dead letter queue handler: enrich and log failed events
interface DLQMessage {
  originalEvent: unknown
  dlqReason: string
  dlqTimestamp: string
  originalQueue: string
  retryCount: number
  lastError: string
}

export const dlqHandler: SQSHandler = async (event) => {
  for (const record of event.Records) {
    const originalBody = JSON.parse(record.body)
    const receiveCount = parseInt(record.attributes.ApproximateReceiveCount, 10)

    const dlqMessage: DLQMessage = {
      originalEvent: originalBody,
      dlqReason: 'max_retries_exceeded',
      dlqTimestamp: new Date().toISOString(),
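      // eventSourceARN is the DLQ's own ARN; the source queue is reported in DeadLetterQueueSourceArn
      originalQueue: record.attributes.DeadLetterQueueSourceArn ?? record.eventSourceARN,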
      retryCount: receiveCount,
      lastError: 'See CloudWatch logs for details',
    }

    // Structured log for alerting and dashboards
    console.error('DLQ message received', JSON.stringify(dlqMessage))

    // Send alert to PagerDuty / Slack
    await sendAlert({
      severity: 'critical',
      title: `DLQ: failed to process ${(originalBody as { type?: string }).type ?? 'unknown event'}`,
      body: JSON.stringify(dlqMessage, null, 2),
    })

    // Store in DynamoDB for manual replay
    await dynamoDB.put({
      TableName: 'dlq-archive',
      Item: {
        pk: record.messageId,
        ...dlqMessage,
        ttl: Math.floor(Date.now() / 1000) + 30 * 24 * 60 * 60,  // 30-day TTL
      },
    })
  }
}

Set the DLQ maxReceiveCount (SQS redrive policy) based on the nature of failures: transient network errors warrant 3–5 retries, while validation errors warrant 1 retry because they will not self-correct. Never silently swallow DLQ messages, since each one represents a lost business event. Store DLQ messages in DynamoDB or S3 with a TTL so they can be replayed by a recovery script after the root cause is fixed. For Kafka, keep processing time within max.poll.interval.ms and route failed records to a dead letter topic: in Spring Kafka, pair ErrorHandlingDeserializer (for records that cannot be deserialized) with a DeadLetterPublishingRecoverer; in kafkajs, publish to the dead letter topic from a custom error handler.
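The redrive policy and the visibility timeout guidance from the SQS section can be derived together. The sketch below is a minimal, hypothetical helper (buildQueueAttributes is not an AWS SDK function) that produces the VisibilityTimeout and RedrivePolicy queue attributes as strings, the form in which SQS queue attributes are set; the DLQ ARN is a placeholder.

```typescript
// Hypothetical helper: derive SQS queue attributes from the Lambda timeout and retry budget
function buildQueueAttributes(
  lambdaTimeoutSeconds: number,
  dlqArn: string,
  maxReceiveCount: number,
): Record<string, string> {
  return {
    // At least 6x the Lambda timeout, as recommended in the SQS section
    VisibilityTimeout: String(lambdaTimeoutSeconds * 6),
    // SQS expects the redrive policy as a JSON string
    RedrivePolicy: JSON.stringify({ deadLetterTargetArn: dlqArn, maxReceiveCount }),
  }
}

// Placeholder ARN for illustration only
const queueAttributes = buildQueueAttributes(
  30,
  'arn:aws:sqs:us-east-1:123456789:user-events-dlq',
  3,
)
```

The resulting object is shaped to be passed as the Attributes parameter when creating or updating the queue, or translated into the equivalent CDK or Terraform settings.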

Definitions

Event envelope
A JSON wrapper object that every event in an event-driven system conforms to, containing routing metadata (type, eventId, timestamp, source) and the domain payload (data). The envelope separates cross-cutting concerns like tracing and deduplication from business data, enabling infrastructure to act on events without deserializing the payload. A consistent envelope shape across all services makes debugging distributed failures significantly easier.
CloudEvents
A CNCF specification that standardizes the event envelope structure for cloud-native event-driven systems. A CloudEvents JSON message has four required attributes: specversion ("1.0"), type (reverse-DNS event name), source (producer URI), and id (unique event ID); time (RFC 3339 timestamp) is optional. The content type application/cloudevents+json distinguishes CloudEvents from arbitrary JSON. AWS EventBridge, Azure Event Grid, GCP Eventarc, Knative, and Dapr all support CloudEvents natively.
Dead letter queue
A secondary message queue (DLQ) that receives messages which failed processing after the maximum number of retries. In AWS SQS, a redrive policy specifies the maximum receive count and the target DLQ ARN. In Kafka, a dead letter topic receives messages that a consumer could not process. DLQ messages retain the original payload plus system metadata (receive count, timestamps). DLQ handlers should enrich messages with failure context, send alerts, and archive messages for manual replay after the root cause is resolved.
Idempotency key
A unique identifier (eventId) included in every event that consumers use to detect and skip duplicate deliveries. At-least-once delivery guarantees in SQS, Kafka, and most message brokers mean the same message can arrive more than once. An idempotent consumer stores the eventId in a durable store (Redis, DynamoDB) after successful processing and skips any subsequent delivery with the same ID. The Redis SET key value EX ttl NX command implements this atomically in a single round-trip.
Schema registry
A centralized service that stores event schema definitions and enforces compatibility rules between producers and consumers. Confluent Schema Registry supports JSON Schema, Avro, and Protobuf for Kafka. AWS Glue Schema Registry integrates with MSK, Kinesis, and SQS. Compatibility modes — BACKWARD, FORWARD, FULL, NONE — govern which schema changes are permitted. The producer serializer validates events against the registered schema before publishing; the consumer deserializer validates on receipt, catching schema drift at the boundary.
Correlation ID
A unique identifier propagated across all events and service calls that originate from the same user-initiated request or workflow. Set from the inbound HTTP header (X-Correlation-ID) or generated at the entry point. Every outbound event and downstream service call carries the same correlationId, enabling end-to-end distributed tracing across services, message queues, and async processing steps without a dedicated tracing infrastructure.
Causation ID
The eventId of the event that directly caused the current event to be produced. While correlationId links all events in a flow, causationId records the direct parent-child relationship. For example, a com.example.user.created event (eventId: A) causes a com.example.email.welcome-sent event — that child event carries causationId: A. Together, correlationId and causationId reconstruct the full causal graph of an event-driven workflow.

Frequently asked questions

What JSON format should I use for event-driven messages?

Use a typed event envelope with a type field (reverse-DNS like com.example.user.created), a data object carrying the domain payload, and metadata fields: eventId (UUID for deduplication), timestamp (ISO 8601), correlationId (links events in a chain), and causationId (ID of the triggering event). For cross-service and cross-cloud compatibility, adopt the CloudEvents specification, which defines a standard envelope supported natively by AWS EventBridge, Azure Event Grid, GCP Eventarc, and Knative. Generate eventId with crypto.randomUUID(), available in Node.js 14.17+ and 15.6+.

What is the CloudEvents specification?

CloudEvents is a CNCF specification that standardizes the event envelope across cloud providers. A CloudEvents JSON message requires four attributes: specversion ("1.0"), type (reverse-DNS event name), source (producer URI), and id (unique event ID); time (RFC 3339 timestamp) is optional. The content type is application/cloudevents+json. Custom extension attributes (like correlationid) must be lowercase alphanumeric with no hyphens. The official cloudevents npm package provides typed constructors and HTTP transport helpers for Node.js applications.

How do I produce and consume JSON with Kafka?

To produce JSON to Kafka with kafkajs, serialize the event with JSON.stringify(event) and set it as the message value. Use the entity ID (for example, userId) as the Kafka message key to route same-entity events to the same partition, preserving order. In the consumer, deserialize with JSON.parse(message.value.toString()), because message.value is a Buffer, not a string. Always wrap JSON.parse in a try/catch and do not rethrow on parse failure, since rethrowing makes kafkajs retry the same malformed message. For schema enforcement, configure the Confluent Schema Registry serializer on the producer.

How do I parse JSON from AWS SQS messages?

AWS SQS delivers a Records array to Lambda. Each record's body property is a JSON string — always call JSON.parse(record.body) explicitly. If the message originated from SNS fan-out to SQS, the body contains an SNS envelope — your domain event is in the Message field, which is itself a JSON string: use JSON.parse(JSON.parse(record.body).Message). Use SQS partial batch response (batchItemFailures) so only failed records are retried instead of the entire batch.

What is the difference between SNS and SQS JSON envelopes?

An SQS record has body (your JSON string), messageId, receiptHandle, attributes, and messageAttributes. When SNS delivers to SQS, the SQS body becomes an SNS notification JSON object with fields Type, MessageId, TopicArn, Message (your original JSON string), Timestamp, and MessageAttributes. SNS supports subscription filter policies on MessageAttributes so only matching event types are delivered to specific queues — reducing consumer load without code changes.

How do I implement idempotency with JSON events?

Include an eventId (UUID v4) in every event envelope. Before processing, execute SET processed:eventId:<id> 1 EX 86400 NX in Redis — the NX flag makes it atomic (set only if not exists). A return value of 'OK' means first delivery — proceed with processing. A return value of null means duplicate — skip and return success so SQS does not retry. If processing fails after the key is set, delete the Redis key before re-throwing so the event can be retried. Set the TTL to at least as long as the message retention period.

What is an event schema registry?

An event schema registry stores and enforces event schema definitions centrally, preventing producers from publishing schema-breaking changes silently. Confluent Schema Registry supports JSON Schema, Avro, and Protobuf for Kafka workloads. AWS Glue Schema Registry integrates with MSK, Kinesis, and SQS. Both support compatibility modes: BACKWARD (add optional fields safely), FORWARD (remove optional fields safely), and FULL (both). Register schemas in CI — fail the build on incompatible changes before they reach production consumers.

How do I handle dead letter queue messages in JSON?

DLQ messages retain the original body JSON plus SQS attributes including ApproximateReceiveCount. In your DLQ Lambda handler, enrich each message with failure metadata: dlqReason, dlqTimestamp, originalQueue, and retryCount. Log the enriched structure as JSON for CloudWatch alerting. Archive messages in DynamoDB or S3 with a TTL for manual replay after the root cause is resolved. Never silently discard DLQ messages — each one represents a business event that was not processed. Use a separate Lambda or script to replay archived DLQ events back to the original queue after a fix is deployed.

Inspect event JSON payloads visually

Paste any CloudEvents, SQS, or Kafka message payload into Jsonic to pretty-print, validate, and navigate the JSON structure. Instantly spot malformed envelopes, missing eventId fields, and double-serialized body strings before they cause runtime errors in production.