Importing Huge Datasets into Crate

Projects rarely start from scratch: most of the time there is pre-existing data that needs to be imported, and sometimes the amount of data is significant.

Assume you have an existing application that generates a few hundred thousand records a day, and you are about to migrate to a new stack with Crate as the database backend. You’ll need a way to import the existing millions of records into Crate as quickly as possible.

This best practice example guides you through the process and shows tips and tricks for importing your data quickly and safely.

Defining the Data Structure

Before starting the import you’ll have to consider how your table structure will look. Decisions made at this point will influence the import process later.

For this example we have a single, simple user table with five columns of various types:

cr> CREATE TABLE user (
...   id INT primary key,
...   name STRING,
...   day_joined TIMESTAMP,
...   bio STRING INDEX using fulltext,
...   address OBJECT (dynamic) AS (
...     city STRING,
...     country STRING
...   )
... );
CREATE OK, 1 row affected (... sec)

There’s nothing wrong with this table definition and it does the job, but it relies on the default settings, which are not very performant for a bulk import and therefore not what we want to use in a real world application. The following sections show which settings to adjust.

Shards & Replicas

If you do not set the number of shards and/or number of replicas, the default configuration will be used:

  • shards: depends on the number of data nodes in the cluster (see CLUSTERED Clause)
  • replicas: 1

We therefore recommend that you choose the number of shards wisely: the right number depends on the number of nodes in the cluster as well as on the amount of data that goes into the table.

Note

Be careful: the number of shards cannot be changed once the table has been created!

Assuming there are 6 nodes in the cluster, we put 2 shards on each node, giving a total of 12 shards, which should be enough for millions of records.

Shards can be thought of as “virtual nodes” - create enough for your needs for scaling, but use as few as possible to keep the resource overhead (such as file descriptors and memory) as small as possible.
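
If you want to verify how many shards were allocated for the table, you can query Crate’s built-in sys.shards table (a minimal sketch, assuming the user table from this example):

cr> SELECT count(*) FROM sys.shards WHERE table_name = 'user';
+----------+
| count(*) |
+----------+
| ...      |
+----------+
SELECT 1 row in set (... sec)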

More importantly, set the number of replicas as low as possible, ideally to zero, while importing data. If the import fails, we can simply drop the table and re-import. Once the import succeeds, adjust the number of replicas according to your availability requirements.
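
For example, a minimal sketch that adds one replica per shard after a successful import:

cr> ALTER TABLE user SET (number_of_replicas = 1);
ALTER OK, -1 rows affected (... sec)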

The CREATE TABLE statement now looks like:

cr> CREATE TABLE user (
...   id INT primary key,
...   name STRING,
...   day_joined TIMESTAMP,
...   bio STRING INDEX using fulltext,
...   address OBJECT (dynamic) AS (
...     city STRING,
...     country STRING
...   )
... ) CLUSTERED INTO 12 shards
... WITH (number_of_replicas = 0);
CREATE OK, 1 row affected (... sec)

Refresh Interval

Another simple but very important tweak to speed up the import is to set the refresh interval of the table to 0. This disables the periodic table refresh that is otherwise needed to minimise the effect of eventual consistency, and thereby reduces the overhead during the import.

cr> ALTER TABLE user SET (refresh_interval = 0);
ALTER OK, -1 rows affected (... sec)

It’s also possible to set the refresh interval directly in the CREATE TABLE statement:

cr> CREATE TABLE user (
...   id INT primary key,
...   name STRING,
...   day_joined TIMESTAMP,
...   bio STRING INDEX using fulltext,
...   address OBJECT (dynamic) AS (
...     city STRING,
...     country STRING
...   )
... ) CLUSTERED INTO 12 shards
... WITH (
...   number_of_replicas = 0,
...   refresh_interval = 0
... );
CREATE OK, 1 row affected (... sec)

Once the import is finished you can set the refresh interval to a reasonable value (time in ms):

cr> ALTER TABLE user SET (refresh_interval = 1000);
ALTER OK, -1 rows affected (... sec)

Store Level Throttling

If you do not need to query your data during the import, which is usually the case, you can relax the merge throttling behaviour that otherwise ensures better search performance.

To improve indexing performance, you can temporarily disable throttling completely by setting indices.store.throttle.type to 'none':

cr> SET GLOBAL TRANSIENT indices.store.throttle.type = 'none';
SET OK, 1 row affected (... sec)

However, if you still want to throttle the merging of segments during the import, you can increase the maximum bytes per second from its default of 20mb to something like 100-200mb for SSD disks:

cr> SET GLOBAL TRANSIENT indices.store.throttle.max_bytes_per_sec = '150mb';
SET OK, 1 row affected (... sec)

After the import, don’t forget to turn throttling on again by setting the value back to 'merge' (the default) or 'all'.

cr> SET GLOBAL TRANSIENT indices.store.throttle.type = 'merge';
SET OK, 1 row affected (... sec)

Importing Data Using COPY FROM

Once the table is created, it’s time for the actual import. Use the COPY FROM command to import data into a table efficiently. For more in-depth documentation on COPY FROM see COPY FROM.

JSON Import Format

Crate has native support for JSON formatted data, where each line is a JSON string representing a single record. Empty lines are skipped. The keys of the JSON objects are mapped to columns when imported; nonexistent columns will be created if necessary.

For example: users.json

{"id": 1, "name": "foo", "day_joined": 1408312800, "bio": "Lorem ipsum dolor sit amet, consectetuer adipiscing elit.", "address": {"city": "Dornbirn", "country": "Austria"}}
{"id": 2, "name": "bar", "day_joined": 1408312800, "bio": "Lorem ipsum dolor sit amet, consectetuer adipiscing elit.", "address": {"city": "Berlin", "country": "Germany"}}

COPY FROM Command

Upon execution, each node checks locally whether a file exists at the provided path and imports the data it contains. Consequently, the following command looks for /tmp/best_practice_data/users.json on each node in the cluster. Please note that if the file is not found on any node, the command still returns successfully, reporting COPY OK, 0 rows affected (... sec).

cr> COPY user FROM '/tmp/best_practice_data/users.json';
COPY OK, 150 rows affected (... sec)

Note

When importing data using COPY FROM, Crate does not check whether the column types and the types in the import file match. It does not cast values to their target types but always imports the data as given in the source file(s).
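
For example, in a hypothetical record like the following, the value of id would be imported as the string "one" rather than cast to an INT:

{"id": "one", "name": "baz", "day_joined": 1408312800, "bio": "Lorem ipsum.", "address": {"city": "Vienna", "country": "Austria"}}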

Bulk Size

The bulk size defines the number of lines that are read at once and imported into the table. It can be specified in the WITH clause of the statement and defaults to 10,000.

For example:

cr> COPY user FROM '/tmp/best_practice_data/users.json'
... WITH (bulk_size = 2000);
COPY OK, 150 rows affected (... sec)

In our example it will not make a difference, but if you have a more complex dataset with a lot of columns and large values, it makes sense to decrease the bulk_size. Setting bulk_size too high might consume a lot of node resources, while a low bulk_size increases the overhead per request.

Compression

If your data does not reside locally on the nodes but somewhere on the network, e.g. on a NAS or on S3, it’s recommended to use gzip compressed files to reduce network traffic.

Crate does not automatically detect compression, so you’ll need to specify gzip compression in the WITH clause.

For example:

cr> COPY user FROM '/tmp/best_practice_data/users.json.gz'
... WITH (compression = 'gzip');
COPY OK, 150 rows affected (... sec)

Partitioned Tables

Sometimes you want to split your table into partitions to handle large datasets more efficiently (e.g. so queries can run on a reduced set of rows). To demonstrate data import into partitioned tables, we create a partition for every day (in production, the partitioning scheme depends on your use case).

Partitions are defined in the CREATE TABLE statement using the PARTITIONED BY clause.

A partition column has to be part of the primary key (if one was explicitly declared), so in our example this constraint is added to the newly created partition column.

cr> CREATE TABLE user (
...   id INT primary key,
...   name STRING,
...   day_joined TIMESTAMP primary key,
...   bio STRING INDEX using fulltext,
...   address OBJECT (dynamic) AS (
...     city STRING,
...     country STRING
...   )
... ) CLUSTERED INTO 6 shards
... PARTITIONED BY (day_joined)
... WITH (number_of_replicas = 0);
CREATE OK, 1 row affected (... sec)

To import data into partitioned tables efficiently, you should import each partition separately. Since the partition value is not stored in the table column itself, the JSON source must not contain the value of the partition column.

For example: users_1408312800.json

{"id": 1, "name": "foo", "bio": "Lorem ipsum dolor sit amet, consectetuer adipiscing elit.", "address": {"city": "Dornbirn", "country": "Austria"}}
{"id": 2, "name": "bar", "bio": "Lorem ipsum dolor sit amet, consectetuer adipiscing elit.", "address": {"city": "Berlin", "country": "Germany"}}

The value of the partition column must be defined in the COPY FROM statement using the PARTITION clause:

cr> COPY user PARTITION (day_joined=1408312800)
... FROM '/tmp/best_practice_data/users_1408312800.json';
COPY OK, 23 rows affected (... sec)

This way, Crate does not need to resolve the partition for each imported row but can write it directly to the correct place, resulting in a much faster import.

However, it’s still possible (though not recommended) to import into partitioned tables without the PARTITION clause, in which case the source must contain the partition column value.
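
A sketch of this variant, assuming a hypothetical source file users_with_day.json whose records include the day_joined value:

cr> COPY user FROM '/tmp/best_practice_data/users_with_day.json';
COPY OK, ... rows affected (... sec)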

When importing data into a partitioned table with existing partitions, you may want to apply import optimizations, such as disabling the refresh interval, only to newly created partitions. This can be done by altering only the partitioned table itself using the ALTER TABLE ONLY statement.
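
For example, to disable the periodic refresh only for partitions created after the statement (a sketch; existing partitions keep their current setting):

cr> ALTER TABLE ONLY user SET (refresh_interval = 0);
ALTER OK, -1 rows affected (... sec)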

Similarly, the number of shards can be adjusted for newly created partitions to adapt to the growing data volume! Simply use ALTER TABLE user SET (number_of_shards = X) before creating a new partition.
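
A minimal sketch, assuming the data volume has grown enough to justify 12 shards for each new partition:

cr> ALTER TABLE user SET (number_of_shards = 12);
ALTER OK, -1 rows affected (... sec)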

Summary

To sum up the points described above, importing huge datasets is not difficult if a few things are kept in mind. These are:

  • Reduce the number of replicas as much as possible, ideally to 0. Replication slows down the import process significantly.
  • Use only as many shards as you really need.
  • Disable the periodic table refresh by setting the refresh interval to 0 during import.
  • Adjust the bulk size of the import depending on the size of your records.
  • Import table partitions separately using the PARTITION clause in the COPY FROM statement.

And last but not least:

  • Import speed increases significantly with better disk I/O. Using SSDs for Crate is recommended anyway, but adding one more disk (by adding another node) to the cluster can make quite a difference.
