Implement a Reaction
You can develop custom reactions by writing an application in any language that adheres to a certain specification and publish it as a docker image to the registry serving Drasi images to your cluster.
Query Configuration
The Drasi control plane will mount a folder at /etc/queries
where each file will be named after each queryId that is configured for the reaction. The contents of each file will be the options
field from the config.
Consider the following reaction configuration, this will result in 3 empty files named query1
, query2
and query3
under /etc/queries
.
apiVersion: query.reactive-graph.io/v1
kind: Reaction
metadata:
name: my-reaction
spec:
reactionImage: my-reaction
queries:
- queryId: query1
- queryId: query2
- queryId: query3
The following reaction configuration shows how to include additional metadata per query. This will result in a file named query1
with the contents of foo
and a file named query2
with the contents of bar
apiVersion: query.reactive-graph.io/v1
kind: Reaction
metadata:
name: my-reaction
spec:
reactionImage: my-reaction
queries:
- queryId: query1
options: >
foo
- queryId: query2
options: >
bar
The format of the content of the options field is completely up to the developer of that particular reaction, for example you could include yaml or json content and it is up to you to deserialize and make sense of it within the context of your custom reaction.
Receiving Changes
When the projection of a continuous query is changed, a message will be published to a Dapr topic. The pubsub name will be available on the PUBSUB
environment variable (default is rg-pubsub
). The topic name will be <queryId>-results
, so for each queryId you discover in /etc/queries
, you should subscribe to that Dapr topic.
A skeleton implementation in JavaScript would look something like this
import { DaprServer } from "@dapr/dapr";
import { readdirSync, readFileSync } from 'fs';
import path from 'path';
const pubsubName = process.env["PUBSUB"] ?? "rg-pubsub";
const configDirectory = process.env["QueryConfigPath"] ?? "/etc/queries";
const daprServer = new DaprServer();
let queryIds = readdirSync(configDir);
for (let queryId of queryIds) {
if (!queryId || queryId.startsWith("."))
continue;
await daprServer.pubsub.subscribe(pubsubName, queryId + "-results", (events) => {
//implement code that reacts to changes here
});
}
await daprServer.start();
Message Format
The format of the incoming messages is a Json array, with each item itself containing an array for addedResults
, deletedResults
and updatedResults
.
The basic structure looks like this
[
{
"addedResults": [],
"deletedResults": [],
"updatedResults": [],
"metadata": {}
}
]
An example of a row being added to the continuous query projection would look like this
[
{
"addedResults": [
{
"Id": 1,
"Name": "Foo"
}
],
"deletedResults": [],
"updatedResults": []
}
]
An example that row being updated would look like this
[
{
"addedResults": [],
"deletedResults": [],
"updatedResults": [
{
"before": {
"Id": 1,
"Name": "Foo"
},
"after": {
"Id": 1,
"Name": "Bar"
}
}
]
}
]
An example that row being deleted would look like this
[
{
"addedResults": [],
"deletedResults": [
{
"Id": 1,
"Name": "Bar"
}
],
"updatedResults": []
}
]
Registering a new reaction
To add support for a new kind of Reaction to Drasi, you must develop the services that will connect to the Reaction and register a new Reaction Provider with Drasi. The Reaction Provider definition describes the services Drasi must run, where to get the images, and the configuration settings that are required when an instance of that Reaction is created
The definition for a ReactionProvider has the following basic structure:
apiVersion: v1
kind: ReactionProvider
name: <name>
tag: <tag> # Optional.
spec:
services:
<reaction-name>:
image: <image_name> # Required. Cannot be overwritten.
dapr: # Optional; used for specifying dapr-related annotations
app-port: <value> # Optional
app-protocol: <value> # Optional
endpoints: # Optional; used for configuring internal/external endpoints
<endpoint_name>:
setting: internal/external
target: <target> # name of the config to use, which
# should be defined under the
# `config_schema` section of the service
(any additional endpoints)...
config_schema: # Optional; used for specifying any additional environment variables
type: object
properties:
<name>:
type: <type> # One of [string, integer, boolean, array or object]
default: <value> # Optional.
(any additioanl properties)...
required: # Optional. List any required properties here
config_schema: # Optional;
# The environment variables defined here will be
# accessible by all services
type: object
properties:
<name>:
type: <type> # One of [string, integer, boolean, array or object]
default: <value> # Optional.
(any additioanl properties)...
required: # Optional. List any required properties here
In the ReactionProvider definition:
- apiVersion: Must be v1
- kind: Must be ReactionProvider
- name: Specifies the kind of Reaction that we are trying to create
- tag: Optional. This is used for specifying the “version” of the ReactionProvider
The section below provides a more detailed walkthrough of the various fields under the spec
section.
Config Schema
The config_schema
section that is at the top level is used for defining any environment variables that will be shared and accessible by all services. Similarly, this field can be defined in a similar way as how you would define the config_schema
field for each service.
For example, the following section will specify two environment variables foo
and isTrue
for this Reaction. foo
is a required environment variable and it expects the input to be of type string
, whereas isTrue
expects the input to be of type boolean
and is not a required value (default value is set to true
)
spec:
services:
...
config_schema:
type: object
properties:
foo:
type: string
isTrue:
type: boolean
default: true
required:
- foo
Services
The services
field configures the definition of the service(s) of a Reaction. At the moment, a service must be defined for any ReactionProvider. For each service
, there are four fields that you can configure:
image
image
is a required field and you can specify the image to use for this Reaction service here.- (NOTE: Drasi assumes that the image lives in the same registry that you used when you executed
drasi init
).
- (NOTE: Drasi assumes that the image lives in the same registry that you used when you executed
endpoints
- If your Reaction has a port that needs to be exposed, you can specify them under the
endpoints
section. Theendpoints
section takes in a series ofendpoint
, which is a JSON object. Eachendpoint
object should have two properties:setting
andtarget
.setting
can be either “internal” or “external”, although we currently only support internal endpoints. Thetarget
field will reference the value of a config that is defined under theconfig_schema
section of the service. You can provide a default value when defining the ReactionProvider and/or overwrite in the actual Reaction definition file. - Each endpoint will be rendered into a Kubernetes Service, with the value of
target
being set as the port number. - The following block defines a Reaction that will create a Kubernetes service called
<Reaction-name>-gateway
with a default port of4318
when deployed.-
endpoints: gateway: setting: internal target: gateway-port config_schema: type: object gateway-port: type: number default: 4318
-
- If your Reaction has a port that needs to be exposed, you can specify them under the
dapr
: optional. This field is used for specifying any dapr annotation that the user wishes to include. Currently we only supportapp-port
andapp-protocol
.- The
app-port
annotation is used to tell Dapr which port the application is listening on, whereas theapp-protocol
annotation configures the protocol that Dapr uses to communicate with your app- Sample yaml block:
-
dapr: app-port: 4002
- The
config_schema
- This is used for defining environment variables; however, the environment variables that are defined here are only accessible for this particular service.
- The configurations are defined by following JSON Schema. We define this field to be of type
object
, and list all of the configs (environment variables) under theproperties
section. For each of the property, you need to specify its type and an optional default value. For any required environment variables, you can list them under therequire
section as an array of elements - Sample:
config_schema: type: object properties: foo: type: string default: bar property2: type: boolean default: true required: - foo
Validating the ReactionProvider file
To validate a ReactionProvider yaml file, there are two approaches:
- Using
apply
command from the Drasi CLI. The CLI will automatically validate the ReactionProvider before registering it. - Using the Drasi VSCode Extension. The extension will detect all of the ReactionProvider yaml files in the current workspace. Click on the
Validate
button next to each instance to validate a specific ReactionProvider definition.
Sample ReactionProvider and Reaction file
This section contains a sample Reactionprovider file and Reaction file for the Debug
reaction.
The Debug
reaction:
- Only needs one service with the name
debug
. - Uses the
reaction-debug
image - Needs to have an internal (k8s) endpoint at port “8080”
tag
should bev1
ReactionProvider file:
apiVersion: v1
kind: ReactionProvider
name: Debug
spec:
queries:
type: objects
services:
debug:
image: reaction-debug
endpoints:
gateway:
setting: internal
target: port
config_schema:
type: object
properties:
port:
type: number
default: 8080
The ReactionProvider can be applied using the Drasi CLI:
drasi apply -f <name-of-the-provider-file>.yaml
Reaction file:
apiVersion: v1
kind: Reaction
name: hello-world-debug
spec:
kind: Debug:v1
queries:
hello-world-from:
message-count:
inactive-people:
Similarly, this Reaction file can also be applied using the CLI:
drasi apply -f <name-of-the-reaction-file>.yaml