Data Pipeline Examples
This page demonstrates common ways to create data pipelines, including stream joins, partitioning, and scatter-gather processing.
Stream Joins
This example shows joining two streams based on a condition.
For more information on other join operations, refer to the Stream Worker Query Guide.
Stream Joins Example
```sql
CREATE STREAM TemperatureStream (roomNo string, temperature double);
CREATE STREAM HumidityStream (roomNo string, humidity double);

@info(name = 'Equi-join')
-- Join the latest `temperature` and `humidity` events arriving within 1 minute for each `roomNo`.
INSERT INTO TemperatureHumidityStream
SELECT t.roomNo, t.temperature, h.humidity
FROM TemperatureStream window unique:time(roomNo, 1 min) AS t
JOIN HumidityStream window unique:time(roomNo, 1 min) AS h
ON t.roomNo == h.roomNo;

@info(name = 'Join-on-temperature')
INSERT INTO EnrichedTemperatureStream
SELECT t.roomNo, t.temperature, h.humidity
-- The join is triggered when events arrive in `TemperatureStream`.
FROM TemperatureStream AS t
-- When events match in the `sliding_time()` window all matched events are emitted; otherwise `humidity` is emitted as `null`.
LEFT OUTER JOIN HumidityStream window sliding_time(1 min) AS h
ON t.roomNo == h.roomNo;
```
Join Behavior
When events are sent to the TemperatureStream and HumidityStream streams, the following events are emitted at TemperatureHumidityStream via the Equi-join query, and at EnrichedTemperatureStream via the Join-on-temperature query:
| Time | Input to TemperatureStream | Input to HumidityStream | Output at TemperatureHumidityStream | Output at EnrichedTemperatureStream |
|---|---|---|---|---|
| 9:00:00 | ['1001', 18.0] | - | - | ['1001', 18.0, null] |
| 9:00:10 | - | ['1002', 72.0] | - | - |
| 9:00:15 | - | ['1002', 73.0] | - | - |
| 9:00:30 | ['1002', 22.0] | - | ['1002', 22.0, 73.0] | ['1002', 22.0, 72.0], ['1002', 22.0, 73.0] |
| 9:00:50 | - | ['1001', 60.0] | ['1001', 18.0, 60.0] | - |
| 9:01:10 | - | ['1001', 62.0] | - | - |
| 9:01:20 | ['1001', 17.0] | - | ['1001', 17.0, 62.0] | ['1001', 17.0, 60.0], ['1001', 17.0, 62.0] |
| 9:02:10 | ['1002', 23.5] | - | - | ['1002', 23.5, null] |
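The Equi-join behavior in the table can be sketched in plain Python. This is an illustrative simulation, not the stream engine: each side keeps only the latest event per `roomNo` (mirroring `unique:time`), and a match is emitted when either side receives an event whose partner is still within the one-minute window.

```python
from collections import namedtuple

Event = namedtuple("Event", ["ts", "roomNo", "value"])  # ts in seconds

def equi_join(temps, humids, window_sec=60):
    """Simulate the Equi-join: unique-per-room windows on both sides."""
    latest = {"T": {}, "H": {}}  # side -> roomNo -> most recent event
    out = []
    merged = sorted([("T", e) for e in temps] + [("H", e) for e in humids],
                    key=lambda p: p[1].ts)
    for side, ev in merged:
        latest[side][ev.roomNo] = ev          # unique: keep latest per roomNo
        other = "H" if side == "T" else "T"
        m = latest[other].get(ev.roomNo)
        if m and ev.ts - m.ts <= window_sec:  # partner still inside its window
            t, h = (ev, m) if side == "T" else (m, ev)
            out.append((t.roomNo, t.value, h.value))
    return out

# The event sequence from the table above (9:00:00 -> ts 0, etc.).
temps  = [Event(0, "1001", 18.0), Event(30, "1002", 22.0),
          Event(80, "1001", 17.0), Event(130, "1002", 23.5)]
humids = [Event(10, "1002", 72.0), Event(15, "1002", 73.0),
          Event(50, "1001", 60.0), Event(70, "1001", 62.0)]
print(equi_join(temps, humids))
# -> [('1002', 22.0, 73.0), ('1001', 18.0, 60.0), ('1001', 17.0, 62.0)]
```

Note that the humidity event at 9:01:10 produces no output: the only temperature event for room `1001` at that point is 70 seconds old, so it has already left its window.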
Partition Events by Value
This example shows partitioning events by attribute values. For more information on partitions, refer to the Stream Worker Query Guide.
Partition Events by Value Example
```sql
CREATE STREAM LoginStream (userID string, loginSuccessful bool);

-- Optional purging configuration that removes partition instances that haven't
-- received events for `1 hour`, checking every `10 sec`.
@purge(enable='true', interval='10 sec', idle.period='1 hour')
-- Partitions the events based on `userID`.
partition with ( userID of LoginStream )
begin
    @info(name='Aggregation-query')
    -- Counts successful and failed login attempts among the last 3 events of each `userID`.
    -- Results go to the `#LoginAttempts` inner stream, which is only accessible
    -- within the partition instance.
    INSERT INTO #LoginAttempts
    SELECT userID, loginSuccessful, count() AS attempts
    FROM LoginStream WINDOW SLIDING_LENGTH(3)
    GROUP BY loginSuccessful;

    @info(name='Alert-query')
    -- Consumes events from the inner stream, and suspends `userID`s that have
    -- 3 consecutive login failures.
    INSERT INTO UserSuspensionStream
    SELECT userID, "Three consecutive login failures!" AS message
    FROM #LoginAttempts[loginSuccessful == false and attempts == 3];
end;
```
Partition Behavior
When events are sent to the LoginStream stream, the following events are generated at the #LoginAttempts inner stream via the Aggregation-query query, and at UserSuspensionStream via the Alert-query query:
| Input to LoginStream | Output at #LoginAttempts | Output at UserSuspensionStream |
|---|---|---|
| ['1001', false] | ['1001', false, 1] | - |
| ['1002', true] | ['1002', true, 1] | - |
| ['1002', false] | ['1002', false, 1] | - |
| ['1002', false] | ['1002', false, 2] | - |
| ['1001', false] | ['1001', false, 2] | - |
| ['1001', true] | ['1001', true, 1] | - |
| ['1001', false] | ['1001', false, 2] | - |
| ['1002', false] | ['1002', false, 3] | ['1002', 'Three consecutive login failures!'] |
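The partition semantics can be mimicked in plain Python (an illustrative sketch, not the engine): each `userID` gets its own fixed-length window of the last three events, counts are grouped by the login outcome, and an alert fires when the window holds three failures; with a window of three, that means three consecutive failures.

```python
from collections import defaultdict, deque

def login_alerts(events, window_len=3, threshold=3):
    """Simulate per-userID partitions with a sliding length-3 window."""
    windows = defaultdict(lambda: deque(maxlen=window_len))  # one per userID
    attempts_stream, suspensions = [], []
    for user, ok in events:
        w = windows[user]                  # the partition instance for this user
        w.append(ok)
        attempts = sum(1 for v in w if v == ok)  # count() grouped by outcome
        attempts_stream.append((user, ok, attempts))
        if not ok and attempts >= threshold:
            suspensions.append((user, "Three consecutive login failures!"))
    return attempts_stream, suspensions

# The event sequence from the table above.
events = [("1001", False), ("1002", True), ("1002", False), ("1002", False),
          ("1001", False), ("1001", True), ("1001", False), ("1002", False)]
attempts, alerts = login_alerts(events)
print(alerts)  # -> [('1002', 'Three consecutive login failures!')]
```

User `1001` never triggers an alert because the successful login resets the run: its window never contains three failures at once.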
Scatter and Gather (String)
This example shows performing scatter and gather on string values.
Scatter and Gather (String) Example
```sql
CREATE STREAM PurchaseStream (userId string, items string, store string);

@info(name = 'Scatter-query')
-- Scatter the value of `items` into separate events, splitting on `,`.
INSERT INTO TokenizedItemStream
SELECT userId, token AS item, store
FROM PurchaseStream#str:tokenize(items, ',', true);

@info(name = 'Transform-query')
-- Concatenate the tokenized `item` with `store`.
INSERT INTO TransformedItemStream
SELECT userId, str:concat(store, "-", item) AS itemKey
FROM TokenizedItemStream;

@info(name = 'Gather-query')
INSERT INTO GroupedPurchaseItemStream
-- Concatenate all events in a batch, separating them by `,`.
SELECT userId, str:groupConcat(itemKey, ",") AS itemKeys
-- Collect events traveling as a batch via the `batch()` window.
FROM TransformedItemStream window batch();
```
Scatter and Gather (String) Input
The following event, containing a comma-separated string of items, is sent to PurchaseStream:
['501', 'cake,cookie,bun,cookie', 'CA']
Scatter and Gather (String) Output
After processing, the events arrive at TokenizedItemStream:
['501', 'cake', 'CA'], ['501', 'cookie', 'CA'], ['501', 'bun', 'CA']
The events arrive at TransformedItemStream:
['501', 'CA-cake'], ['501', 'CA-cookie'], ['501', 'CA-bun']
The event arriving at GroupedPurchaseItemStream is:
['501', 'CA-cake,CA-cookie,CA-bun']
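The same scatter-transform-gather flow can be sketched in plain Python (illustrative only; `split`, a distinct filter, and `join` stand in for `str:tokenize`, `str:concat`, and `str:groupConcat`):

```python
def scatter_gather(user_id, items, store):
    """Simulate the three queries above on a single purchase event."""
    # Scatter: split on ',' keeping only distinct tokens
    # (mirrors `str:tokenize(items, ',', true)` with distinct = true).
    tokens = list(dict.fromkeys(items.split(",")))
    # Transform: prefix each item with its store (mirrors `str:concat`).
    keys = [f"{store}-{item}" for item in tokens]
    # Gather: join the whole batch back into one value (mirrors `str:groupConcat`).
    return (user_id, ",".join(keys))

print(scatter_gather("501", "cake,cookie,bun,cookie", "CA"))
# -> ('501', 'CA-cake,CA-cookie,CA-bun')
```

Note that the duplicate `cookie` token is dropped because the third argument of `str:tokenize` in the example requests distinct tokens.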
Scatter and Gather (JSON)
This example shows performing scatter and gather on JSON values.
Scatter and Gather (JSON) Example
```sql
CREATE STREAM PurchaseStream (order string, store string);

@info(name = 'Scatter-query')
-- Scatter the elements under `$.order.items` into separate events.
INSERT INTO TokenizedItemStream
SELECT json:getString(order, '$.order.id') AS orderId,
       jsonElement AS item,
       store
FROM PurchaseStream#json:tokenize(order, '$.order.items');

@info(name = 'Transform-query')
-- Provide a `$5` discount on cakes.
INSERT INTO DiscountedItemStream
SELECT orderId,
       ifThenElse(json:getString(item, 'name') == "cake",
                  json:toString(
                      json:setElement(item, 'price',
                          json:getDouble(item, 'price') - 5
                      )
                  ),
                  item) AS item,
       store
FROM TokenizedItemStream;

@info(name = 'Gather-query')
INSERT INTO GroupedItemStream
-- Combine `item` from all events in a batch into a single JSON array.
SELECT orderId, json:group(item) AS items, store
-- Collect events traveling as a batch via the `batch()` window.
FROM DiscountedItemStream window batch();

@info(name = 'Format-query')
INSERT INTO DiscountedOrderStream
-- Format the final JSON by combining `orderId`, `items`, and `store`.
SELECT str:fillTemplate("""
    {"discountedOrder":
        {"id":"{{1}}", "store":"{{3}}", "items":{{2}} }
    }""", orderId, items, store) AS discountedOrder
FROM GroupedItemStream;
```
Scatter and Gather (JSON) Input
The following event is sent to PurchaseStream:

```
[{
    "order":{
        "id":"501",
        "items":[{"name":"cake", "price":25.0},
                 {"name":"cookie", "price":15.0},
                 {"name":"bun", "price":20.0}
        ]
    }
}, 'CA']
```
Scatter and Gather (JSON) Output
After processing, the following events arrive at TokenizedItemStream:

```
['501', '{"name":"cake","price":25.0}', 'CA'],
['501', '{"name":"cookie","price":15.0}', 'CA'],
['501', '{"name":"bun","price":20.0}', 'CA']
```
The events arrive at DiscountedItemStream:
```
['501', '{"name":"cake","price":20.0}', 'CA'],
['501', '{"name":"cookie","price":15.0}', 'CA'],
['501', '{"name":"bun","price":20.0}', 'CA']
```
The event arriving at GroupedItemStream is:
```
['501', '[{"price":20.0,"name":"cake"},{"price":15.0,"name":"cookie"},{"price":20.0,"name":"bun"}]', 'CA']
```
The event arriving at DiscountedOrderStream is:
```
[
    {"discountedOrder":
        {
            "id":"501",
            "store":"CA",
            "items":[{"price":20.0,"name":"cake"},
                     {"price":15.0,"name":"cookie"},
                     {"price":20.0,"name":"bun"}]
        }
    }
]
```
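The whole JSON pipeline can be sketched in plain Python (illustrative only; `json.loads`, a loop over the items, and `json.dumps` stand in for `json:tokenize`, `json:setElement`, `json:group`, and `str:fillTemplate`):

```python
import json

def discount_order(order_json, store, discount=5.0):
    """Simulate the four queries above on a single order event."""
    order = json.loads(order_json)["order"]
    # Scatter: one record per element of $.order.items (mirrors json:tokenize).
    items = order["items"]
    # Transform: take the discount off every cake (mirrors json:setElement).
    for item in items:
        if item["name"] == "cake":
            item["price"] -= discount
    # Gather + format: rebuild a single JSON document
    # (mirrors json:group followed by str:fillTemplate).
    return json.dumps({"discountedOrder":
                       {"id": order["id"], "store": store, "items": items}})

doc = json.dumps({"order": {"id": "501",
                            "items": [{"name": "cake", "price": 25.0},
                                      {"name": "cookie", "price": 15.0},
                                      {"name": "bun", "price": 20.0}]}})
print(discount_order(doc, "CA"))
```

As in the stream-worker output above, only the cake's price changes (25.0 to 20.0); the other items pass through untouched.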