MQTT Ingestion Source

MQTT (originally MQ Telemetry Transport) is a lightweight publish/subscribe protocol for machine-to-machine communication, particularly well suited for machine data and Internet of Things (IoT) applications.

The MQTT ingestion source allows CrateDB to function as an MQTT endpoint for data ingestion. Incoming messages can be written to user-defined tables according to user-defined rules. CrateDB clients can then query or poll these tables at will.

Using CrateDB as an MQTT endpoint removes the need for intermediary brokers, message queues, or MQTT subscribers that transform or persist messages.

Note

The MQTT ingestion source is an enterprise feature.


Configuration

This ingestion source adds the node settings described below.

As usual, you can configure these node settings in the crate.yml configuration file or pass them as command-line parameters with the -C option.

Node Settings

Note

Node settings only affect the node they are configured on.

ingestion.mqtt.enabled
Default: false
Runtime: no

Enables the MQTT ingestion source on this node.

ingestion.mqtt.port
Default: 1883
Runtime: no

TCP port on which the endpoint is exposed.

Can be either a single port number or a string defining a port range. For a range, the first free port in the range is used.
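The port selection for ingestion.mqtt.port can be illustrated with a small Python sketch. This is not CrateDB's implementation. It assumes that a range is written in "first-last" notation (for example "1883-1890") and that a port counts as free if a TCP socket can be bound to it. The helper names parse_port_setting and first_free_port are hypothetical.

```python
import socket


def parse_port_setting(value):
    """Expand a port setting such as 1883 or "1883-1890" into a list of ports.

    Assumption: a range uses "first-last" notation, inclusive on both ends.
    """
    text = str(value).strip()
    if "-" in text:
        first, last = (int(part) for part in text.split("-", 1))
        if first > last:
            raise ValueError(f"invalid port range: {value!r}")
        return list(range(first, last + 1))
    return [int(text)]


def first_free_port(value, host="127.0.0.1"):
    """Return the first port of the setting that can be bound, or None."""
    for port in parse_port_setting(value):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            try:
                sock.bind((host, port))
            except OSError:
                continue  # port in use (or not permitted): try the next one
            return port  # the probe socket is closed when the with-block exits
    return None
```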

ingestion.mqtt.timeout
Default: 10s
Runtime: no

The default keep-alive timeout for established connections.

This timeout is used if the client does not specify a keepAlive option when sending the CONNECT message.
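The keepAlive value is a field in the CONNECT message sent by the client. The following Python sketch uses only the standard library to encode an MQTT 3.1.1 CONNECT packet and shows where that field sits. The helper names are hypothetical. effective_keep_alive assumes that a client "does not specify" keep-alive by sending 0, which MQTT 3.1.1 defines as "keep-alive disabled". This page does not state how CrateDB detects that case.

```python
import struct

DEFAULT_TIMEOUT_SECONDS = 10  # mirrors the documented ingestion.mqtt.timeout default


def encode_remaining_length(length):
    """Encode the MQTT 'remaining length' as a variable-length integer (1-4 bytes)."""
    out = bytearray()
    while True:
        byte = length % 128
        length //= 128
        if length > 0:
            byte |= 0x80  # continuation bit: more length bytes follow
        out.append(byte)
        if length == 0:
            return bytes(out)


def encode_string(text):
    """Encode a UTF-8 string with its 2-byte big-endian length prefix."""
    data = text.encode("utf-8")
    return struct.pack("!H", len(data)) + data


def build_connect(client_id, keep_alive_seconds):
    """Build an MQTT 3.1.1 CONNECT packet with only the clean-session flag set."""
    variable_header = (
        encode_string("MQTT")                    # protocol name
        + bytes([4])                             # protocol level 4 = MQTT 3.1.1
        + bytes([0x02])                          # connect flags: clean session
        + struct.pack("!H", keep_alive_seconds)  # keep alive, in seconds
    )
    body = variable_header + encode_string(client_id)  # payload: client identifier
    return bytes([0x10]) + encode_remaining_length(len(body)) + body


def effective_keep_alive(client_keep_alive):
    # Assumption: 0 means "not specified", so the node default applies.
    return client_keep_alive if client_keep_alive > 0 else DEFAULT_TIMEOUT_SECONDS
```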

SSL Support

SSL support is available for MQTT connections and can be enabled with the ssl.ingestion.mqtt.enabled setting. When SSL is enabled for MQTT connections, consider changing ingestion.mqtt.port to 8883, the IANA-registered port for MQTT over SSL.

Once CrateDB is configured to use encrypted MQTT connections, it no longer accepts unencrypted MQTT connections.
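On the client side, connecting to an SSL-enabled endpoint means wrapping the TCP connection in TLS before sending any MQTT packet. Below is a minimal sketch using Python's standard ssl module. The helper names and the optional CA file parameter are placeholders. The default port 8883 assumes the port was changed as suggested for SSL.

```python
import socket
import ssl


def make_tls_context(cafile=None):
    """Create a client-side TLS context that verifies the server certificate.

    cafile: optional path to the CA certificate that signed the node's
    certificate (placeholder); the system trust store is used when None.
    """
    context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=cafile)
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    return context


def open_mqtt_tls_connection(host, port=8883, cafile=None, timeout=10.0):
    """Open a TLS-wrapped TCP connection; MQTT packets are then written to it."""
    raw = socket.create_connection((host, port), timeout=timeout)
    try:
        return make_tls_context(cafile).wrap_socket(raw, server_hostname=host)
    except Exception:
        raw.close()  # do not leak the plain socket if the handshake fails
        raise
```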

Usage

Quality of Service

This ingestion source only implements MQTT Quality of Service (QoS) level one, meaning that messages will be delivered at least once to the endpoint and stored in the database.

Other QoS levels are not supported. Messages sent with those levels are discarded, and an error is logged.
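At the protocol level, QoS 1 means that the PUBLISH packet carries a packet identifier, and the receiver confirms it with a PUBACK that echoes that identifier. The Python sketch below encodes a PUBLISH packet and decodes a PUBACK, following the MQTT 3.1.1 packet layout. build_publish and parse_puback are hypothetical helper names.

```python
import json
import struct


def encode_remaining_length(length):
    """Encode the MQTT 'remaining length' as a variable-length integer."""
    out = bytearray()
    while True:
        byte = length % 128
        length //= 128
        if length > 0:
            byte |= 0x80  # continuation bit
        out.append(byte)
        if length == 0:
            return bytes(out)


def build_publish(topic, packet_id, payload, qos=1):
    """Build an MQTT 3.1.1 PUBLISH packet with a JSON-encoded payload."""
    if qos not in (0, 1, 2):
        raise ValueError("QoS must be 0, 1 or 2")
    topic_bytes = topic.encode("utf-8")
    variable_header = struct.pack("!H", len(topic_bytes)) + topic_bytes
    if qos > 0:
        # A non-zero packet identifier is only present for QoS 1 and 2.
        if not 1 <= packet_id <= 0xFFFF:
            raise ValueError("packet_id must be between 1 and 65535")
        variable_header += struct.pack("!H", packet_id)
    body = variable_header + json.dumps(payload).encode("utf-8")
    fixed_header = 0x30 | (qos << 1)  # packet type 3 (PUBLISH), QoS in bits 1-2
    return bytes([fixed_header]) + encode_remaining_length(len(body)) + body


def parse_puback(packet):
    """Return the packet identifier acknowledged by a PUBACK packet."""
    if len(packet) != 4 or packet[0] != 0x40 or packet[1] != 0x02:
        raise ValueError("not an MQTT 3.1.1 PUBACK packet")
    return struct.unpack("!H", packet[2:4])[0]
```

This endpoint would store and acknowledge only packets built with qos=1. Packets with qos=0 or qos=2 would be discarded.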

Data Ingestion

The ingestion of data is controlled with Ingestion Rules.

Without at least one valid rule, no data is ingested and messages are not acknowledged. Because multiple ingestion rules can be configured for this source, a PUBLISH message receives the corresponding PUBACK reply only after it has been stored in the target tables of all matching ingest rules. If at least one matching ingestion rule fails to store the incoming message, the client receives no reply and has to resend the message. Rule execution failures are usually temporary (e.g., a mismatch between a rule's target table schema and the payload structure of the ingestion source), so the administrator has the opportunity to fix the problem between message retransmissions (e.g., by altering the target table schema).
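The acknowledgement behavior can be modelled in a few lines of Python. This is a simplified simulation, not CrateDB code. Each rule is represented by a hypothetical Rule with a matches predicate and a store callback, and the function reports whether a PUBACK would be sent. This page does not cover two cases, and the sketch labels its choices for them as assumptions: a message that no rule matches, and whether the remaining rules still run after one fails.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List

Message = Dict[str, Any]


@dataclass
class Rule:
    """Hypothetical stand-in for an ingest rule: a condition plus a target table."""
    name: str
    matches: Callable[[Message], bool]  # the rule's WHERE condition
    store: Callable[[Message], None]  # INSERT into the target table; raises on failure


def should_send_puback(message: Message, rules: List[Rule]) -> bool:
    """Return True only if every matching rule stored the message."""
    matching = [rule for rule in rules if rule.matches(message)]
    if not matching:
        # Assumption: a message no rule matches is treated like the
        # "no valid rule" case: it is neither stored nor acknowledged.
        return False
    all_stored = True
    for rule in matching:
        try:
            rule.store(message)
        except Exception:
            # Assumption: the remaining rules are still attempted. Any failure
            # suppresses the PUBACK, so the client will retransmit the message.
            all_stored = False
    return all_stored
```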

The source_ident of this implementation is: mqtt

Rule conditions and the target table must match the MQTT data structure.

By default, INSERT operations into the target table are executed as the superuser crate.

MQTT Data Structure

Name        Type       Description
client_id   STRING     ID of the client that sent the MQTT message.
packet_id   INTEGER    Packet identifier of the PUBLISH message.
topic       STRING     Topic of the PUBLISH message.
ts          TIMESTAMP  Insert timestamp (CURRENT_TIMESTAMP).
payload     OBJECT     Payload of the PUBLISH message. Must be a valid JSON string.
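To make the data structure concrete, the following Python sketch turns one PUBLISH message into a row with the columns listed above. It illustrates the mapping under assumptions and is not CrateDB's ingestion code. The function name to_row is hypothetical, the payload arrives as raw bytes, and ts is taken from the local clock in UTC as a stand-in for CURRENT_TIMESTAMP.

```python
import json
from datetime import datetime, timezone


def to_row(client_id, packet_id, topic, raw_payload):
    """Map an MQTT PUBLISH message onto the MQTT data structure columns."""
    # Raises ValueError (JSONDecodeError or UnicodeDecodeError) on invalid input.
    payload = json.loads(raw_payload.decode("utf-8"))
    if not isinstance(payload, dict):
        # Assumption: the payload column is an OBJECT, so a JSON object is expected.
        raise ValueError("payload must be a JSON object")
    return {
        "client_id": client_id,
        "packet_id": packet_id,
        "topic": topic,
        "ts": datetime.now(timezone.utc),  # stand-in for CURRENT_TIMESTAMP
        "payload": payload,
    }
```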

Example

To start ingesting with the MQTT data ingestion source, first create a target table matching the MQTT data structure, like so:

cr> CREATE TABLE mqtt_temperature (
...  "client_id" STRING,
...  "packet_id" INTEGER,
...  "topic" STRING,
...  "ts" TIMESTAMP,
...  "payload" OBJECT(IGNORED),
...  PRIMARY KEY ("client_id", "packet_id")
... )
CREATE OK, 1 row affected (... sec)

The structure of this target table is very important, as it can prevent or allow duplicates in case of message retransmission.

In this example, a message that is delivered multiple times is stored only once in the mqtt_temperature table, because the PRIMARY KEY includes both client_id and packet_id. If packet_id were omitted from the primary key, a message arriving at CrateDB multiple times would be stored multiple times.
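The effect of the primary key on retransmitted messages can be simulated with a dictionary keyed by the primary-key columns. This Python sketch is a simplified model, not how CrateDB stores data. The class names are hypothetical, the second table has no primary key at all for simplicity, and the sketch does not model how CrateDB reports a duplicate key to the ingestion source.

```python
class KeyedTable:
    """Table with PRIMARY KEY (client_id, packet_id): at most one row per key."""

    def __init__(self):
        self.rows = {}

    def insert(self, row):
        key = (row["client_id"], row["packet_id"])
        if key in self.rows:
            return False  # duplicate key: no new row is written
        self.rows[key] = row
        return True


class UnkeyedTable:
    """Table without a primary key: every insert adds a new row."""

    def __init__(self):
        self.rows = []

    def insert(self, row):
        self.rows.append(row)
        return True


message = {"client_id": "sensor-1", "packet_id": 7,
           "topic": "temperature/kitchen", "payload": {"value": 21.5}}
keyed, unkeyed = KeyedTable(), UnkeyedTable()
for _ in range(2):  # the same message delivered twice (QoS 1 retransmission)
    keyed.insert(message)
    unkeyed.insert(message)

print(len(keyed.rows), len(unkeyed.rows))  # prints: 1 2
```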

Once you have done that, you can create ingestion rules, like so:

cr> CREATE INGEST RULE temperature ON mqtt
...  WHERE topic like 'temperature/%'
...  INTO mqtt_temperature;
CREATE OK, 1 row affected (... sec)
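The rule condition on topic uses SQL LIKE semantics. % matches any sequence of characters, including further / separators, and _ matches exactly one character. The Python sketch below shows which topics such a condition selects. It uses the hypothetical helpers like_to_regex and topic_matches, and it ignores escape characters, which the rule in this example does not use.

```python
import re


def like_to_regex(pattern):
    """Translate a SQL LIKE pattern (without escape characters) into a regex."""
    parts = []
    for char in pattern:
        if char == "%":
            parts.append(".*")  # any sequence of characters, possibly empty
        elif char == "_":
            parts.append(".")  # exactly one character
        else:
            parts.append(re.escape(char))  # literal character
    return re.compile("".join(parts), re.DOTALL)


def topic_matches(topic, pattern):
    """Return True if the whole topic matches the LIKE pattern."""
    return like_to_regex(pattern).fullmatch(topic) is not None


for topic in ["temperature/kitchen", "temperature/floor1/room2",
              "temperature", "humidity/kitchen"]:
    print(topic, topic_matches(topic, "temperature/%"))
```

Only the first two topics match. A message published to the plain topic temperature has no trailing /, so this rule would not ingest it.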