Configure gRPC Reaction

Stream query results via gRPC

The gRPC Reaction Reaction A component that receives query result changes and takes action on them. Learn more streams query result changes Result Change Event A notification from a Continuous Query describing changes to its result set. Learn more to gRPC clients. It’s ideal for high-performance, low-latency applications that need real-time updates.

Basic Configuration

reactions:
  - kind: grpc
    id: grpc-stream
    queries: [my-query]
    endpoint: grpc://localhost:50052

Configuration Reference

Field Type Default Description
kind string Required Must be grpc
id string Required Unique reaction identifier
queries array Required Query IDs to subscribe to
autoStart boolean true Start reaction automatically
endpoint string grpc://localhost:50052 gRPC server endpoint
timeoutMs integer 5000 Request timeout in milliseconds
batchSize integer 100 Items per batch
batchFlushTimeoutMs integer 1000 Flush timeout for partial batches
maxRetries integer 3 Maximum retry attempts
connectionRetryAttempts integer 5 Connection retry attempts
initialConnectionTimeoutMs integer 10000 Initial connection timeout
metadata object {} gRPC metadata headers

Batching Configuration

Batch events for improved throughput:

reactions:
  - kind: grpc
    id: batched-stream
    queries: [high-volume-query]
    endpoint: grpc://processor:50052
    batchSize: 500
    batchFlushTimeoutMs: 2000
Setting Low Latency High Throughput
batchSize 1-10 100-1000
batchFlushTimeoutMs 100-500 1000-5000

Retry Configuration

Handle connection failures gracefully:

reactions:
  - kind: grpc
    id: reliable-stream
    queries: [critical-events]
    endpoint: grpc://processor:50052
    maxRetries: 5
    connectionRetryAttempts: 10
    initialConnectionTimeoutMs: 30000

Metadata Headers

Add custom metadata to gRPC calls:

reactions:
  - kind: grpc
    id: authenticated-stream
    queries: [events]
    endpoint: grpc://processor:50052
    metadata:
      authorization: Bearer ${API_TOKEN}
      x-client-id: drasi-server
      x-environment: production

Protocol Definition

The gRPC reaction sends results to a service implementing drasi.v1.ReactionService (see reaction.proto in the plugin repo):

syntax = "proto3";

package drasi.v1;

service ReactionService {
  rpc ProcessResults(ProcessResultsRequest) returns (ProcessResultsResponse);
  rpc StreamResults(stream QueryResult) returns (stream StreamResultsResponse);
  rpc Subscribe(SubscribeRequest) returns (stream QueryResult);
  rpc HealthCheck(google.protobuf.Empty) returns (ReactionHealthCheckResponse);
}

Use Cases

Real-Time Processing

Stream events to a processing service:

reactions:
  - kind: grpc
    id: event-processor
    queries: [transactions, alerts]
    endpoint: grpc://processor.service:50052
    batchSize: 100
    timeoutMs: 5000

Microservice Integration

Connect to downstream microservices:

reactions:
  - kind: grpc
    id: order-service
    queries: [new-orders]
    endpoint: grpc://order-service:50052
    metadata:
      x-service-name: drasi-server

  - kind: grpc
    id: inventory-service
    queries: [stock-changes]
    endpoint: grpc://inventory-service:50052

Cross-Datacenter Streaming

Stream to remote datacenters:

reactions:
  - kind: grpc
    id: remote-dc
    queries: [replicated-events]
    endpoint: grpc://remote-dc.example.com:50052
    timeoutMs: 30000
    connectionRetryAttempts: 10
    metadata:
      x-datacenter: us-west

Docker Compose Example

version: '3.8'

services:
  drasi-server:
    image: ghcr.io/drasi-project/drasi-server:latest
    ports:
      - "8080:8080"
    volumes:
      - ./config:/config:ro
    depends_on:
      - event-processor

  event-processor:
    image: your-grpc-processor:latest
    ports:
      - "50052:50052"

Configuration:

reactions:
  - kind: grpc
    id: processor-stream
    queries: [events]
    endpoint: grpc://event-processor:50052

Complete Example

host: 0.0.0.0
port: 8080
logLevel: info

sources:
  - kind: postgres
    id: orders-db
    host: ${DB_HOST}
    database: ecommerce
    user: ${DB_USER}
    password: ${DB_PASSWORD}
    tables:
      - public.orders

queries:
  - id: order-events
    query: "MATCH (o:orders) RETURN o.id, o.status, o.total"
    sources:
      - sourceId: orders-db

reactions:
  - kind: grpc
    id: order-processor
    queries: [order-events]
    endpoint: ${GRPC_ENDPOINT:-grpc://localhost:50052}
    timeoutMs: 10000
    batchSize: 50
    batchFlushTimeoutMs: 1000
    maxRetries: 5
    metadata:
      authorization: Bearer ${API_TOKEN}

Performance tuning

Low latency

reactions:
  - kind: grpc
    id: low-latency
    queries: [events]
    endpoint: grpc://processor:50052
    batchSize: 1
    batchFlushTimeoutMs: 50
    timeoutMs: 1000

Higher throughput

reactions:
  - kind: grpc
    id: high-throughput
    queries: [events]
    endpoint: grpc://processor:50052
    batchSize: 500
    batchFlushTimeoutMs: 2000

Troubleshooting

Connection Timeout

  • Increase initialConnectionTimeoutMs
  • Verify endpoint is reachable
  • Check firewall rules

Frequent Disconnects

  • Increase connectionRetryAttempts
  • Check network stability
  • Monitor processor service health

High Latency

  • Reduce batchSize
  • Reduce batchFlushTimeoutMs
  • Check network latency

Message Backlog

  • Increase batchSize
  • Scale the receiving service

gRPC vs HTTP

Aspect gRPC HTTP
Performance Higher throughput Lower throughput
Latency Lower Higher
Protocol HTTP/2 HTTP/1.1
Streaming Native bidirectional One-way
Integration Requires gRPC service Standard webhooks
Best for High-volume, real-time API integrations

Documentation resources

Next steps