Configure AWS SQS Reaction

Send messages to Amazon SQS when query results change

The AWS SQS Reaction Reaction A component that receives query result changes and takes action on them. Learn more sends messages to Amazon Simple Queue Service (SQS) when query results change Result Change Event A notification from a Continuous Query describing changes to its result set. Learn more . Use it to decouple downstream consumers, trigger asynchronous workflows, and integrate with AWS services.

Basic Configuration

reactions:
  - kind: aws-sqs
    id: order-notifications
    queries: [new-orders]
    queueUrl: https://sqs.us-east-1.amazonaws.com/123456789012/orders
    region: us-east-1
    defaultTemplate:
      added:
        body: '{"event":"new_order","data":{{json after}}}'

Configuration Reference

Field Type Default Description
kind string Required Must be aws-sqs
id string Required Unique reaction identifier
queries array Required Query IDs to subscribe to
autoStart boolean true Start reaction automatically
queueUrl string Required Full SQS queue URL
region string None AWS region override
endpointUrl string None Custom SQS endpoint (ElasticMQ/LocalStack)
accessKeyId string None Explicit AWS access key ID
secretAccessKey string None Explicit AWS secret access key
fifoQueue boolean false Enable FIFO queue options
messageGroupIdTemplate string None Handlebars template for FIFO MessageGroupId
routes object {} Per-query templates
defaultTemplate object None Fallback templates when a query-specific route is not defined

Route Configuration

Routes define how each change type is rendered into SQS messages. You can configure per-query routes or use defaultTemplate as a fallback.

Each query can have templates for different change types:

Change Type When Triggered Data Available
added New item in results {{after}}
updated Item changed {{before}}, {{after}}
deleted Item removed {{before}}

Template Spec

Each change type accepts a template spec:

Field Type Default Description
body string Empty (raw JSON fallback) Handlebars template for the SQS message body
messageAttributes object {} SQS message attributes with Handlebars-rendered values

Per-Query Routing

reactions:
  - kind: aws-sqs
    id: multi-query-sqs
    queries: [orders, inventory]
    queueUrl: https://sqs.us-east-1.amazonaws.com/123456789012/events
    region: us-east-1
    routes:
      orders:
        added:
          body: '{"type":"order_created","order":{{json after}}}'
          messageAttributes:
            entity-type: order
            entity-id: "{{after.id}}"
        deleted:
          body: '{"type":"order_cancelled","id":"{{before.id}}"}'
      inventory:
        updated:
          body: '{"type":"stock_changed","sku":"{{after.sku}}","qty":{{after.quantity}}}'
    defaultTemplate:
      added:
        body: '{{json after}}'
      updated:
        body: '{{json after}}'
      deleted:
        body: '{{json before}}'

Handlebars Templates

Message bodies and attribute values support Handlebars templating.

Template Variables

Variable Available On Description
{{after}} added, updated The new data object
{{after.property}} added, updated Specific property from new data
{{before}} updated, deleted The previous data object
{{before.property}} updated, deleted Specific property from previous data
{{query_id}} All The query ID that produced this change
{{query_name}} All The query name
{{operation}} All The operation type (add, update, delete)
{{timestamp}} All Event timestamp
{{json value}} All JSON-serializes nested objects

Example Template

defaultTemplate:
  added:
    body: |
      {
        "event": "created",
        "id": "{{after.id}}",
        "data": {{json after}},
        "query": "{{query_id}}",
        "timestamp": "{{timestamp}}"
      }
    messageAttributes:
      entity-id: "{{after.id}}"
      event-type: created

Message Attributes

Every SQS message automatically includes the following system attributes:

Attribute Description
drasi-query-id The ID of the query that produced the change
drasi-operation The operation type (add, update, delete)

User-defined attributes from messageAttributes are merged after system attributes. You can override system attribute keys if needed.

FIFO Queue Support

For SQS FIFO queues, enable the fifoQueue option and optionally provide a messageGroupIdTemplate:

reactions:
  - kind: aws-sqs
    id: ordered-events
    queries: [transactions]
    queueUrl: https://sqs.us-east-1.amazonaws.com/123456789012/events.fifo
    region: us-east-1
    fifoQueue: true
    messageGroupIdTemplate: "{{after.account_id}}"
    defaultTemplate:
      added:
        body: '{{json after}}'
      updated:
        body: '{{json after}}'

When messageGroupIdTemplate is omitted, the query ID is used as the message group ID.

Authentication

In production, use IAM roles attached to your compute environment (EC2 instance profile, ECS task role, etc.). No explicit credentials are needed in the configuration:

reactions:
  - kind: aws-sqs
    id: sqs-iam
    queries: [events]
    queueUrl: https://sqs.us-east-1.amazonaws.com/123456789012/events
    region: us-east-1
    defaultTemplate:
      added:
        body: '{{json after}}'

The AWS SDK credential chain will resolve credentials automatically.

Explicit Credentials (Development Only)

For local development or testing, provide explicit credentials:

reactions:
  - kind: aws-sqs
    id: sqs-local
    queries: [events]
    queueUrl: http://localhost:4566/000000000000/test-queue
    region: us-east-1
    endpointUrl: http://localhost:4566
    accessKeyId: ${AWS_ACCESS_KEY_ID:-test}
    secretAccessKey: ${AWS_SECRET_ACCESS_KEY:-test}
    defaultTemplate:
      added:
        body: '{{json after}}'

