Streams
Refer to our stream definition for more information.
List
POST /streams.list
The list of created streams is returned.
Sample response
[
{
"name": "customapp",
"topic": "customapp"
},
{
"name": "events",
"topic": "events"
}
]
At the moment the name of the output topic is the same as the name of the stream itself.
Create
POST /streams.create
Sample request
{
"name": "customapp",
"topics": [
{
"name": "messages",
"fields": [
{
"name": "source",
"type": "string",
"newName": "from"
},
{
"name": "senderId",
"type": "string",
"nawName": "sender"
}
]
},
{
"name": "channels",
"fields": [
{
"name": "connectionState",
"type": "string",
"newName": "state"
}
]
}
],
"joins": [
{
"name": "newSource",
"field1": "source", // From the first topic in the "topics" list
"field2": "source" // From the second topic in the "topics" list
}
],
"aggregations": [],
"key": "messages.source"
}
The name of the created stream is returned, alongside with the output topic.
Sample response
{
"name": "customapp",
"outputTopic": "customapp"
}
Note that in the joins
object of the request payload, field1
should come from the first topic in the "topics" list and field2
should come from the second one. At the moment only two topics are supported for joining.
Also the field newName
in the description of the fields supports _
but support .
and -
.
Info
POST /streams.info
Sample request
{
"name": "customapp"
}
Information about the stream is returned.
Sample response
{
"stream": {
"name": "CUSTOMAPP",
"writeQueries": [
{
"queryString": "CREATE STREAM CUSTOMAPP WITH (KAFKA_TOPIC='CUSTOMAPP', PARTITIONS=10, REPLICAS=1) AS SELECT\n A.SOURCE A_SOURCE,\n A.SENDERID SENDERID,\n B.CONNECTIONSTATE CONNECTIONSTATE\nFROM MESSAGES A\nINNER JOIN CHANNELS B WITHIN 365 DAYS ON ((B.SOURCE = A.SOURCE))\nEMIT CHANGES;",
"sinks": ["CUSTOMAPP"],
"sinkKafkaTopics": ["CUSTOMAPP"],
"id": "CSAS_CUSTOMAPP_79",
"statusCount": {
"RUNNING": 1
},
"queryType": "PERSISTENT",
"state": "RUNNING"
}
],
"fields": [
{
"name": "A_SOURCE",
"schema": {
"type": "STRING"
},
"type": "KEY"
},
{
"name": "SENDERID",
"schema": {
"type": "STRING"
}
},
{
"name": "CONNECTIONSTATE",
"schema": {
"type": "STRING"
}
}
],
"type": "STREAM",
"keyFormat": "KAFKA",
"valueFormat": "AVRO",
"topic": "CUSTOMAPP",
"partitions": 10,
"replication": 1,
"statement": "CREATE STREAM CUSTOMAPP WITH (KAFKA_TOPIC='CUSTOMAPP', PARTITIONS=10, REPLICAS=1) AS SELECT\n A.SOURCE A_SOURCE,\n A.SENDERID SENDERID,\n B.CONNECTIONSTATE CONNECTIONSTATE\nFROM MESSAGES A\nINNER JOIN CHANNELS B WITHIN 365 DAYS ON ((B.SOURCE = A.SOURCE))\nEMIT CHANGES;"
}
}
Delete
POST /streams.delete
Sample request
{
"name": "customapp"
}
The name of the deleted stream is returned.
Sample response
{
"name": "customapp"
}