Continuous Queries

What are Continuous Queries and how to use them?

Continuous Queries are the most important component of Drasi. They are the mechanism by which you tell Drasi what changes to detect in source systems as well as the data you want distributed when changes are detected. Sources provide source changes to subscribed Continuous Queries, which then provide query result changes to subscribed Reactions.

End to End

Continuous Queries, as the name implies, are queries that run continuously. To understand what is unique about them, it is useful to contrast them with a the kind of instantaneous queries developers are accustomed to running against databases.

When you execute an instantaneous query, you are running the query against the database at a point in time. The database calculates the results to the query and returns them. While you work with those results, you are working with a static snapshot of the data and are unaware of any changes that may have happened to the data after you ran the query. If you run the same instantaneous query periodically, the query results might be different each time due to changes made to the data by other processes. But to understand what has changed, you would need to compare the most recent result with the previous result.

Instantaneous Query

Continuous Queries, once started, continue to run until they are stopped. While running, Continuous Queries maintain a perpetually accurate query result, incorporating any changes made to the source database as they occur. Not only do Continuous Queries allow you to request the query result as it was at any point in time, but as changes occur, the Continuous Query determines exactly which result elements have been added, updated, and deleted, and distributes a precise description of the changes to all Reactions that have subscribed to the Continuous Query.

Continuous Query

Continuous Queries are implemented as graph queries written in the Cypher Query Language. The use of a declarative graph query language means you can:

  • describe in a single query expression which changes you are interested in detecting and what data you want notifications of those changes to contain.
  • express rich query logic that takes into consideration both the properties of the data you are querying and the relationships between data.
  • create queries that span data across multiple Sources without complex join syntax, even when there is no natural connection between data in the Source systems, including queries that incorporate both relational and graph sources.

Example: Incident Alerts

Imagine an Incident Alerting Service which notifies managers if any of the employees in their team are at risk due to dangerous incidents happening in their location (e.g. fires, storms, protests, etc). For this example, assume the source data is a property graph of nodes (rectangles) and relations (lines) shown in the following diagram:

Incident Alerting

Using Drasi, the following Cypher query could be used to to identify each employee at risk:

MATCH
  (e:Employee)-[:ASSIGNED_TO]->(t:Team),
  (m:Employee)-[:MANAGES]->(t:Team),
  (e:Employee)-[:LOCATED_IN]->(:Building)-[:LOCATED_IN]->(r:Region),
  (i:Incident {type:'environmental'})-[:OCCURS_IN]->(r:Region) 
WHERE
  elementId(e) <> elementId(m) AND i.severity IN [‘critical’, ‘extreme’] AND i.endTimeMs IS NULL
RETURN 
  m.name AS ManagerName, m.email AS ManagerEmail, 
  e.name AS EmployeeName, e.email AS EmployeeEmail,
  r.name AS RegionName, 
  elementId(i) AS IncidentId, i.severity AS IncidentSeverity, i.description AS IncidentDescription

The MATCH and WHERE clauses of the query describe a pattern that identifies all Employees located in Buildings within Regions where there are active Incidents of type ’environmental’ that have a severity level of ‘critical’ or ‘extreme’. This means that any combination of correctly connected nodes with the required property values should be included in the query result.

The RETURN clause of the query generates output containing the name and email address of the at risk employee and their manager, as well as details about the incident and the region in which it is located. This defines the schema for results generated by the Continuous Query.

When the above Continuous Query is first run, there are no results that satisfy the query, because there are no Incidents in the data. But as soon as an extreme severity Forest Fire Incident in Southern California is added to the database, as follows:

Incident Added

The query would generate the following output showing that two records (for employees Bob and Claire) now meet the query criteria and have been added to the query result:

{
 “added”: [
  { “ManagerName”: “Allen”, “ManagerEmail”: “allen@contoso.com”, “EmployeeName”: “Bob”, “EmployeeEmail”: “bob@contoso.com”, “RegionName”: “Southern California”, “IncidentId”: “in1000”, “IncidentSeverity”: “extreme”, “IncidentDescription”: “Forest Fire” },
  { “ManagerName”: “Allen”, “ManagerEmail”: “allen@contoso.com”, “EmployeeName”: “Claire”, “EmployeeEmail”: “claire@contoso.com”, “RegionName”: “Southern California”, “IncidentId”: “in1000”, “IncidentSeverity”: “extreme”, “IncidentDescription”: “Forest Fire” }
 ],
 “updated”: [],
 “deleted”: []
}

If Bob subsequently changed location, removing him from the Southern Californian Region while the Forest Fire was still active, the Continuous Query would generate the following output, showing that Bob’s record had been deleted from the query result:

