Middleware Reference
10 minute read
Middleware Middleware Custom processing logic applied to source changes before query evaluation. Learn more components transform and enrich incoming Source Change Events Source Change Event A notification from a Source describing a data change (insert, update, or delete). Learn more before they reach your Continuous Queries Continuous Query A query that runs continuously, maintaining an always-current result set as data changes. Learn more . This reference provides complete specifications for each available middleware type.
For a conceptual overview of middleware, see Middleware Concepts.
Configuration Structure
Middleware is configured within Continuous Query definitions. The specific syntax varies by product:
- drasi-lib drasi-lib A Rust crate for building change-driven solutions with embedded Drasi capabilities. Learn more : Configured programmatically via the Rust API
- Drasi Server Drasi Server A standalone Data Change Processing server running as a process or Docker container. Learn more : Configured in YAML/JSON configuration files
- Drasi for Kubernetes Drasi for Kubernetes A scalable Data Change Processing platform running on Kubernetes clusters. Learn more : Configured in the Continuous Query resource manifest
Middleware components are modular and can be combined in a pipeline to process changes sequentially. Each middleware in the pipeline receives the output of the previous one, allowing you to chain multiple transformations.
Available Middleware Components
- Unwind - Expand nested arrays into top-level graph elements
- JQ - Transform data using jq expressions
- Promote - Copy nested values to top-level properties
- ParseJson - Parse JSON strings into structured objects
- Decoder - Decode encoded string values
- Relabel - Remap element labels
Unwind
The unwind middleware component expands an array of values nested inside a Node Node A data entity in the property graph model, representing a discrete object or concept. Learn more or Relation's Relationship A connection between two nodes in the property graph, representing how entities relate. Learn more properties into separate top-level elements in the graph. This allows you to query unwound elements using standard Cypher openCypher A declarative graph query language used to write Drasi Continuous Queries. Learn more or GQL GQL An ISO-standard graph query language supported by Drasi for Continuous Queries. Learn more patterns.
Configuration Properties
| Property | Description |
|---|---|
| kind | Must be unwind |
| name | The name of this configuration, used in a source pipeline |
| {Node Label} | The unwind configuration for all nodes with the given label |
In addition to the kind and name properties, any additional properties on this configuration object will be a dictionary of unwind configurations, where the key is the label of the incoming node.
Unwind Element Configuration
| Property | Description |
|---|---|
| selector | A JSONPath expression to locate the array to unwind within the properties of the incoming node |
| label | The label of the newly created child Node (one for each array element) |
| key | (optional) A JSONPath expression to locate a unique key for the child node within the parent context. If not specified, the array index is used |
| relation | (optional) The label of the relation created between parent and child nodes |
| condition | (optional) A JSONPath expression that filters whether this mapping should be applied. If the expression returns no result or empty, the mapping is skipped |
Example: Vehicle Tires
Given a source that produces Vehicle nodes with nested tire data:
{
"id": "vehicle-1",
"tires": [
{ "position": "Front-Left", "pressure": 250 },
{ "position": "Front-Right", "pressure": 258 },
{ "position": "Rear-Left", "pressure": 243 },
{ "position": "Rear-Right", "pressure": 252 }
]
}
You can unwind the tires array into separate Tire nodes connected to the Vehicle:
Middleware configuration:
- name: extract-tires
kind: unwind
Vehicle:
- selector: $.tires[*]
label: Tire
key: $.position
relation: HAS
Query:
MATCH (v:Vehicle)-[:HAS]->(t:Tire)
RETURN v.id, t.position, t.pressure
Conditional Unwind Example
The condition property allows you to selectively apply the unwind operation based on node properties. For example, only unwind container information when the action is “update”:
Input data:
{
"action": "update",
"metadata": { "name": "pod-1", "namespace": "default" },
"status": {
"containerStatuses": [
{ "containerID": "c1", "name": "nginx" },
{ "containerID": "c2", "name": "redis" }
]
}
}
Middleware configuration:
- name: extract-containers
kind: unwind
Pod:
- selector: $.status.containerStatuses[*]
label: Container
key: $.containerID
relation: OWNS
condition: $[?(@.action == 'update')]
When condition evaluates to empty or no result, the unwind is skipped for that element.
JQ
The jq middleware component transforms incoming source changes using jq expressions. It can reshape data or project a single change into multiple changes.
External Resources
Configuration Properties
| Property | Description |
|---|---|
| kind | Must be jq |
| name | The name of this configuration, used in a source pipeline |
| {Label}.insert | Mapping configuration for elements with the given label when an insert change is received |
| {Label}.update | Mapping configuration for elements with the given label when an update change is received |
| {Label}.delete | Mapping configuration for elements with the given label when a delete change is received |
Mapping Configuration
| Property | Description |
|---|---|
| op | The operation to apply to output element(s): Insert, Update, or Delete |
| query | The JQ query to apply. Output can be a single object or an array of objects |
| label | (optional) A JQ query to extract a new label for each projected element |
| id | (optional) A JQ query to extract a new ID for each projected element |
| haltOnError | (optional) If true, errors stop query processing. If false (default), errors skip the problematic change |
Example: Append-Only Log to Latest Value
Transform an append-only log of sensor readings into updates to a single Sensor node with the latest value:
Input data (sequence of inserts):
{ "id": "log-1", "sensorId": "thermostat-1", "value": "25" }
{ "id": "log-2", "sensorId": "thermostat-1", "value": "27" }
{ "id": "log-3", "sensorId": "thermostat-1", "value": "28" }
Middleware configuration:
- kind: jq
name: extract-latest
SensorLog:
insert:
- op: Update
query: |
{
"sensorId": .sensorId,
"currentValue": .value | tonumber
}
label: "Sensor"
id: .sensorId
Instead of indexing 3 SensorLog nodes, you now have 1 Sensor node with the current value.
Example: Array Selection
Use JQ select to extract specific values from arrays:
Input data:
{
"signals": [
{ "name": "Vehicle.CurrentLocation.Heading", "value": "96" },
{ "name": "Vehicle.Speed", "value": "119" },
{ "name": "Vehicle.TraveledDistance", "value": "4563" }
],
"vehicleId": "v1"
}
Middleware configuration:
- kind: jq
name: process-telemetry
Telemetry:
insert:
- op: Insert
query: |
{
"id": .vehicleId,
"currentSpeed": .signals[] | select(.name == "Vehicle.Speed").value | tonumber
}
label: "Vehicle"
id: .id
Example: Conditional Routing
Use JQ conditionals to route different event types to different operations:
Input data (GitHub webhook):
{
"action": "opened",
"issue": { "id": 123, "title": "Bug report", "state": "open" },
"repository": { "id": 456, "name": "my-repo" }
}
Middleware configuration:
- kind: jq
name: process-github-events
WebhookEvent:
insert:
# Map issue opened events to create Issue nodes
- op: Insert
query: |
if .action == "opened" then
.issue + { "repositoryId": .repository.id }
else
empty
end
label: "Issue"
id: .id
# Map issue closed events to update Issue nodes
- op: Update
query: |
if .action == "closed" then
{ "id": .issue.id, "state": "closed" }
else
empty
end
label: "Issue"
id: .id
# Map issue deleted events to delete Issue nodes
- op: Delete
query: |
if .action == "deleted" then
{ "id": .issue.id }
else
empty
end
label: "Issue"
id: .id
Promote
The promote middleware copies values from deep-nested locations inside an element’s properties to new top-level properties. Selection uses JSONPath expressions.
Configuration Properties
| Property | Type | Description | Required | Default |
|---|---|---|---|---|
kind |
String | Must be promote | Yes | |
name |
String | The name of this configuration, used in a source pipeline | Yes | |
config |
Object | Contains the specific configuration for the promote middleware | Yes | |
config.mappings |
Array | Defines the promotion rules. Must contain at least one mapping entry | Yes | |
config.on_conflict |
"overwrite" | "skip" | "fail" |
Action when target_name already exists | No | "overwrite" |
config.on_error |
"skip" | "fail" |
Behavior when a mapping encounters an error | No | "fail" |
Mapping Object Properties
| Property | Type | Description | Required |
|---|---|---|---|
path |
String | A JSONPath expression that must select exactly one value | Yes |
target_name |
String | The name of the new top-level property | Yes |
Example
Middleware configuration:
- name: promote_user_and_order_data
kind: promote
config:
mappings:
- path: "$.user.id"
target_name: "userId"
- path: "$.user.location.city"
target_name: "city"
- path: "$.order.total"
target_name: "orderTotal"
- path: "$.metadata"
target_name: "meta"
on_conflict: skip
on_error: skip
Input:
{
"user": {
"id": "user123",
"location": { "city": "New York" }
},
"order": { "total": 100.50 },
"metadata": { "source": "api", "version": "1.1" }
}
Output:
{
"user": { "id": "user123", "location": { "city": "New York" } },
"order": { "total": 100.50 },
"metadata": { "source": "api", "version": "1.1" },
"userId": "user123",
"city": "New York",
"orderTotal": 100.50,
"meta": { "source": "api", "version": "1.1" }
}
ParseJson
The parse_json middleware parses a string property containing a JSON document into a structured object or list. Useful when a source provides JSON embedded within a string field.
Configuration Properties
| Property | Type | Description | Required | Default |
|---|---|---|---|---|
kind |
String | Must be parse_json | Yes | |
name |
String | The name of this configuration, used in a source pipeline | Yes | |
config |
Object | Contains the specific configuration | Yes | |
config.target_property |
String | The property containing the JSON string to parse | Yes | |
config.output_property |
String | Where to store the parsed result. If omitted, overwrites target_property | No | null |
config.on_error |
"skip" | "fail" |
Behavior on errors | No | "fail" |
config.max_json_size |
Integer | Maximum JSON string size in bytes | No | 1048576 (1MB) |
config.max_nesting_depth |
Integer | Maximum allowed nesting depth | No | 20 |
Example
Middleware configuration:
- name: parse_event_data
kind: parse_json
config:
target_property: "raw_event_json"
output_property: "event_details"
on_error: "skip"
Input:
{
"id": "event001",
"timestamp": "2025-06-01T12:00:00Z",
"raw_event_json": "{\"user\": \"alice\", \"action\": \"login\", \"details\": {\"ip\": \"192.168.1.100\"}}"
}
Output:
{
"id": "event001",
"timestamp": "2025-06-01T12:00:00Z",
"raw_event_json": "{\"user\": \"alice\", \"action\": \"login\", \"details\": {\"ip\": \"192.168.1.100\"}}",
"event_details": {
"user": "alice",
"action": "login",
"details": { "ip": "192.168.1.100" }
}
}
Decoder
The decoder middleware decodes a string value from various encoding formats: Base64, Hexadecimal, URL encoding, and JSON string escapes.
Configuration Properties
| Property | Type | Description | Required | Default |
|---|---|---|---|---|
kind |
String | Must be decoder | Yes | |
name |
String | The name of this configuration, used in a source pipeline | Yes | |
config |
Object | Contains the specific configuration | Yes | |
config.encoding_type |
String | The encoding format. See supported types below | Yes | |
config.target_property |
String | The property containing the encoded string | Yes | |
config.output_property |
String | Where to store the decoded result. If omitted, overwrites target_property | No | null |
config.strip_quotes |
Boolean | Remove surrounding double quotes before decoding | No | false |
config.on_error |
"skip" | "fail" |
Behavior on errors | No | "fail" |
Supported Encoding Types
| Type | Description |
|---|---|
base64 |
Standard Base64 encoding (RFC 4648) |
base64url |
URL-safe Base64 encoding (RFC 4648 section 5), without padding |
hex |
Hexadecimal encoding (e.g., 48656c6c6f) |
url |
Percent-encoding (e.g., Hello%20World) |
json_escape |
JSON string escape sequences (e.g., \", \\, \n, \uXXXX) |
Example
Middleware configuration:
- name: decode_user_data
kind: decoder
config:
encoding_type: "base64"
target_property: "raw_user_payload"
output_property: "user_data"
strip_quotes: true
on_error: "skip"
Input:
{
"message_id": "msg123",
"raw_user_payload": "\"SGVsbG8gV29ybGQh\""
}
Output:
{
"message_id": "msg123",
"raw_user_payload": "\"SGVsbG8gV29ybGQh\"",
"user_data": "Hello World!"
}
The strip_quotes: true removes the surrounding double quotes before Base64 decoding.
Relabel
The relabel middleware rewrites element labels according to a user-defined mapping. Useful for:
- Normalizing heterogeneous sources to a common vocabulary
- Consolidating legacy or versioned labels
- Preparing data for queries with a stable set of labels
Configuration Properties
| Property | Description |
|---|---|
| kind | Must be relabel |
| name | The name of this configuration, used in a source pipeline |
| labelMappings | (Required) Object mapping original labels to new labels. Must contain at least one entry |
Behavior
- Each label is looked up in
labelMappings. If present, it’s replaced; otherwise retained - Multiple source labels can map to different targets in the same element
- Applies to Node and Relation labels
- Applies to Insert, Update, and Delete changes (so deletes match expected labels)
Example: Normalize Labels
Middleware configuration:
- name: normalize-user-labels
kind: relabel
labelMappings:
Person: User
Employee: Staff
Company: Organization
Label transformations:
| Incoming Labels | After Relabel |
|---|---|
[Person] |
[User] |
[Person, Employee] |
[User, Staff] |
[Company] |
[Organization] |
[UnmappedLabel] |
[UnmappedLabel] (unchanged) |
Example: Relation Label Remapping
Middleware configuration:
- name: relabel-links
kind: relabel
labelMappings:
KNOWS: CONNECTED_TO
WORKS_FOR: EMPLOYED_BY
An incoming relation labeled KNOWS will appear as CONNECTED_TO after middleware processing.
Feedback
Was this page helpful?
Glad to hear it! Please tell us what you found helpful.
Sorry to hear that. Please tell us how we can improve.