Kafka adapter

Note:

KafkaAdapter is an experimental feature; changes in the public API and usage are expected.

For instructions on downloading and building Calcite, start with the tutorial.

The Kafka adapter exposes an Apache Kafka topic as a STREAM table, so it can be queried using Calcite Stream SQL. Note that the adapter does not attempt to scan all topics; instead, users must configure tables manually, with each Kafka stream table mapping to one Kafka topic.

A basic example of a model file is given below:

{
  "version": "1.0",
  "defaultSchema": "KAFKA",
  "schemas": [
    {
      "name": "KAFKA",
      "tables": [
        {
          "name": "TABLE_NAME",
          "type": "custom",
          "factory": "org.apache.calcite.adapter.kafka.KafkaTableFactory",
          "row.converter": "com.example.CustKafkaRowConverter",
          "operand": {
            "bootstrap.servers": "host1:port,host2:port",
            "topic.name": "kafka.topic.name",
            "consumer.params": {
              "key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
              "value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer"
            }
          }
        }
      ]
    }
  ]
}

Note that:

  1. As Kafka messages are schemaless, a KafkaRowConverter is required to specify the row schema explicitly (via the row.converter parameter) and to define how each Kafka message is decoded into a Calcite row. KafkaRowConverterImpl is used if none is provided (see the sketch after this list);

  2. Additional consumer settings can be added via the consumer.params parameter.
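
For reference, a KafkaRowConverter implements two methods: one that declares the row schema for a topic, and one that decodes each ConsumerRecord into a row. Below is a minimal sketch of a custom converter, assuming the message value is UTF-8 text; the class name CustKafkaRowConverter and the column names are illustrative, not part of the adapter:

package com.example;

import org.apache.calcite.adapter.kafka.KafkaRowConverter;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.nio.charset.StandardCharsets;

public class CustKafkaRowConverter implements KafkaRowConverter<byte[], byte[]> {

  // Declares the row schema that Calcite exposes for the topic.
  @Override public RelDataType rowDataType(String topicName) {
    RelDataTypeFactory typeFactory =
        new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
    return typeFactory.builder()
        .add("MSG_OFFSET", typeFactory.createSqlType(SqlTypeName.BIGINT))
        .add("MSG_VALUE", typeFactory.createSqlType(SqlTypeName.VARCHAR))
        .build();
  }

  // Decodes one Kafka record into a row matching the schema above;
  // assumes the message value is UTF-8 encoded text.
  @Override public Object[] toRow(ConsumerRecord<byte[], byte[]> message) {
    return new Object[] {
        message.offset(),
        new String(message.value(), StandardCharsets.UTF_8)
    };
  }
}

The type parameters of KafkaRowConverter must match the key and value deserializers configured in consumer.params (here byte[] to pair with ByteArrayDeserializer).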

Assuming this file is stored as kafka.model.json, you can connect to Kafka via sqlline as follows:

$ ./sqlline
sqlline> !connect jdbc:calcite:model=kafka.model.json admin admin

sqlline will now accept SQL queries which access your Kafka topics.

With the Kafka table configured in the above model, we can run a simple query to fetch messages:

sqlline> SELECT STREAM *
         FROM KAFKA.TABLE_NAME;
+---------------+---------------------+---------------------+---------------+-----------------+
| MSG_PARTITION |    MSG_TIMESTAMP    |     MSG_OFFSET      | MSG_KEY_BYTES | MSG_VALUE_BYTES |
+---------------+---------------------+---------------------+---------------+-----------------+
| 0             | -1                  | 0                   | mykey0        | myvalue0        |
| 0             | -1                  | 1                   | mykey1        | myvalue1        |
+---------------+---------------------+---------------------+---------------+-----------------+

A Kafka table is a streaming table, so a query over it runs continuously.

If you want the query to complete after fetching a fixed number of rows, add a LIMIT clause:

sqlline> SELECT STREAM *
         FROM KAFKA.TABLE_NAME
         LIMIT 5;
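
The same query can be issued programmatically over JDBC. A minimal sketch, assuming kafka.model.json is in the working directory and the default KafkaRowConverterImpl columns shown above (the class name KafkaQueryExample is illustrative):

import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class KafkaQueryExample {
  public static void main(String[] args) throws Exception {
    try (Connection connection =
             DriverManager.getConnection("jdbc:calcite:model=kafka.model.json");
         Statement statement = connection.createStatement();
         // LIMIT bounds the otherwise continuous streaming query.
         ResultSet resultSet = statement.executeQuery(
             "SELECT STREAM * FROM KAFKA.TABLE_NAME LIMIT 5")) {
      while (resultSet.next()) {
        System.out.println(resultSet.getLong("MSG_OFFSET") + ": "
            + new String(resultSet.getBytes("MSG_VALUE_BYTES"),
                StandardCharsets.UTF_8));
      }
    }
  }
}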