{
 “added”: [],
 “updated”: [],
 “deleted”: [
  { “ManagerName”: “Allen”, “ManagerEmail”: “allen@contoso.com”, “EmployeeName”: “Bob”, “EmployeeEmail”: “bob@contoso.com”, “RegionName”: “Southern California”, “IncidentId”: “in1000”, “IncidentSeverity”: “extreme”, “IncidentDescription”: “Forest Fire” }
 ]
}

If the severity of the Forest Fire then changed from ’extreme’ to ‘critical’, the Continuous Query would spontaneously generate the following output showing a that the result for Claire had been updated. The update includes what the result was both before and after the change:

{
 “added”: [],
 “updated”: [
  { 
   “before”: { “ManagerName”: “Allen”, “ManagerEmail”: “allen@contoso.com”, “EmployeeName”: “Bob”, “EmployeeEmail”: “bob@contoso.com”, “RegionName”: “Southern California”, “IncidentId”: “in1000”, “IncidentSeverity”: “extreme”, “IncidentDescription”: “Forest Fire” },
   “after”: { “ManagerName”: “Allen”, “ManagerEmail”: “allen@contoso.com”, “EmployeeName”: “Bob”, “EmployeeEmail”: “bob@contoso.com”, “RegionName”: “Southern California”, “IncidentId”: “in1000”, “IncidentSeverity”: “critical”, “IncidentDescription”: “Forest Fire” }
 ],
 “deleted”: []
}

In some instances, a single source change can result in multiple changes to the query result e.g. multiple records can be added, updated, and deleted. In such cases, the Continuous Query generates a single result change notification containing all the changes. This enables subscribed Reactions to treat the related changes atomically given they all arose from a single source change.

Creation

Continuous Queries can be created and managed using the Drasi CLI.

The easiest way to create a Continuous Query, and the way you will often create one as part of a broader software solution, is to:

  1. Create a YAML file containing the Continuous Query definition. This can be stored in your solution repo and versioned along with all the other solution code / resources.
  2. Run drasi apply to apply the YAML file, creating the Continuous Query

When a new Continuous Query is created it:

  1. Subscribes to its Sources, describing the types of change it wants to receive.
  2. Queries its Sources to load the initial data for its query result.
  3. Begins processing the stream of SourceChangeEvents from its Sources that represent the sequence of low-level database changes that have occurred (inserts, updated, deletes) and translates them into changes to its query result.

Here is a simple example of the definition for the Incident Alerting Continuous Query used in the example above:

apiVersion: v1
kind: ContinuousQuery
name: manager-incident-alert
spec:
  sources:    
    subscriptions:
      - id: human-resources
  query: > 
    MATCH
      (e:Employee)-[:ASSIGNED_TO]->(t:Team),
      (m:Employee)-[:MANAGES]->(t:Team),
      (e:Employee)-[:LOCATED_IN]->(:Building)-[:LOCATED_IN]->(r:Region),
      (i:Incident {type:'environmental'})-[:OCCURS_IN]->(r:Region) 
    WHERE
      elementId(e) <> elementId(m) AND i.severity IN [‘critical’, ‘extreme’] AND i.endTimeMs IS NULL
    RETURN 
      m.name AS ManagerName, m.email AS ManagerEmail, 
      e.name AS EmployeeName, e.email AS EmployeeEmail,
      r.name AS RegionName, 
      elementId(i) AS IncidentId, i.severity AS IncidentSeverity, i.description AS IncidentDescription

In this example, the spec.sources.subscriptions property identifies the Source with the id human-resources as the source of data for the Continuous Query. The spec.query property contains the text of the Cypher query. Full details of the Continuous Query configuration options are described in the Configuration section.

If this Continuous Query resource definition was contained in a file called query.yaml, to create this query on a Drasi environment that was the current Kubectl context, you would run the command:

drasi apply -f query.yaml

You can then use additional drasi commands to query the existence and status of the Continuous Query resource. For example, to see a list of the active Continuous Queries, run the following command:

drasi list query

Deletion

To delete an active Continuous Query, run the following command:

drasi delete query <query-id>

For example, if the Continuous Query id from the name property of the resource definition is manager-incident-alert, you would run,

drasi delete query manager-incident-alert

Note: Drasi does not currently enforce dependency integrity between Continuous Queries and Reactions. If you delete a Continuous Query that is used by one or more Reactions, they will stop getting query result changes.

Configuration

The definition for a Continuous Query has the following basic structure:

apiVersion: v1
kind: ContinuousQuery
name: <continuous_query_id>
spec:
  mode: <QUERY | filter>
  container: <query_container_id>
  storageProfile: <storage_profile_id>
  sources:    
    subscriptions:
      - id: <source_1_id>
        nodes:
          - sourceLabel: <source_1_node_1_source_label>, 
            queryLabel: <source_1_node_1_query_label>,
            suppressIndex: <true | FALSE>
          - ...
        relations:
          - sourceLabel: <source_1_rel_1_source_label>, 
            queryLabel: <source_1_rel_1_query_label>,
            suppressIndex: <true | FALSE>
          - ...
      - id: <source_2_id>
        nodes: ...
        relations: ...
    joins:
      - id: <join_1_id>
        keys:
          - label: <key_1_label_name>
            property: <key_1_property_name>
          - label: <key_2_label_name>
            property: <key_2_property_name>
      - id: <join_2_id>
        keys: ...
  view:
    enabled: <true | false>
    retentionPolicy:
      <latest: | all: | expire: afterSeconds: <TTL>>
  params:
    <param_1_key>: <param_1_value>
    <param_2_key>: <param_2_value>
    ...
  query: MATCH ... WHERE ... RETURN ...

In the Continuous Query resource definition:

  • apiVersion must be v1
  • kind must be ContinuousQuery
  • name is the id of the Continuous Query and must be unique. Is used to identify the Continuous Query through the CLI/API and in Reactions to identify the Continuous Queries they should subscribe to..

The following table provides a summary of the other configuration settings from the spec section of the resource definition:

Name Description
mode Can have the value query (default) or filter. If a Continuous Query is running in filter mode, it does not maintain a query result and as such does not generate detailed change notifications in response to Source changes. Instead, any Source change that adds or updates a query result will be output as an added result item. Any change that causes results to be removed from the query result will not generate output.
container The logical Query Container that hosts the query. If none is specified default will be used, which is the default container created when Drasi is installed. Query Containers host a number of queries and can be scaled up or down. Each Query Container also defines a set storage profiles that hold the configuration of the backing data store for indexes.
storageProfile The name of the storage profile on the query container to use. This settings controls how Drasi caches the Continuous Query element and solution indexes. These profiles are defined on the query container. The default query container has memory and redis configured by default and redis is the default profile if none is specified. Using memory-based indexes is good for testing and is also OK for Continuous Queries that do not require significant bootstrapping when they start.
sources Contains two sections: subscriptions and joins. The subscriptions section describes the Sources the Continuous Query will subscribe to for data and optionally maps the Source Labels to the Label names used in the Cypher Query. The joins section describes the way the Continuous Query connects elements from multiple sources to enable you to write graph queries that span sources. Both sections are described in more detail in the Sources section.
params Parameter values that are used by the Cypher query, enabling the repeated use of the same query that can be customized using parameter values.
view (Optional). Defines the behavior of the results view. enabled controls if the results of the query are cached in the results view. retentionPolicy determines how long the results will be stored for. latest (default) only holds the most recent version, all holds all previous versions and allows querying at a time point in the past, expire holds the non current results for a limited time.
query The Cypher query that defines the change the Continuous Query is detecting and the output it generates. Explained in Continuous Query Syntax.

Sources

The configuration settings in the spec.sources.subscriptions section of the Continuous Query resource definition do the following:

  • Define the Sources the Continuous Query will subscribe to for its change data.
  • Within each Source, define the Labels of the Nodes and Relations that the the Continuous Query expects from that Source. If you do not map a Node/Relation Label used in the query to a specific Source, it is assumed to come from the first Source configured.
  • For each Node and Relation, create a mapping between the Label name used in the Source data and the Label name used in the Query. This allows you to write queries independent of the Label/Type names used in the source data, easily use the same Query against multiple sources, and deal with the name collisions across Sources.
  • For each Node and Relation, you also have the ability to disable element level caching. This can be useful when processing append only logs where the main element being processed (i.e. the log record) will never change once created, and will not be referenced in query future query results.

The configuration settings in the spec.sources.joins section of the Continuous Query resource definition create a mapping between a Label and Property name pair from one source, with a Label and Property Name pair in another source. This allows the Continuous Query to be written as a single unified query without consideration from which Source data is originating from. Drasi will use the mapping to create synthetic relations between Nodes as required.

Here is an example of a Continuous Query from the Curbside Pickup demo app that defines two Sources: phys-ops and retail-ops:

apiVersion: v1
kind: ContinuousQuery
name: curbside-pickup
spec:
  mode: query
  sources:    
    subscriptions:
      - id: phys-ops
        nodes:
          - sourceLabel: Vehicle
          - sourceLabel: Zone
        relations:
          - sourceLabel: LOCATED_IN
      - id: retail-ops
        nodes:
          - sourceLabel: Driver
          - sourceLabel: Order
          - sourceLabel: OrderPickup
        relations:
          - sourceLabel: PICKUP_DRIVER
          - sourceLabel: PICKUP_ORDER
    joins:
      - id: VEHICLE_TO_DRIVER
        keys:
          - label: Vehicle
            property: plate
          - label: Driver
            property: plate
  query: ...