Configure Platform Source
Consume CloudEvent-wrapped change events from Redis Streams
The Platform Source consumes CloudEvent-wrapped change events from a Redis Stream and converts them into Drasi graph changes. A Source is a connection to an external system that Drasi monitors for data changes.
When to use the Platform source
- Connect Drasi Server to a Drasi Platform / Kubernetes environment that publishes changes to Redis Streams.
- Consume change events from a shared Redis-backed event bus.
- Scale ingestion horizontally using Redis consumer groups.
Prerequisites
- Redis 5.0+ (Streams + consumer groups).
- A publisher that writes CloudEvent JSON into the stream (see Event format).
How it connects
This source connects outbound from Drasi Server to Redis; it does not open an inbound port.
Quick example (Drasi Server config)
Drasi Server source configuration uses camelCase keys.
sources:
  - kind: platform
    id: platform-events
    autoStart: true
    redisUrl: ${REDIS_URL:-redis://localhost:6379}
    streamKey: drasi-events
    # Optional
    consumerGroup: drasi-core
    consumerName: ${HOSTNAME}
    batchSize: 100
    blockMs: 5000
Configuration reference (Drasi Server)
| Field | Type | Default | Description |
|---|---|---|---|
| kind | string | required | Must be platform. |
| id | string | required | Unique source identifier. |
| autoStart | boolean | true | Whether Drasi Server starts the source on startup. |
| bootstrapProvider | object | none | Optional bootstrap provider for initial state. For platform bootstrap use { kind: platform, queryApiUrl?, timeoutSeconds? }. |
| redisUrl | string | required | Redis connection URL (for example redis://host:6379). |
| streamKey | string | required | Redis stream key to consume events from. |
| consumerGroup | string | drasi-core | Consumer group name. |
| consumerName | string | auto | Consumer name within the group (should be unique). |
| batchSize | integer | 100 | Max events to read per Redis XREADGROUP call. |
| blockMs | integer | 5000 | Milliseconds to block waiting for new events. |
Fields support Drasi Server config references like ${ENV_VAR} / ${ENV_VAR:-default}.
Event format
The Platform source expects a CloudEvent JSON object whose data field is an array of change events.
CloudEvent wrapper (minimal)
{
  "specversion": "1.0",
  "type": "drasi.change",
  "source": "my-producer",
  "id": "event-123",
  "time": "2025-01-01T00:00:00Z",
  "datacontenttype": "application/json",
  "data": [
    {
      "op": "i",
      "payload": {
        "after": {
          "id": "user-123",
          "labels": ["User"],
          "properties": {"name": "Alice"}
        },
        "source": {"db": "myapp", "table": "node", "ts_ns": 1704067200000000000}
      }
    }
  ]
}
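A producer can assemble this envelope programmatically. The following is a minimal sketch that mirrors the example above; `wrap_cloudevent` is a hypothetical helper name, and which CloudEvent attributes the source strictly requires beyond the `data` array is not stated on this page, so the sketch simply fills in all of the attributes shown.

```python
import json
import uuid
from datetime import datetime, timezone


def wrap_cloudevent(changes, source="my-producer", event_type="drasi.change"):
    """Wrap a list of change events in a CloudEvent envelope (hypothetical helper)."""
    if not isinstance(changes, list):
        raise TypeError("changes must be a list: the data field is an array")
    return {
        "specversion": "1.0",
        "type": event_type,
        "source": source,
        "id": str(uuid.uuid4()),  # any unique string works here
        "time": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "datacontenttype": "application/json",
        "data": changes,
    }


# Same node insert as in the example above.
node_insert = {
    "op": "i",
    "payload": {
        "after": {"id": "user-123", "labels": ["User"], "properties": {"name": "Alice"}},
        "source": {"db": "myapp", "table": "node", "ts_ns": 1704067200000000000},
    },
}

event = wrap_cloudevent([node_insert])
print(json.dumps(event, indent=2))
```

Several change events can share one envelope by passing a longer list.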
Change events (data[])
Each data[] entry has:
- op: operation ("i" = insert, "u" = update, "d" = delete)
- payload.after for i/u, payload.before for d
- payload.source.table: element type ("node" or "rel"/"relation")
- payload.source.ts_ns: timestamp in nanoseconds (required)
Drasi converts ts_ns to element effectiveFrom in milliseconds: effectiveFrom = ts_ns / 1_000_000.
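As a worked example of the conversion, the sketch below applies the formula to the `ts_ns` value used in the examples on this page. Integer (floor) division is an assumption; the page only states `ts_ns / 1_000_000`. The function name is illustrative.

```python
from datetime import datetime, timezone

NS_PER_MS = 1_000_000


def effective_from_ms(ts_ns: int) -> int:
    """Convert a nanosecond timestamp to milliseconds (assumes floor division)."""
    return ts_ns // NS_PER_MS


ts_ns = 1704067200000000000  # value used in the examples on this page
ms = effective_from_ms(ts_ns)
print(ms)  # 1704067200000
print(datetime.fromtimestamp(ms / 1000, tz=timezone.utc).isoformat())
# 2024-01-01T00:00:00+00:00
```

A Python producer can obtain a current nanosecond timestamp with `time.time_ns()`.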
Relation example
{
  "op": "i",
  "payload": {
    "after": {
      "id": "follows-1",
      "labels": ["FOLLOWS"],
      "startId": "user-123",
      "endId": "user-456",
      "properties": {"since": "2024-01-01"}
    },
    "source": {"db": "myapp", "table": "rel", "ts_ns": 1704067200000000000}
  }
}
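The rule that inserts and updates carry the element under `payload.after` while deletes carry it under `payload.before` can be sketched as a small builder. `make_change` is a hypothetical helper, and the shape of the element placed under `before` for a delete is assumed to match the shape used under `after`.

```python
def make_change(op, element, table, ts_ns, db="myapp"):
    """Build one data[] entry (hypothetical helper following the rules above)."""
    if op not in ("i", "u", "d"):
        raise ValueError(f"unsupported op: {op!r}")
    if table not in ("node", "rel", "relation"):
        raise ValueError(f"unsupported table: {table!r}")
    if isinstance(ts_ns, bool) or not isinstance(ts_ns, int):
        raise TypeError("ts_ns must be an integer number of nanoseconds")
    key = "before" if op == "d" else "after"  # deletes use before, i/u use after
    return {
        "op": op,
        "payload": {
            key: element,
            "source": {"db": db, "table": table, "ts_ns": ts_ns},
        },
    }


follows = {
    "id": "follows-1",
    "labels": ["FOLLOWS"],
    "startId": "user-123",
    "endId": "user-456",
    "properties": {"since": "2024-01-01"},
}

insert_rel = make_change("i", follows, "rel", 1704067200000000000)
delete_rel = make_change("d", follows, "rel", 1704067201000000000)
print(sorted(insert_rel["payload"]))  # ['after', 'source']
print(sorted(delete_rel["payload"]))  # ['before', 'source']
```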
Publishing to Redis Streams
Write the CloudEvent JSON as a string in a stream entry field named data (preferred). The source also checks event, payload, and message.
redis-cli XADD drasi-events '*' data '{"specversion":"1.0","data":[{"op":"i","payload":{"after":{"id":"user-1","labels":["User"],"properties":{"name":"Alice"}},"source":{"db":"myapp","table":"node","ts_ns":1704067200000000000}}}]}'
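The same publish step can be done from application code. This sketch assumes the third-party redis-py package (`pip install redis`) and a Redis instance at `redis://localhost:6379`; `build_stream_fields` and `publish` are illustrative names, not part of Drasi.

```python
import json


def build_stream_fields(cloudevent: dict) -> dict:
    """Serialize the CloudEvent into the preferred stream entry field, data."""
    return {"data": json.dumps(cloudevent, separators=(",", ":"))}


def publish(cloudevent, redis_url="redis://localhost:6379", stream_key="drasi-events"):
    """XADD the event with an auto-generated entry ID and return that ID."""
    import redis  # third-party dependency, imported lazily so the helper above works without it

    client = redis.Redis.from_url(redis_url)
    return client.xadd(stream_key, build_stream_fields(cloudevent))


event = {
    "specversion": "1.0",
    "data": [
        {
            "op": "i",
            "payload": {
                "after": {"id": "user-1", "labels": ["User"], "properties": {"name": "Alice"}},
                "source": {"db": "myapp", "table": "node", "ts_ns": 1704067200000000000},
            },
        }
    ],
}

fields = build_stream_fields(event)
print(fields["data"])
# publish(event)  # uncomment when a Redis instance is reachable
```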
Performance tuning notes
- Increase batchSize for higher throughput.
- Decrease blockMs for lower-latency shutdown and quicker detection of new events (at the cost of more Redis calls).
- Run multiple Drasi Server instances with the same consumerGroup (and unique consumerName) to scale consumption.
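To make the tuning knobs concrete, the sketch below shows how batchSize and blockMs would map onto a Redis XREADGROUP call. It illustrates the Redis command only and is not Drasi's implementation. It assumes a redis-py style client, an existing consumer group, and the default list-shaped reply; `read_batch` is a hypothetical name.

```python
def read_batch(client, stream_key, group, consumer, batch_size=100, block_ms=5000):
    """Read up to batch_size new entries, blocking for at most block_ms milliseconds."""
    reply = client.xreadgroup(
        group,
        consumer,
        {stream_key: ">"},  # ">" requests entries never delivered to this group
        count=batch_size,
        block=block_ms,
    )
    entries = []
    for _stream, messages in reply or []:  # an empty reply means the block timed out
        entries.extend(messages)
    return entries


# Usage with redis-py (third-party), shown as comments because it needs a live Redis:
# import redis
# client = redis.Redis.from_url("redis://localhost:6379")
# for entry_id, fields in read_batch(client, "drasi-events", "drasi-core", "worker-1"):
#     print(entry_id, fields)
```

Two consumers calling this with the same group but different consumer names each receive a disjoint share of new entries, which is what makes horizontal scaling with consumer groups work.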
Troubleshooting
No events are being processed
- Confirm you are writing a CloudEvent JSON object (must include a data array).
- Ensure your stream entry field is data (or event / payload / message).
- Verify payload.source.ts_ns is present and is an integer (nanoseconds).
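The checks above can be run against a stream entry before publishing. This is a rough diagnostic sketch, not the source's actual validation logic: the order in which the candidate fields are tried is an assumption, and the entry is assumed to be a dict with string keys (for example, as returned by redis-py with `decode_responses=True`).

```python
import json

CANDIDATE_FIELDS = ("data", "event", "payload", "message")  # lookup order is assumed


def _dig(obj, *keys):
    """Follow nested dict keys, returning None as soon as a level is missing."""
    for key in keys:
        if not isinstance(obj, dict):
            return None
        obj = obj.get(key)
    return obj


def diagnose_entry(fields: dict) -> list:
    """Return a list of human-readable problems; an empty list means none found."""
    name = next((f for f in CANDIDATE_FIELDS if f in fields), None)
    if name is None:
        return ["stream entry has no data/event/payload/message field"]
    try:
        event = json.loads(fields[name])
    except (TypeError, ValueError) as exc:
        return [f"field {name!r} is not valid JSON: {exc}"]
    data = _dig(event, "data")
    if not isinstance(data, list):
        return ["CloudEvent is not a JSON object with a data array"]
    problems = []
    for i, change in enumerate(data):
        ts_ns = _dig(change, "payload", "source", "ts_ns")
        if isinstance(ts_ns, bool) or not isinstance(ts_ns, int):
            problems.append(f"data[{i}]: payload.source.ts_ns is missing or not an integer")
    return problems


entry = {"data": '{"specversion":"1.0","data":[{"op":"i","payload":{"source":{"ts_ns":"oops"}}}]}'}
for problem in diagnose_entry(entry):
    print(problem)
```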
Known limitations
- The source rejects messages that do not match the expected CloudEvent + data[] shape.
- Control messages are identified by payload.source.db = "Drasi" (case-insensitive); only payload.source.table = "SourceSubscription" is handled.
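The control-message rule can be sketched as a small classifier, which is useful for checking that a producer's own `db` value does not collide with the reserved name. `classify_change` is a hypothetical name. Treating the table comparison as case-sensitive is an assumption, and what the source does with unhandled control messages is not stated on this page.

```python
def classify_change(change: dict) -> str:
    """Classify a data[] entry as data, a handled control message, or an unhandled one."""
    payload = change.get("payload")
    source = payload.get("source") if isinstance(payload, dict) else None
    if not isinstance(source, dict):
        source = {}
    db = str(source.get("db", ""))
    if db.lower() != "drasi":  # db match is case-insensitive per the rule above
        return "data"
    if source.get("table") == "SourceSubscription":  # exact match is an assumption
        return "control:handled"
    return "control:unhandled"


print(classify_change({"payload": {"source": {"db": "myapp", "table": "node"}}}))
# data
print(classify_change({"payload": {"source": {"db": "DRASI", "table": "SourceSubscription"}}}))
# control:handled
print(classify_change({"payload": {"source": {"db": "drasi", "table": "Other"}}}))
# control:unhandled
```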
Documentation resources
Platform Source README
Implementation notes, Redis behavior, and full schema details
drasi-source-platform on crates.io
Package info and release history