MQTT Ingestion Source

MQTT (Message Queuing Telemetry Transport) is a machine-to-machine communication protocol particularly well suited to machine data and Internet of Things (IoT) applications.

The MQTT ingestion source allows CrateDB to function as an MQTT endpoint for data ingestion. Incoming messages can be written to user-defined tables according to user-defined rules. These tables can then be queried or polled at will by CrateDB clients.

Using CrateDB as an MQTT endpoint removes the need for intermediary brokers, message queues, or MQTT subscribers doing message transformation or persistence.

Note

The MQTT ingestion source is an enterprise feature.

Warning

The MQTT ingestion source is deprecated and will be removed in a future release.

Configuration

This ingestion source has a few node settings that you can configure.

SSL support is available for MQTT connections and can be enabled using the ssl.ingestion.mqtt.enabled setting. When SSL is enabled for the MQTT connection, consider changing ingestion.mqtt.port to 8883, which is the IANA-registered TCP port for MQTT over SSL.

Once CrateDB is configured to use encrypted MQTT connections, unencrypted connections are no longer accepted.
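On the client side, an encrypted connection to port 8883 could be opened roughly as follows. This is a minimal sketch using only the Python standard library; the function names, the default timeout and the optional CA file are illustrative assumptions, not part of CrateDB. It only establishes the TLS channel and does not perform the MQTT CONNECT handshake.

```python
import socket
import ssl

# Conventional port for MQTT over SSL/TLS; adjust to your ingestion.mqtt.port.
MQTT_TLS_PORT = 8883


def make_tls_context(ca_file=None):
    """Create a client-side TLS context that verifies the server certificate.

    ca_file is a hypothetical path to the CA bundle that signed the
    server certificate; None falls back to the system trust store.
    """
    return ssl.create_default_context(cafile=ca_file)


def open_mqtt_tls_socket(host, port=MQTT_TLS_PORT, ca_file=None, timeout=5.0):
    """Open a TCP connection and wrap it in TLS (no MQTT packets are sent)."""
    context = make_tls_context(ca_file)
    raw = socket.create_connection((host, port), timeout=timeout)
    try:
        return context.wrap_socket(raw, server_hostname=host)
    except Exception:
        raw.close()
        raise
```

An MQTT client library would normally do this internally once it is given the port and the TLS settings.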

Usage

Quality of Service

This ingestion source implements only MQTT Quality of Service (QoS) level 1, meaning that messages are delivered to the endpoint at least once and stored in the database.

Other QoS levels are not supported; messages sent with those levels are discarded and an error is logged.
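To make the QoS requirement concrete, the sketch below builds the bytes of an MQTT 3.1.1 PUBLISH packet and shows where the QoS level and the packet identifier live. The function names are illustrative, and the packet layout follows the MQTT 3.1.1 specification rather than anything specific to CrateDB. In practice an MQTT client library would produce this packet for you.

```python
import json
import struct


def encode_remaining_length(length: int) -> bytes:
    """Encode the MQTT 'remaining length' field as a variable-length integer."""
    if not 0 <= length <= 268_435_455:
        raise ValueError("remaining length out of range")
    out = bytearray()
    while True:
        digit = length % 128
        length //= 128
        if length > 0:
            digit |= 0x80  # continuation bit: more length bytes follow
        out.append(digit)
        if length == 0:
            return bytes(out)


def build_publish(topic: str, payload: dict, packet_id: int, qos: int = 1) -> bytes:
    """Build a PUBLISH packet with a JSON payload (DUP and RETAIN flags unset)."""
    if qos not in (0, 1, 2):
        raise ValueError("QoS must be 0, 1 or 2")
    if qos > 0 and not 1 <= packet_id <= 0xFFFF:
        raise ValueError("packet_id must be a non-zero 16-bit integer")
    topic_bytes = topic.encode("utf-8")
    body = struct.pack("!H", len(topic_bytes)) + topic_bytes
    if qos > 0:
        # The packet identifier is only present for QoS 1 and 2.
        body += struct.pack("!H", packet_id)
    body += json.dumps(payload).encode("utf-8")
    first_byte = (3 << 4) | (qos << 1)  # packet type 3 = PUBLISH
    return bytes([first_byte]) + encode_remaining_length(len(body)) + body


def qos_of(packet: bytes) -> int:
    """Read the QoS level from bits 1-2 of the fixed header's first byte."""
    return (packet[0] >> 1) & 0x03
```

A packet built with the default qos=1 is the kind this ingestion source accepts; the same call with qos=0 or qos=2 produces a packet that would be discarded.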

Data Ingestion

The ingestion of data is controlled with Ingestion Rules.

If no valid rule is defined, no data is ingested and messages are not acknowledged.

A PUBLISH message receives its PUBACK reply only after the message has been stored in the target tables of all matching ingestion rules (more than one rule can be configured for this source). If at least one matching rule fails to store the message, the client receives no reply and has to resend the message.

Rule execution failures are usually temporary (e.g. a mismatch between the schema of a rule's target table and the structure of the incoming payload), so the administrator has the opportunity to fix the problem between retransmissions (e.g. by altering the target table schema).
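The acknowledgement behaviour described here can be modelled in a few lines. The following is a sketch of the decision logic only, not CrateDB's implementation: the Rule class and its callables are hypothetical stand-ins for a rule's WHERE condition and its INSERT into the target table, and whether the remaining rules still run after one of them fails is an assumption made for illustration.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class Rule:
    name: str
    matches: Callable[[Dict], bool]  # stands in for the rule's WHERE condition
    store: Callable[[Dict], None]    # stands in for the INSERT; raises on failure


def should_puback(message: Dict, rules: List[Rule]) -> bool:
    """Return True only if at least one rule matches and every match stored the message."""
    matching = [rule for rule in rules if rule.matches(message)]
    if not matching:
        return False  # no matching rule: nothing is ingested, nothing is acknowledged
    all_stored = True
    for rule in matching:
        try:
            rule.store(message)
        except Exception:
            # Assumption: the other matching rules are still attempted.
            all_stored = False
    return all_stored
```

When should_puback returns False, the client sees no PUBACK and, under QoS 1, is expected to send the PUBLISH again.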

The source_ident of this implementation is: mqtt

Rule conditions and the target table must match the MQTT data structure.

The default user for the INSERT operations in the target_table is the superuser crate.

MQTT Data Structure

Name       Type       Description
client_id  STRING     ID of the client that sent the MQTT message.
packet_id  INTEGER    Packet identifier of the PUBLISH message.
topic      STRING     Topic of the PUBLISH message.
ts         TIMESTAMP  Insert timestamp (CURRENT_TIMESTAMP).
payload    OBJECT     Payload of the PUBLISH message. Must be a valid JSON string.
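As an illustration of this structure, the sketch below assembles one row from the parts of a PUBLISH message. The to_row helper is hypothetical. It also makes three assumptions: packet_id is treated as an integer (the MQTT packet identifier is a 16-bit number), ts is expressed in milliseconds since the epoch, and the payload must decode to a JSON object rather than a bare scalar or array.

```python
import json
from datetime import datetime, timezone


def to_row(client_id: str, packet_id: int, topic: str, raw_payload: bytes) -> dict:
    """Build a row with the five MQTT data structure fields.

    Raises ValueError if the payload is not valid JSON or not a JSON object.
    """
    payload = json.loads(raw_payload.decode("utf-8"))  # JSONDecodeError is a ValueError
    if not isinstance(payload, dict):
        raise ValueError("payload must be a JSON object")
    return {
        "client_id": client_id,
        "packet_id": packet_id,
        "topic": topic,
        # Insert time, not send time: taken when the row is built.
        "ts": int(datetime.now(timezone.utc).timestamp() * 1000),
        "payload": payload,
    }
```

For example, to_row("sensor-1", 7, "temperature/kitchen", b'{"celsius": 21.5}') yields a row whose payload field is the parsed object, which rule conditions can then refer to.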

Example

To start ingesting with the MQTT data ingestion source, first create a target table matching the MQTT data structure, like so:

cr> CREATE TABLE mqtt_temperature (
...  "client_id" STRING,
...  "packet_id" INTEGER,
...  "topic" STRING,
...  "ts" TIMESTAMP,
...  "payload" OBJECT(IGNORED),
...  PRIMARY KEY ("client_id", "packet_id")
... )
CREATE OK, 1 row affected (... sec)

The structure of this target table is very important, as it can prevent or allow duplicates in case of message retransmission.

In this example, if a message is delivered multiple times, it is stored only once in the mqtt_temperature table because the PRIMARY KEY includes both client_id and packet_id. Without a primary key on these columns, a message that arrives at CrateDB multiple times would be stored multiple times.
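The effect of the primary key on retransmitted messages can be tried out locally. The sketch below uses SQLite from the Python standard library as a stand-in for CrateDB, so the column types and the INSERT OR IGNORE statement are SQLite-specific assumptions; only the deduplication idea carries over.

```python
import sqlite3


def make_table(with_primary_key: bool) -> sqlite3.Connection:
    """Create an in-memory table, optionally keyed on (client_id, packet_id)."""
    conn = sqlite3.connect(":memory:")
    pk = ", PRIMARY KEY (client_id, packet_id)" if with_primary_key else ""
    conn.execute(
        "CREATE TABLE mqtt_temperature "
        f"(client_id TEXT, packet_id INTEGER, topic TEXT{pk})"
    )
    return conn


def store(conn: sqlite3.Connection, client_id: str, packet_id: int, topic: str) -> None:
    """Insert a message; a row with the same primary key is silently skipped."""
    conn.execute(
        "INSERT OR IGNORE INTO mqtt_temperature (client_id, packet_id, topic) "
        "VALUES (?, ?, ?)",
        (client_id, packet_id, topic),
    )


def count_rows(conn: sqlite3.Connection) -> int:
    return conn.execute("SELECT COUNT(*) FROM mqtt_temperature").fetchone()[0]
```

Calling store twice with the same client_id and packet_id leaves one row in the keyed table and two rows in the table without a primary key.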

Once you have done that, you can create ingestion rules, like so:

cr> CREATE INGEST RULE temperature ON mqtt
...  WHERE topic LIKE 'temperature/%'
...  INTO mqtt_temperature;
CREATE OK, 1 row affected (... sec)
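To see which topics a condition such as the one in this rule would select, the pattern can be emulated outside the database. The sketch below translates a SQL LIKE pattern into a regular expression, with % matching any run of characters and _ matching a single character. The helper names are illustrative, escape characters are not handled, and the database's own LIKE evaluation remains authoritative.

```python
import re


def like_to_regex(pattern: str) -> re.Pattern:
    """Translate a SQL LIKE pattern (without escape handling) into a regex."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")  # any sequence of characters, including none
        elif ch == "_":
            parts.append(".")   # exactly one character
        else:
            parts.append(re.escape(ch))
    return re.compile("".join(parts), re.DOTALL)


def topic_matches(topic: str, pattern: str) -> bool:
    """Return True if the whole topic matches the LIKE pattern."""
    return like_to_regex(pattern).fullmatch(topic) is not None
```

With the pattern 'temperature/%', topics such as temperature/kitchen and temperature/floor1/hall match, while temperature (no trailing slash) and humidity/kitchen do not.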