Building data stream pipelines with CrateDB and the StreamSets Data Collector

Introduction

CrateDB can be integrated with StreamSets via the CrateDB JDBC driver, which can be used as both an origin and a destination stage in a StreamSets Data Collector pipeline.

To follow this tutorial:

  • Install and run CrateDB on localhost.
  • Install the StreamSets Data Collector on localhost.

To build your first data stream pipeline, you need to obtain the standalone version of the CrateDB JDBC driver and install it in StreamSets as an external library. You can download the latest standalone version directly from Crate.io’s Bintray Repository.

After downloading the JDBC driver to a destination of your choice, you can proceed with installing the driver for StreamSets. We recommend following the StreamSets tutorial on installing external libraries. However, to get started quickly, you can simply place the CrateDB JDBC driver JAR file on the StreamSets classpath.

The next two sections provide a brief introduction to building data stream pipelines using CrateDB with the StreamSets Data Collector. In the first section, we demonstrate how to build a pipeline in which a Directory origin stage reads a CSV data sample and streams it to a JDBC destination stage that uses the CrateDB JDBC driver. In the second section, we demonstrate how to use the CrateDB JDBC driver as a StreamSets origin.

We use the following versions in the tutorial:

  • CrateDB – 3.2.3
  • CrateDB JDBC Driver – 2.5.1
  • StreamSets Data Collector – 3.7.2

CrateDB JDBC Producer

The CrateDB JDBC Producer stage uses a JDBC connection to write data into the database. In this section, we show how to build a StreamSets pipeline that ingests CSV records from the local filesystem into CrateDB, pre-processing some of the record fields along the way.
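
Because the default operation we configure in step 4 is INSERT, each record the producer writes is equivalent to an SQL statement of the following shape against the taxi table created in step 3 (the values here are purely illustrative):

    INSERT INTO taxi (hack_license, medallion, total_amount, tip_amount)
    VALUES ('example-hack-license', 'example-medallion', 12.5, 1.8);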

  1. Create a new Data Collector pipeline, then create and configure the Directory origin to read CSV files from the local file system. We use the New York taxi data sample in this tutorial.

    Configure the directory origin
  2. For the sake of simplicity, we use only four fields from the CSV files. All source fields are represented as strings in the CSV. However, some of the selected fields need to be converted into float values, so we add a Field Type Converter processor stage to the pipeline.

    Convert string input values
  3. Create the taxi table in CrateDB.

    CREATE TABLE taxi (
         hack_license STRING,
         medallion STRING,
         total_amount FLOAT,
         tip_amount FLOAT
    );
    
  4. The next step is to configure the CrateDB JDBC destination. First, load the CrateDB JDBC driver. Then configure the stage with the connection string (for a local instance this is typically jdbc:crate://localhost:5432/), the schema, the table, and the default operation (INSERT in our case).

    Configure the CrateDB JDBC Producer

    Finally, we set the default CrateDB credentials in the Credentials tab (by default, CrateDB accepts the crate superuser with an empty password).

    Set credentials for the CrateDB JDBC Producer
  5. Start the pipeline.

    Pipeline run report
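
Once the pipeline has run, you can sanity-check the ingested data directly in CrateDB, for example (the row count depends on the size of your CSV sample):

    SELECT count(*) FROM taxi;

    SELECT hack_license, total_amount, tip_amount
    FROM taxi
    ORDER BY total_amount DESC
    LIMIT 3;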

CrateDB JDBC Query Consumer

The JDBC Query Consumer uses the JDBC connection to read data from CrateDB. The CrateDB JDBC Query Consumer stage returns data as a map with column names and field values.

Currently, using the CrateDB JDBC driver in combination with the StreamSets Data Collector comes with a few limitations:

  • Only incremental mode for the data fetch is supported.
  • The offset column should be the primary key column to prevent the insertion of duplicate rows.

The following steps demonstrate how CrateDB can be used as the origin stage in a data stream pipeline. As sample data, we use the AMPLab rankings dataset. The data can be imported from AWS S3 into CrateDB using the prepared data import queries hosted in the CrateDB demo repository: create the rankings table and import the rankings data. In this demo, we use a dataset that contains 18 million records.

With the CrateDB cluster set up and the rankings sample data imported, we can start building the data stream pipeline that streams data from CrateDB to CSV files.
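
For reference, here is a minimal sketch of the table definition and the import statement, assuming the AMPLab rankings schema; the exact statements are in the demo repository, and the S3 path below is only a placeholder. Note that pageurl is declared as the primary key, which allows it to serve as the offset column later:

    CREATE TABLE rankings (
         pageurl STRING PRIMARY KEY,
         pagerank INTEGER,
         avgduration INTEGER
    );

    COPY rankings FROM 's3://<your-bucket>/rankings/*' WITH (compression = 'gzip');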

  1. Create a new Data Collector pipeline and configure the CrateDB JDBC driver the same way as for the JDBC destination. In the JDBC tab of the CrateDB JDBC origin, we set the connection string, the SQL query for fetching data from the database (an example query is sketched at the end of this section), the initial offset, and the offset column. For more detailed information on how to set the offset column and its value, see the JDBC Query Consumer offset documentation.

    Configure the CrateDB JDBC driver

    IMPORTANT

    To prevent unsupported transaction-setting queries from being issued against CrateDB, uncheck Enforce Read-only Connection in the Advanced tab of the JDBC Query Consumer.

  2. We stream the records from CrateDB to CSV files. To accomplish this, we provide the path where the files should be created and set the data format of the file destination stage (e.g. the Local FS destination) to Delimited.

    Configure CSV destination
  3. Now we can start the pipeline and watch the statistics as the rankings data streams from CrateDB to CSV files.

    CrateDB to CSV report
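
For reference, the incremental query configured in step 1 might look like the following; on each run, StreamSets substitutes ${OFFSET} with the last saved offset value (we assume pageurl as the offset column, matching the table sketch above):

    SELECT *
    FROM rankings
    WHERE pageurl > '${OFFSET}'
    ORDER BY pageurl;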