Configure Platform Reaction

Publish query results to Redis Streams

The Platform Reaction Reaction A component that receives query result changes and takes action on them. Learn more publishes query result changes Result Change Event A notification from a Continuous Query describing changes to its result set. Learn more to Redis Streams in CloudEvent format. It enables integration with Drasi for Kubernetes Drasi for Kubernetes A scalable Data Change Processing platform running on Kubernetes clusters. Learn more or any system that consumes from Redis Streams.

Basic Configuration

reactions:
  - kind: platform
    id: redis-publisher
    queries: [my-query]
    redisUrl: redis://localhost:6379

Configuration Reference

Field Type Default Description
kind string Required Must be platform
id string Required Unique reaction identifier
queries array Required Query IDs to subscribe to
autoStart boolean true Start reaction automatically
redisUrl string Required Redis connection URL
pubsubName string Optional Pub/sub name used in CloudEvent metadata
sourceName string Optional Source name used in CloudEvent metadata
maxStreamLength integer None Maximum stream length (enables trimming)
emitControlEvents boolean false Emit control events
batchEnabled boolean false Enable batching
batchMaxSize integer 100 Maximum batch size
batchMaxWaitMs integer 100 Maximum wait time for batch

Redis Connection URL

# Local Redis
redisUrl: redis://localhost:6379

# With password
redisUrl: redis://:password@localhost:6379

# With username and password
redisUrl: redis://user:password@localhost:6379

# With database selection
redisUrl: redis://localhost:6379/1

# TLS connection
redisUrl: rediss://localhost:6379

Event Format

Each Redis Streams entry contains a single field named data with a JSON-encoded Dapr CloudEvent envelope.

{
  "data": {
    "kind": "change",
    "queryId": "my-query",
    "sequence": 42,
    "sourceTimeMs": 1705318245123,
    "addedResults": [{"id":"123","name":"Test"}],
    "updatedResults": [{"before":{"id":"123","name":"Old"},"after":{"id":"123","name":"New"}}],
    "deletedResults": []
  },
  "datacontenttype": "application/json",
  "id": "550e8400-e29b-41d4-a716-446655440000",
  "pubsubname": "drasi-pubsub",
  "source": "drasi-core",
  "specversion": "1.0",
  "time": "2025-01-15T10:30:45.123Z",
  "topic": "my-query-results",
  "type": "com.dapr.event.sent"
}

If pubsubName or sourceName are set, they appear as pubsubname and source in the CloudEvent envelope.

When emitControlEvents is enabled, control events use the same envelope with data.kind: "control" and a controlSignal.

Batching Configuration

For high-throughput scenarios, enable batching:

reactions:
  - kind: platform
    id: batched-publisher
    queries: [high-volume-events]
    redisUrl: redis://localhost:6379
    batchEnabled: true
    batchMaxSize: 500
    batchMaxWaitMs: 200
Setting Low Latency High Throughput
batchEnabled false true
batchMaxSize 1-10 100-1000
batchMaxWaitMs 10-50 100-500

Stream Length Management

Prevent unbounded stream growth:

reactions:
  - kind: platform
    id: bounded-stream
    queries: [events]
    redisUrl: redis://localhost:6379
    maxStreamLength: 100000

When the stream exceeds maxStreamLength, older entries are automatically trimmed.

Control Events

Emit control events for stream lifecycle:

reactions:
  - kind: platform
    id: with-control
    queries: [events]
    redisUrl: redis://localhost:6379
    emitControlEvents: true

Control events include:

  • Stream initialization
  • Query status changes
  • Error notifications

Stream Key Naming

By default, streams use the CloudEvent topic format {queryId}-results. This is independent of pubsubName (which only affects CloudEvent metadata).

reactions:
  - kind: platform
    id: custom-metadata
    queries: [orders]
    redisUrl: redis://localhost:6379
    pubsubName: order-events  # metadata only
    sourceName: ecommerce-system

Events published to stream key: orders-results

Use Cases

Drasi Platform Integration

Connect Drasi Server to the Drasi Platform:

reactions:
  - kind: platform
    id: platform-connector
    queries: [all-events]
    redisUrl: ${PLATFORM_REDIS_URL}

Event Bus

