Configure SSE Reaction

Stream query results via Server-Sent Events for real-time dashboards

The SSE (Server-Sent Events) Reaction Reaction A component that receives query result changes and takes action on them. Learn more creates an HTTP endpoint that streams query result changes Result Change Event A notification from a Continuous Query describing changes to its result set. Learn more to connected clients. It’s ideal for web browsers and applications that need real-time updates.

Basic Configuration

reactions:
  - kind: sse
    id: live-stream
    queries: [my-query]
    host: 0.0.0.0
    port: 8081
    ssePath: /events

Configuration Reference

Field Type Default Description
kind string Required Must be sse
id string Required Unique reaction identifier
queries array Required Query IDs to subscribe to
autoStart boolean true Start reaction automatically
host string 0.0.0.0 Listen address
port integer 8080 Listen port
ssePath string /events SSE endpoint path
heartbeatIntervalMs integer 30000 Heartbeat interval in milliseconds
routes object {} Per-query path and template configurations
defaultTemplate object None Default template for all queries

Connecting to the SSE Stream

JavaScript/Browser

const eventSource = new EventSource('http://localhost:8081/events');

eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);
  console.log('Change:', data);
};

eventSource.onerror = (error) => {
  console.error('SSE Error:', error);
};

curl

curl -N http://localhost:8081/events

React Example

import { useState, useEffect } from 'react';

function LiveDashboard() {
  const [data, setData] = useState([]);

  useEffect(() => {
    const eventSource = new EventSource('http://localhost:8081/events');

    eventSource.onmessage = (event) => {
      const msg = JSON.parse(event.data);

      // Default SSE payloads are query batches: { queryId, results: [ { type, ... } ], timestamp }
      if (!Array.isArray(msg.results)) return;

      for (const diff of msg.results) {
        if (diff.type === 'ADD') {
          setData(prev => [...prev, diff.data]);
        } else if (diff.type === 'UPDATE') {
          setData(prev => prev.map(item =>
            item.id === diff.after.id ? diff.after : item
          ));
        } else if (diff.type === 'DELETE') {
          setData(prev => prev.filter(item => item.id !== diff.data.id));
        }
      }
    };

    return () => eventSource.close();
  }, []);

  return (
    <ul>
      {data.map(item => (
        <li key={item.id}>{item.name}: {item.value}</li>
      ))}
    </ul>
  );
}

Event Format

Events are sent in Server-Sent Events format:

data: {"queryId":"my-query","result":{"type":"ADD","data":{"id":"1","name":"Test"}},"timestamp":1706742123456}

data: {"queryId":"my-query","result":{"type":"UPDATE","data":{"id":"1","name":"Updated"},"before":{"id":"1","name":"Test"},"after":{"id":"1","name":"Updated"}},"timestamp":1706742123457}

data: {"queryId":"my-query","result":{"type":"DELETE","data":{"id":"1","name":"Updated"}},"timestamp":1706742123458}

Custom Templates

Default Template

Apply to all queries:

reactions:
  - kind: sse
    id: custom-stream
    queries: [sensors]
    port: 8081
    defaultTemplate:
      added:
        template: '{"event":"new","sensor":{{json after}}}'
      updated:
        template: '{"event":"update","sensor":{{json after}}}'
      deleted:
        template: '{"event":"remove","id":"{{before.id}}"}'

Per-Query Paths

Create separate endpoints for different queries:

reactions:
  - kind: sse
    id: multi-stream
    queries: [orders, inventory]
    port: 8081
    ssePath: /events
    routes:
      orders:
        added:
          path: /orders/stream
          template: '{"type":"order","data":{{json after}}}'
      inventory:
        updated:
          path: /inventory/stream
          template: '{"type":"stock","sku":"{{after.sku}}","qty":{{after.quantity}}}'

Access at:

  • http://localhost:8081/orders/stream
  • http://localhost:8081/inventory/stream
  • http://localhost:8081/events (all queries)

Heartbeat Configuration

Keep connections alive with heartbeats:

reactions:
  - kind: sse
    id: long-lived-stream
    queries: [events]
    port: 8081
    heartbeatIntervalMs: 15000  # 15 seconds

Heartbeats appear as comments in the SSE stream:

: heartbeat

data: {"queryId":"my-query","results":[...],"timestamp":1706742123456}

: heartbeat

Multiple SSE Endpoints

Run multiple SSE reactions on different ports:

reactions:
  - kind: sse
    id: public-stream
    queries: [public-events]
    port: 8081

  - kind: sse
    id: admin-stream
    queries: [all-events, audit-events]
    port: 8082

