Data Stream Pipelines with CrateDB and StreamSets Data Collector

This integration document details how to create data streaming pipelines using CrateDB and StreamSets Data Collector.

Abstract

The StreamSets Data Collector is a lightweight and powerful engine that lets you build streaming, batch, and change data capture (CDC) pipelines that ingest and transform data from a variety of sources.

The data collector can be paired with CrateDB either as an endpoint or as a source for the pipelines you create, leveraging CrateDB’s excellent warehousing and query functionality.

This can be done using the CrateDB JDBC driver as an external library.

Implementation

Setup

For this implementation, you will ingest and transform data from the StreamSets New York taxi dataset into a CrateDB table, where it can then be used for analytics and visualization queries.

Prerequisites

To install the CrateDB JDBC driver, download the latest standalone version from the Crate.io Maven repository. Once downloaded, the driver can be installed for StreamSets. We recommend following the StreamSets tutorial on installing external libraries, but you can also place the CrateDB JDBC driver JAR file directly on the StreamSets classpath.
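If you take the external-libraries route, the setup can be sketched as follows. StreamSets Data Collector looks up extra libraries under the directory named by its STREAMSETS_LIBRARIES_EXTRA_DIR environment variable, with one subdirectory per stage library; the base directory and the driver version below are illustrative, so adjust them to your installation:

```shell
# Illustrative base directory for external libraries; adjust to taste.
export STREAMSETS_LIBRARIES_EXTRA_DIR="$HOME/sdc-extras"

# The JDBC stages belong to the streamsets-datacollector-jdbc-lib
# stage library, so the driver JAR goes into its lib/ subdirectory.
mkdir -p "$STREAMSETS_LIBRARIES_EXTRA_DIR/streamsets-datacollector-jdbc-lib/lib"

# Copy the standalone driver JAR you downloaded (version is an example):
# cp crate-jdbc-standalone-2.6.0.jar \
#    "$STREAMSETS_LIBRARIES_EXTRA_DIR/streamsets-datacollector-jdbc-lib/lib/"

ls "$STREAMSETS_LIBRARIES_EXTRA_DIR/streamsets-datacollector-jdbc-lib"
```

After placing the JAR, restart the Data Collector so it picks up the new library.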

CrateDB

First, you need to create the table that will hold the New York taxi dataset:

CREATE TABLE "doc"."taxi" (
    "hack_license" TEXT,
    "medallion" TEXT,
    "total_amount" FLOAT,
    "tip_amount" FLOAT
);

With the CrateDB table created, you can now open StreamSets and begin building the pipeline.

Using CrateDB as a Destination

First, create a new Data Collector pipeline:

../../_images/streamsets-new-pipeline.png

You can then add an origin for the pipeline. In this case, you will be reading from the New York taxi CSV file, so add a Directory origin and configure it to pick up that file:

../../_images/streamsets-directory-files.png

Next, configure the data format of the file. The data format should be Delimited, with the format type Default CSV. The CSV file also includes a header line, so set the header line option to With Header Line:

../../_images/streamsets-directory-format.png

Before you can ingest this data into CrateDB, you need to perform some lightweight field type conversions. All source fields are read from the CSV as strings, but the total_amount and tip_amount columns in the CrateDB table are of type FLOAT.

To do this, add a new Field Type Converter processor that connects to the Directory origin you created in the previous step. Once created, add the following field conversions to the field type converter’s configuration:

../../_images/streamsets-field-type-conversions.png
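As a plain-text sketch, the conversion boils down to one rule in the Field Type Converter (the exact option labels may differ slightly between Data Collector versions):

```
Fields to Convert: /total_amount, /tip_amount
Convert to Type:   FLOAT
```

The remaining fields (hack_license and medallion) can stay as strings, since the matching CrateDB columns are TEXT.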

Once configured, you can finally add CrateDB as a destination for the pipeline. To do this, select a JDBC Producer destination that connects to the processor you created in the previous step. You can then configure the JDBC producer to connect to your CrateDB instance.

The JDBC settings should be as follows:

../../_images/streamsets-jdbc-producer-settings.png
../../_images/streamsets-jdbc-producer-credentials.png
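As a plain-text sketch, the key JDBC Producer properties look like this. The host, port, and credentials are placeholders for your own CrateDB instance; by default, CrateDB accepts JDBC connections on port 5432 and ships with a superuser named crate:

```
JDBC Connection String: jdbc:crate://localhost:5432/
Schema Name:            doc
Table Name:             taxi
Use Credentials:        enabled
Username:               crate
Password:               <your password>
```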

You can also look at the external libraries of the JDBC producer to ensure the CrateDB JDBC driver is present:

../../_images/streamsets-jdbc-driver.png

Once created, your pipeline should look something like this:

../../_images/streamsets-pipeline.png

You can now start the pipeline by clicking the Start button, and data will flow from the CSV file, through the type transformer and into CrateDB:

../../_images/streamsets-running-pipeline.png

You can verify that the data is now in CrateDB:

cr> SELECT COUNT(*), AVG(total_amount), AVG(tip_amount) FROM doc.taxi;
+----------+--------------------+--------------------+
| count(*) |  avg(total_amount) |    avg(tip_amount) |
+----------+--------------------+--------------------+
|     5383 | 14.881101593837494 | 1.1379472384076705 |
+----------+--------------------+--------------------+
SELECT 1 row in set (0.050 sec)
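With the data loaded, you can run further analytical queries against the table. For example, this illustrative query computes the average tip as a percentage of the fare, guarding against division by zero:

```sql
SELECT AVG(tip_amount / total_amount) * 100 AS avg_tip_pct
FROM doc.taxi
WHERE total_amount > 0;
```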