Use Redis Streams as an event bus:

reactions:
  - kind: platform
    id: event-bus
    queries: [orders, inventory, customers]
    redisUrl: redis://redis:6379
    batchEnabled: true
    batchMaxSize: 100

Cross-Service Communication

Publish events for other services to consume:

reactions:
  - kind: platform
    id: service-events
    queries: [order-created, order-updated]
    redisUrl: redis://redis:6379
    pubsubName: order-service-events
    maxStreamLength: 50000

Data Replication

Replicate data changes to other systems:

reactions:
  - kind: platform
    id: replication
    queries: [all-changes]
    redisUrl: redis://replica-redis:6379
    batchEnabled: true
    batchMaxSize: 500
    batchMaxWaitMs: 100

Consuming Events

Using redis-cli

# Read from stream
redis-cli XREAD STREAMS my-query-results 0

# Read new events (blocking)
redis-cli XREAD BLOCK 0 STREAMS my-query-results $

# Consumer group
redis-cli XREADGROUP GROUP my-group consumer-1 STREAMS my-query-results >

Python Consumer

import redis
import json

r = redis.Redis(host='localhost', port=6379)

# Create consumer group
try:
    r.xgroup_create('my-query-results', 'my-group', id='0', mkstream=True)
except redis.exceptions.ResponseError:
    pass  # Group already exists

while True:
    events = r.xreadgroup('my-group', 'consumer-1', {'my-query-results': '>'}, block=5000)
    for stream, messages in events:
        for message_id, data in messages:
            event = json.loads(data[b'data'])
            print(f"Received: {event}")
            r.xack('my-query-results', 'my-group', message_id)

Node.js Consumer

const Redis = require('ioredis');
const redis = new Redis();

async function consume() {
  // Create consumer group
  try {
    await redis.xgroup('CREATE', 'my-query-results', 'my-group', '0', 'MKSTREAM');
  } catch (e) {
    // Group exists
  }

  while (true) {
    const result = await redis.xreadgroup(
      'GROUP', 'my-group', 'consumer-1',
      'BLOCK', 5000,
      'STREAMS', 'my-query-results', '>'
    );

    if (result) {
      for (const [stream, messages] of result) {
        for (const [id, fields] of messages) {
          const event = JSON.parse(fields[1]);
          console.log('Received:', event);
          await redis.xack('my-query-results', 'my-group', id);
        }
      }
    }
  }
}

consume();

Docker Compose Setup

version: '3.8'

services:
  drasi-server:
    image: ghcr.io/drasi-project/drasi-server:latest
    ports:
      - "8080:8080"
    environment:
      - REDIS_URL=redis://redis:6379
    volumes:
      - ./config:/config:ro
    depends_on:
      - redis

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data
    command: redis-server --appendonly yes

volumes:
  redis-data:

Complete Example

host: 0.0.0.0
port: 8080
logLevel: info

sources:
  - kind: postgres
    id: orders-db
    host: ${DB_HOST}
    database: ecommerce
    user: ${DB_USER}
    password: ${DB_PASSWORD}
    tables:
      - public.orders

queries:
  - id: order-events
    query: "MATCH (o:orders) RETURN o.id, o.status, o.total"
    sources:
      - sourceId: orders-db

reactions:
  - kind: platform
    id: order-stream
    queries: [order-events]
    redisUrl: ${REDIS_URL:-redis://localhost:6379}
    pubsubName: ecommerce-orders
    sourceName: order-service
    maxStreamLength: 100000
    batchEnabled: true
    batchMaxSize: 100
    batchMaxWaitMs: 100

Monitoring

Stream Info

redis-cli XINFO STREAM my-query-results

Stream Length

redis-cli XLEN my-query-results

Consumer Groups

redis-cli XINFO GROUPS my-query-results

Pending Messages

redis-cli XPENDING my-query-results my-group

Troubleshooting

Connection Errors

  • Verify Redis is running and accessible
  • Check connection URL format
  • Verify network connectivity

Memory Issues

  • Set maxStreamLength to limit growth
  • Monitor Redis memory usage
  • Configure Redis maxmemory policies

Message Backlog

  • Scale consumers
  • Enable batching
  • Increase consumer throughput

Documentation resources

Next steps