Examples

Simple Event Queue

reactions:
  - kind: aws-sqs
    id: event-queue
    queries: [user-signups]
    queueUrl: https://sqs.us-east-1.amazonaws.com/123456789012/signups
    region: us-east-1
    defaultTemplate:
      added:
        body: '{"userId":"{{after.id}}","email":"{{after.email}}","signedUpAt":"{{timestamp}}"}'
        messageAttributes:
          event-type: user-signup

Multi-Query with Custom Routing

reactions:
  - kind: aws-sqs
    id: order-pipeline
    queries: [new-orders, cancelled-orders, shipped-orders]
    queueUrl: https://sqs.us-east-1.amazonaws.com/123456789012/order-events
    region: ${AWS_REGION:-us-east-1}
    routes:
      new-orders:
        added:
          body: |
            {
              "event": "order_created",
              "orderId": "{{after.id}}",
              "total": {{after.total}},
              "customer": "{{after.customer_id}}"
            }
          messageAttributes:
            event-type: order_created
            priority: "{{after.priority}}"
      cancelled-orders:
        added:
          body: '{"event":"order_cancelled","orderId":"{{after.id}}","reason":"{{after.reason}}"}'
          messageAttributes:
            event-type: order_cancelled
      shipped-orders:
        added:
          body: '{"event":"order_shipped","orderId":"{{after.id}}","trackingNumber":"{{after.tracking}}"}'
          messageAttributes:
            event-type: order_shipped

FIFO Queue with Account Grouping

reactions:
  - kind: aws-sqs
    id: account-events
    queries: [balance-changes]
    queueUrl: https://sqs.us-east-1.amazonaws.com/123456789012/account-events.fifo
    region: us-east-1
    fifoQueue: true
    messageGroupIdTemplate: "{{after.account_id}}"
    routes:
      balance-changes:
        updated:
          body: |
            {
              "account": "{{after.account_id}}",
              "previousBalance": {{before.balance}},
              "newBalance": {{after.balance}},
              "change": {{after.change_amount}}
            }
          messageAttributes:
            account-id: "{{after.account_id}}"

Testing

Local Testing with LocalStack

Use LocalStack to test SQS locally:

# Start LocalStack
docker run -d --name localstack -p 4566:4566 localstack/localstack

# Create a test queue
aws --endpoint-url=http://localhost:4566 sqs create-queue --queue-name test-queue

Configure the reaction to use the LocalStack endpoint:

reactions:
  - kind: aws-sqs
    id: local-test
    queries: [my-query]
    queueUrl: http://localhost:4566/000000000000/test-queue
    region: us-east-1
    endpointUrl: http://localhost:4566
    accessKeyId: test
    secretAccessKey: test
    defaultTemplate:
      added:
        body: '{{json after}}'
      updated:
        body: '{{json after}}'
      deleted:
        body: '{{json before}}'

Verify messages are received:

aws --endpoint-url=http://localhost:4566 sqs receive-message \
  --queue-url http://localhost:4566/000000000000/test-queue

Local Testing with ElasticMQ

ElasticMQ is a lightweight SQS-compatible message queue:

docker run -d --name elasticmq -p 9324:9324 softwaremill/elasticmq
reactions:
  - kind: aws-sqs
    id: elasticmq-test
    queries: [my-query]
    queueUrl: http://localhost:9324/queue/test-queue
    region: us-east-1
    endpointUrl: http://localhost:9324
    accessKeyId: x
    secretAccessKey: x
    defaultTemplate:
      added:
        body: '{{json after}}'

Limitations

  • Messages are sent one at a time (no SendMessageBatch optimization)
  • SQS message size limit applies (256 KB)
  • Aggregation diffs are sent as update messages
  • Noop diffs are silently ignored

Complete Example

host: 0.0.0.0
port: 8080
logLevel: info

sources:
  - kind: postgres
    id: products-db
    host: ${DB_HOST}
    database: inventory
    user: ${DB_USER}
    password: ${DB_PASSWORD}
    tables:
      - public.products

queries:
  - id: low-stock
    query: |
      MATCH (p:products)
      WHERE p.quantity < 10
      RETURN p.id, p.name, p.sku, p.quantity
    sources:
      - sourceId: products-db

reactions:
  - kind: aws-sqs
    id: stock-alerts
    queries: [low-stock]
    queueUrl: ${SQS_QUEUE_URL}
    region: ${AWS_REGION:-us-east-1}
    fifoQueue: false
    routes:
      low-stock:
        added:
          body: |
            {
              "alert": "low_stock",
              "product": "{{after.name}}",
              "sku": "{{after.sku}}",
              "currentStock": {{after.quantity}}
            }
          messageAttributes:
            alert-type: low_stock
            sku: "{{after.sku}}"
        updated:
          body: |
            {
              "alert": "stock_update",
              "product": "{{after.name}}",
              "sku": "{{after.sku}}",
              "previousStock": {{before.quantity}},
              "currentStock": {{after.quantity}}
            }
          messageAttributes:
            alert-type: stock_update
            sku: "{{after.sku}}"
        deleted:
          body: |
            {
              "alert": "product_removed",
              "product": "{{before.name}}",
              "sku": "{{before.sku}}"
            }
          messageAttributes:
            alert-type: product_removed

Documentation resources

Next steps