Docker Configuration

Map SSE ports when running in Docker:

# docker-compose.yml
services:
  drasi-server:
    image: ghcr.io/drasi-project/drasi-server:latest
    ports:
      - "8080:8080"   # REST API
      - "8081:8081"   # SSE endpoint

CORS Configuration

For browser clients from different origins, you may need to configure CORS. The SSE reaction includes appropriate CORS headers by default.

Complete Example

Server Configuration

host: 0.0.0.0
port: 8080
logLevel: info

sources:
  - kind: postgres
    id: orders-db
    host: ${DB_HOST}
    database: ecommerce
    user: ${DB_USER}
    password: ${DB_PASSWORD}
    tables:
      - public.orders
      - public.order_items

queries:
  - id: live-orders
    query: |
      MATCH (o:orders)
      WHERE o.status IN ['pending', 'processing']
      RETURN o.id, o.customer_name, o.total, o.status, o.created_at
    sources:
      - sourceId: orders-db

  - id: order-alerts
    query: |
      MATCH (o:orders)
      WHERE o.total > 1000
      RETURN o.id, o.total, o.customer_name
    sources:
      - sourceId: orders-db

reactions:
  - kind: sse
    id: dashboard-stream
    queries: [live-orders, order-alerts]
    host: 0.0.0.0
    port: 8081
    heartbeatIntervalMs: 30000
    routes:
      live-orders:
        added:
          path: /orders
        updated:
          path: /orders
        deleted:
          path: /orders
      order-alerts:
        added:
          path: /alerts
          template: '{"alert":"high_value_order","order":{{json after}}}'

HTML Dashboard

<!DOCTYPE html>
<html>
<head>
  <title>Order Dashboard</title>
</head>
<body>
  <h1>Live Orders</h1>
  <table id="orders">
    <thead>
      <tr>
        <th>ID</th>
        <th>Customer</th>
        <th>Total</th>
        <th>Status</th>
      </tr>
    </thead>
    <tbody></tbody>
  </table>

  <h2>Alerts</h2>
  <ul id="alerts"></ul>

  <script>
    const orders = {};
    const tbody = document.querySelector('#orders tbody');
    const alertList = document.getElementById('alerts');

    function updateTable() {
      tbody.innerHTML = Object.values(orders)
        .map(o => `
          <tr>
            <td>${o.id}</td>
            <td>${o.customer_name}</td>
            <td>$${o.total}</td>
            <td>${o.status}</td>
          </tr>
        `).join('');
    }

    // Orders stream
    const orderStream = new EventSource('http://localhost:8081/orders');
    orderStream.onmessage = (event) => {
      const msg = JSON.parse(event.data);
      if (!Array.isArray(msg.results)) return;

      for (const diff of msg.results) {
        if (diff.type === 'ADD') {
          orders[diff.data.id] = diff.data;
        } else if (diff.type === 'UPDATE') {
          orders[diff.after.id] = diff.after;
        } else if (diff.type === 'DELETE') {
          delete orders[diff.data.id];
        }
      }

      updateTable();
    };

    // Alerts stream
    const alertStream = new EventSource('http://localhost:8081/alerts');
    alertStream.onmessage = (event) => {
      const alert = JSON.parse(event.data);
      const li = document.createElement('li');
      li.textContent = `High value order: #${alert.order.id} - $${alert.order.total}`;
      alertList.insertBefore(li, alertList.firstChild);
    };
  </script>
</body>
</html>

Use Cases

Real-Time Dashboards

Stream data to browser-based dashboards:

reactions:
  - kind: sse
    id: metrics-dashboard
    queries: [system-metrics, user-activity]
    port: 8081

Live Notifications

Push notifications to web applications:

reactions:
  - kind: sse
    id: notifications
    queries: [user-notifications]
    port: 8081
    routes:
      user-notifications:
        path: /notifications

IoT Monitoring

Stream sensor data to monitoring interfaces:

reactions:
  - kind: sse
    id: iot-stream
    queries: [sensor-readings, device-alerts]
    port: 8081
    heartbeatIntervalMs: 10000

Troubleshooting

Connection Drops

  • Decrease heartbeatIntervalMs
  • Check for proxy timeouts
  • Verify network stability

CORS Errors

  • Ensure the client origin is allowed
  • Check browser developer tools for specific errors

No Events Received

  • Verify the query is producing results
  • Check source is running and connected
  • Review server logs for errors

High Memory Usage

  • Limit the number of concurrent connections
  • Consider using multiple SSE reactions for different queries

Documentation resources

Next steps