Implement a Source

Source proxy
- The Source proxy is responsible for bootstrapping the initial data when a continuous query is deployed.
- For any source proxy, we need to set up an HTTP server that listens for a POST request on the route ‘/acquire’. The bootstrap logic will be implemented under this section. As a result, when a POST request is received, the initial data will be obtained based on the logic here.
- For example, the SQL proxy uses knex to query and retrieve all of the rows in a SQL table and use the data as the bootstrapping data.
- It is also worth noting that the passthru proxy might be useful in certain situations. The passthru proxy handles the bootstrapping process by using dapr to invoke a HTTP Post method called “acquire”, which is defined in the Source reactivator.
Sample code block:
require("dotenv").config();
const fs = require('fs');
const dapr = require("@dapr/dapr");
const express = require('express');
const bodyParser = require('body-parser');
const cors = require('cors');
const port = parseInt(process.env["PORT"] ?? "4002");
const sourceId = process.env["SOURCE_ID"];
const daprClient = new dapr.DaprClient();
async function main() {
const app = express();
app.use(cors());
app.use(bodyParser.urlencoded( { extended: false }));
app.use(bodyParser.json());
app.post('/acquire', async (req, res, next) => {
try {
// Implement the bootstrap logic here
res.status(200).json(result); // Send the retrieved result as the response body
} catch (err) {
next(err);
}
});
app.listen(port, () => console.log(`sourceProxy.main - Reactive Graph Source Node Proxy listening on port:${port}`));
}
Source reactivator
- The Source Reactivator is responsible for tracking and propagating any changes in the data to the subsequent Source component, the change service. You will implement the logic for tracking the changes in your source System in this component.
- The Source reactivator and the Source change service uses the pub/sub feature from Dapr to transmit the events. The reactivator acts as the publisher while the change service acts as the subscriber. The pubsub name will be available on the ‘PUBSUB’ environment variable (default is ‘rg-pubsub’) and the topic name will be ‘
-change’.
Formatting a change event:
The code blocks below showcase how the change event should be formatted depending on the operation. For any change event, three fields are required in the json block: op
, payload
and ts_ms
.
Insert operation:
{
"op": "i",
"payload": {
"after": {
"id": <id>,
"labels": [<labels>],
"properties": {} // Any additional data of the change event
},
"before": {}, // empty
"source": {
"db": "",
"lsn": "",
"table": "", // Oneof `node` or `relation`. Use `relation` if we are working with a relational Source event
"ts_ms": "",
"ts_sec": ""
}
},
"ts_ms": <Current time in milliseconds>
}
Delete Operation:
{
"op": "d",
"payload": {
"after": {}, // Empty
"before": {
"id": <id>,
"labels": [<labels>],
"properties": {}
},
"source": {
"db": "",
"lsn": "",
"table": "", // Oneof `node` or `relation`
"ts_ms": "",
"ts_sec": ""
}
},
"ts_ms": <Current time in milliseconds>
}
Update Operation:
{
"op": "u",
"payload": {
"after": {
"id": <id>,
"labels": [<labels>],
"properties": {} // different across sources
},
"before": {
"id": <id>,
"labels": [<labels>],
"properties": {
}
},
"source": {
"db": "",
"lsn": "",
"table": "", // Oneof `node` or `relation`
"ts_ms": "",
"ts_sec": ""
}
},
"ts_ms": <Current time in milliseconds>
}
pub/sub sample code
Sample Dapr publisher code:
import com.fasterxml.jackson.databind.JsonNode;
import io.dapr.client.DaprClient;
import io.dapr.client.DaprClientBuilder;
import java.util.List;
public class DaprChangePublisher implements ChangePublisher {
private DaprClient client;
private String pubsubName;
private String sourceId;
public DaprChangePublisher(String sourceId, String pubsubName) {
this.sourceId = sourceId;
this.pubsubName = pubsubName;
client = new DaprClientBuilder().build();
}
@Override
public void Publish(List<JsonNode> changes) {
client.publishEvent(pubsubName, sourceId + "-change", changes).block();
}
@Override
public void close() throws Exception {
client.close();
}
}
Registering a new Source
To add support for a new kind of source to Drasi, you must develop the services that will connect to the source and register a new Source Provider with Drasi. The Source Provider definition describes the services Drasi must run, where to get the images, and the configuration settings that are required when an instance of that Source is created
The definition for a SourceProvider has the following basic structure:
apiVersion: v1
kind: SourceProvider
name: <name>
tag: <tag> # Optional.
spec:
services:
proxy: # One of the required services.
image: <image_name> # Required. Cannot be overwritten.
dapr: # Optional; used for specifying dapr-related annotations
app-port: <value> # Optional
app-protocol: <value> # Optional
endpoints: # Optional; used for configuring internal/external endpoints
<endpoint_name>:
setting: internal/external
target: <target> # endpoint target
(any additional endpoints)...
config_schema: # Optional; used for specifying any additional environment variables
type: object
properties:
<name>:
type: <type> # One of [string, integer, boolean, array or object]
default: <value> # Optional.
(any additioanl properties)...
required: # Optional. List any required properties here
reactivator: # One of the required services.
image: <image_name> # Required. Cannot be overwritten.
dapr: # Optional; used for specifying dapr-related annotations
app-port: <value> # Optional
app-protocol: <value> # Optional
endpoints: # Optional; used for configuring internal/external endpoints
<endpoint_name>:
setting: internal/external
target: <target> # endpoint target
(any additional endpoints)...
config_schema: # Optional; used for specifying any additional environment variables
type: object
properties:
<name>:
type: <type> # One of [string, integer, boolean, array or object]
default: <value> # Optional.
(any additioanl properties)...
required: # Optional. List any required properties here
(any additional services)...
config_schema: # Optional;
# The environment variables defined here will be
# accessible by all services
type: object
properties:
<name>:
type: <type> # One of [string, integer, boolean, array or object]
default: <value> # Optional.
(any additioanl properties)...
required: # Optional. List any required properties here
In the SourceProvider definition:
- apiVersion: Must be v1
- kind: Must be SourceProvider
- name: Specifies the kind of Source that we are trying to register
- tag: Optional. This is used for specifying the “version” of the SourceProvider
The section below provides a more detailed walkthrough of the various fields under the spec
section.
Services
The services
field configures the definition of the service(s) of a Source. For any SourceProvider, you must define two required services: proxy
and reactivator
, and you can choose to define additional services if needed. Every service will be rendered into an unique Kubernetes deployment and ultimately a Kubernetes pod. For each service
, there are four fields that you can configure:
image
image
is a required field and you can specify the image to use for this source service here.- (NOTE: Drasi assumes that the image lives in the same registry that you used when you executed
drasi init
).
- (NOTE: Drasi assumes that the image lives in the same registry that you used when you executed
endpoints
- If your source has a port that needs to be exposed, you can specify them under the
endpoints
section. Theendpoints
section takes in a series ofendpoint
, which is a JSON object. Eachendpoint
object should have two properties:setting
andtarget
.setting
can be either “internal” or “external”, although we currently only support internal endpoints. For thetarget
attribute, if the setting is set tointernal
, thetarget
should be a port number. - Each endpoint will be rendered into a Kubernetes Service, with the value of
target
being set as the port number. - The following block defines a Source that will create a Kubernetes service called
<source-name>-gateway
with a port of4318
when deployed.-
endpoints: gateway: setting: internal target: "4318"
-
- If your source has a port that needs to be exposed, you can specify them under the
dapr
: optional. This field is used for specifying any dapr annotation that the user wishes to include. Currently we only supportapp-port
andapp-protocol
.- The
app-port
annotation is used to tell Dapr which port the application is listening on, whereas theapp-protocol
annotation configures the protocol that Dapr uses to communicate with your app- Sample yaml block:
-
dapr: app-port: 4002
- The
config_schema
- This is used for defining environment variables; however, the environment variables that are defined here are only accessible for this particular service.
- The configurations are defined by following JSON Schema. We define this field to be of type
object
, and list all of the configs (environment variables) under theproperties
section. For each of the property, you need to specify its type and an optional default value. For any required environment variables, you can list them under therequire
section as an array of elements - Sample:
config_schema: type: object properties: foo: type: string default: bar property2: type: boolean default: true required: - foo
Config Schema
The config_schema
section that is at the same level as the services
section is used for defining any environment variables that will be shared and accessible by all services. Similarly, this field can be defined in a similar way as how you would define the config_schema
field for each service.
For example, the following section will specify two environment variables foo
and isTrue
for this source. foo
is a required environment variable and it expects the input to be of type string
, whereas isTrue
expects the input to be of type boolean
and is not a required value (default value is set to true
)
spec:
services:
...
config_schema:
type: object
properties:
foo:
type: string
isTrue:
type: boolean
default: true
required:
- foo
Validating the SourceProvider file
To validate a SourceProvider yaml file, there are two approaches:
- Using
apply
command from the Drasi CLI. The CLI will automatically validate the SourceProvider before registering it. - Using the Drasi VSCode Extension. The extension will detect all of the SourceProvider yaml files in the current workspace. Click on the
Validate
button next to each instance to validate a specific SourceProvider definition.
Sample SourceProvider and Source file
This section contains a sample SourceProvider file and a Source file for the Postgres
source.
The Postgres
source:
- Contains two services:
proxy
andreactivator
proxy
uses the imagesource-sql-proxy
, and its dapr app-port should be set to4002
.reactivator
uses the imagesource-debezium-reactivator
. It has an additional environment variable with the name ofclient
and a default value ofpg
.
- Has five required environment variables: database, host, port, password, user SourceProvider file:
apiVersion: v1
kind: SourceProvider
name: PostgreSQL
spec:
services:
proxy:
image: source-sql-proxy
dapr:
app-port: "4002"
reactivator:
image: source-debezium-reactivator
config_schema:
type: object
properties:
client:
type: string
default: pg
endpoints:
gateway:
setting: internal
target: "8080"
config_schema:
type: object
properties:
database:
type: string
host:
type: string
password:
type: string
port:
type: number
ssl:
type: boolean
default: false
user:
type: string
tables:
type: array
required:
- database
- host
- port
- password
- user
- tables
To register a SourceProvider file, use the apply
command from the Drasi CLI
:
drasi apply -f <name-of-source-provider-file>.yaml
You can list all of the registered types of Source using the following command:
drasi list sourceprovider
To deploy a PostgreSQL
source, we simply need to create a Source file that supplies all of the required values. In this case, we need to supply a value for all of the environment variables that are marked as required. Below is a sample Source file for deploying a PostgreSQL
source (Notice that since we are not overwriting any service configurations, we can simply omit the services
field in this file):
apiVersion: v1
kind: Source
name: hello-world
spec:
kind: PostgreSQL
properties:
host: postgres
user: test
port: 5432
ssl: false
password: test
database: hello-world
tables:
- public.Message
Similarly, this source file can also be registered using the CLI:
drasi apply -f <name-of-the-source-file>.yaml