SQLAlchemy: DataFrame operations

About

This section of the documentation demonstrates support for efficient batch/bulk INSERT operations with pandas and Dask, using the CrateDB SQLAlchemy dialect.

Efficient bulk operations are needed for typical ETL batch processing and data streaming workloads, for example to move data in and out of OLAP data warehouses, as opposed to interactive online transaction processing (OLTP) applications. The strategy of batching series of records together to improve performance is also referred to as chunking.

Introduction

pandas

The pandas DataFrame is a structure that contains two-dimensional data and its corresponding labels. DataFrames are widely used in data science, machine learning, scientific computing, and many other data-intensive fields.

DataFrames are similar to SQL tables or the spreadsheets that you work with in Excel or Calc. In many cases, DataFrames are faster, easier to use, and more powerful than tables or spreadsheets because they are an integral part of the Python and NumPy ecosystems.

The pandas I/O subsystem for relational databases using SQL is based on SQLAlchemy.

Dask

Dask is a flexible library for parallel computing in Python, which scales Python code from multi-core local machines to large distributed clusters in the cloud. Dask provides a familiar user interface by mirroring the APIs of other libraries in the PyData ecosystem, including pandas, scikit-learn, and NumPy.

A Dask DataFrame is a large parallel DataFrame composed of many smaller pandas DataFrames, split along the index. These pandas DataFrames may live on disk for larger-than-memory computing on a single machine, or on many different machines in a cluster. One Dask DataFrame operation triggers many operations on the constituent pandas DataFrames.

Compatibility notes

Note

Please note that DataFrame support for pandas and Dask is only validated with Python 3.8 and higher, and SQLAlchemy 1.4 and higher. We recommend using the most recent versions of those libraries.

Efficient INSERT operations with pandas

The package provides an insert_bulk function that makes the pandas.DataFrame.to_sql() method more efficient by using the CrateDB bulk operations endpoint. It splits your insert workload across multiple batches, using a defined chunk size.

>>> import sqlalchemy as sa
>>> from crate.client.sqlalchemy.support import insert_bulk
>>> from pueblo.testing.pandas import makeTimeDataFrame
...
>>> # Define number of records, and chunk size.
>>> INSERT_RECORDS = 42
>>> CHUNK_SIZE = 8
...
>>> # Create a pandas DataFrame, and connect to CrateDB.
>>> df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S")
>>> engine = sa.create_engine(f"crate://{crate_host}")
...
>>> # Insert content of DataFrame using batches of records.
>>> # 42 records / 8 per batch = 5.25, so six batches are submitted.
>>> df.to_sql(
...     name="test-testdrive",
...     con=engine,
...     if_exists="replace",
...     index=False,
...     chunksize=CHUNK_SIZE,
...     method=insert_bulk,
... )
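
As a minimal sketch of the batch arithmetic in the example above, the following snippet slices 42 records into chunks of 8 using plain Python. The helper name chunk_bounds is hypothetical and only illustrates the idea of chunking; it is not part of pandas or the CrateDB dialect.

```python
import math

INSERT_RECORDS = 42
CHUNK_SIZE = 8


def chunk_bounds(total, size):
    """Yield (start, stop) index pairs covering `total` records in steps of `size`."""
    for start in range(0, total, size):
        yield start, min(start + size, total)


batches = list(chunk_bounds(INSERT_RECORDS, CHUNK_SIZE))
batch_sizes = [stop - start for start, stop in batches]

# The number of batches is the record count divided by the chunk size, rounded up.
assert len(batches) == math.ceil(INSERT_RECORDS / CHUNK_SIZE)
print(batch_sizes)  # [8, 8, 8, 8, 8, 2]
```

Five full batches are followed by a final, smaller batch that carries the remaining two records.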

Tip

You will observe that the optimal chunk size depends heavily on the shape of your data, specifically the width of each record, i.e. the number of columns and their individual sizes, which in the end determines the total size of each batch/chunk.

A few details should be taken into consideration when determining the optimal chunk size for a specific dataset. We are outlining the two major ones.

  • First, when working with data larger than the main memory available on your machine, each chunk should be small enough to fit into the memory, but large enough to minimize the overhead of a single data insert operation. Depending on whether you are running other workloads on the same machine, you should also account for the total share of heap memory you will assign to each domain, to prevent overloading the system as a whole.

  • Second, as each batch is submitted using HTTP, you should know about the request size limits and other constraints of your HTTP infrastructure, which may include any type of HTTP intermediary relaying information between your database client application and your CrateDB cluster. For example, HTTP proxy servers or load balancers that are not optimally configured for performance, as well as web application firewalls and intrusion prevention systems, may hamper HTTP communication, sometimes in subtle ways, for example through request size constraints or throttling mechanisms. If you are working with very busy systems and hosting them on shared infrastructure, details like SNAT port exhaustion may also come into play.
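
To get a rough feeling for how record width and chunk size translate into request size, the following sketch JSON-encodes one batch and reports its size in bytes. It assumes a payload made of an SQL statement plus a list of parameter rows, as an approximation of a bulk request body. The function name estimate_batch_bytes, the table name, and the sample rows are hypothetical, and the real request may differ in encoding and overhead.

```python
import json


def estimate_batch_bytes(rows, chunk_size, stmt):
    """Return the JSON-encoded size in bytes of the first chunk of `rows`."""
    # Assumed payload layout: one statement plus a list of parameter rows.
    payload = {"stmt": stmt, "bulk_args": rows[:chunk_size]}
    return len(json.dumps(payload).encode("utf-8"))


# Hypothetical sample data: a narrow and a wide record layout.
narrow_rows = [[i, 20.5] for i in range(1000)]
wide_rows = [[i, 20.5, "x" * 200] for i in range(1000)]

narrow_stmt = "INSERT INTO testdrive (id, value) VALUES (?, ?)"
wide_stmt = "INSERT INTO testdrive (id, value, note) VALUES (?, ?, ?)"

for chunk_size in (100, 500, 1000):
    narrow = estimate_batch_bytes(narrow_rows, chunk_size, narrow_stmt)
    wide = estimate_batch_bytes(wide_rows, chunk_size, wide_stmt)
    print(f"chunk_size={chunk_size}: narrow={narrow} bytes, wide={wide} bytes")
```

Comparing such an estimate against the request size limits of your proxies or load balancers can help to rule out chunk sizes that would be rejected before you start measuring performance.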

You will need to determine a good chunk size by running corresponding experiments on your own behalf. For that purpose, you can use the insert_pandas.py program as a blueprint.

It is a good idea to start your explorations with a chunk size of 5_000, and then see if performance improves when you increase or decrease that figure. Users report that 10_000-20_000 is their optimal setting, but if you process, for example, just three “small” columns, you may also experiment with going up to 200_000. The chunk size should not be too small: if it is, the per-request I/O overhead will outweigh the benefit of batching.
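
Such an experiment can be sketched as a small timing harness that runs the same insert workload once per candidate chunk size. The names time_chunk_sizes and fake_insert are hypothetical. The stand-in function below only simulates a fixed cost per batch, so the printed figures are not a measurement of CrateDB.

```python
import time


def time_chunk_sizes(insert_fn, total_records, chunk_sizes):
    """Run insert_fn once per chunk size and return elapsed seconds per size."""
    timings = {}
    for chunk_size in chunk_sizes:
        started = time.perf_counter()
        insert_fn(total_records, chunk_size)
        timings[chunk_size] = time.perf_counter() - started
    return timings


def fake_insert(total_records, chunk_size):
    """Hypothetical stand-in: pretend each batch costs one millisecond."""
    batches = -(-total_records // chunk_size)  # ceiling division
    time.sleep(0.001 * batches)


timings = time_chunk_sizes(fake_insert, 50_000, [5_000, 10_000, 20_000])
for chunk_size, seconds in sorted(timings.items()):
    print(f"chunksize={chunk_size:>6}: {seconds:.4f}s")
```

To measure a real workload, you would replace fake_insert with a function that calls df.to_sql() with chunksize=chunk_size and method=insert_bulk against a test table, ideally using a representative dataset.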

In order to learn more about what wide- vs. long-form (tidy, stacked, narrow) data means in the context of DataFrame computing, let us refer you to a general introduction, the corresponding section in the Data Computing book, and a pandas tutorial about the same topic.

Efficient INSERT operations with Dask

The same insert_bulk function presented in the previous section can also be used with Dask, to make the dask.dataframe.to_sql() method more efficient by using the CrateDB bulk operations endpoint.

The example below will partition your insert workload into equal-sized parts, and schedule it to be executed on Dask cluster resources, using a defined number of compute partitions. Each worker instance will then insert its partition’s records in a batched/chunked manner, using a defined chunk size, effectively using the pandas implementation introduced in the previous section.

>>> import dask.dataframe as dd
>>> from crate.client.sqlalchemy.support import insert_bulk
>>> from pueblo.testing.pandas import makeTimeDataFrame
...
>>> # Define the number of records, the number of computing partitions,
>>> # and the chunk size of each database insert operation.
>>> INSERT_RECORDS = 100
>>> NPARTITIONS = 4
>>> CHUNK_SIZE = 25
...
>>> # Create a Dask DataFrame.
>>> df = makeTimeDataFrame(nper=INSERT_RECORDS, freq="S")
>>> ddf = dd.from_pandas(df, npartitions=NPARTITIONS)
...
>>> # Insert content of DataFrame using multiple workers on a
>>> # compute cluster, transferred using batches of records.
>>> ddf.to_sql(
...     name="test-testdrive",
...     uri=f"crate://{crate_host}",
...     if_exists="replace",
...     index=False,
...     chunksize=CHUNK_SIZE,
...     method=insert_bulk,
...     parallel=True,
... )
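
The interplay of record count, partition count, and chunk size in the example above can be sketched with plain arithmetic. The helper plan_inserts is hypothetical, and it assumes that records are spread evenly across partitions. Dask splits along the index, so actual partition sizes may differ.

```python
import math


def plan_inserts(records, npartitions, chunk_size):
    """Return the number of insert operations per partition, assuming an even split."""
    base, extra = divmod(records, npartitions)
    partition_sizes = [base + (1 if i < extra else 0) for i in range(npartitions)]
    return [math.ceil(size / chunk_size) for size in partition_sizes]


# Settings from the example: each partition holds 25 records, sent as one batch.
plan = plan_inserts(records=100, npartitions=4, chunk_size=25)
print(plan, sum(plan))  # [1, 1, 1, 1] 4

# A smaller chunk size multiplies the number of requests per partition.
smaller = plan_inserts(records=100, npartitions=4, chunk_size=10)
print(smaller, sum(smaller))  # [3, 3, 3, 3] 12
```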

Tip

You will observe that optimizing your workload will now also involve determining a good value for the NPARTITIONS argument, based on the capacity and topology of the available compute resources, and based on workload characteristics or policies like peak- vs. balanced- vs. shared-usage. For example, on a machine or cluster fully dedicated to the problem at hand, you may want to use all available processor cores, while on a shared system, this strategy may not be appropriate.

If you want to dedicate all available compute resources on your machine, you may want to use the number of CPU cores as a value to the NPARTITIONS argument. You can find out about the available CPU cores on your machine, for example by running the nproc command in your terminal.
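
From within Python, the same information is available through the standard library. The following minimal sketch uses os.cpu_count(), which reports logical cores and may return None when the count cannot be determined, hence the fallback to 1.

```python
import os

# Use all logical CPU cores as the number of partitions, falling back to 1
# when the core count cannot be determined.
NPARTITIONS = os.cpu_count() or 1
print(f"Using {NPARTITIONS} partitions")
```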

Depending on the implementation and runtime behavior of the compute task, the optimal number of worker processes, determined by the NPARTITIONS argument, also needs to be figured out by running a few test iterations. For that purpose, you can use the insert_dask.py program as a blueprint.

Adjusting this value in both directions is perfectly fine: If you observe that you are overloading the machine, maybe because other workloads are scheduled besides the one you are running, try to reduce the value. If fragments/steps of your implementation involve waiting for network or disk I/O, you may want to increase the number of workers beyond the number of available CPU cores, to increase utilization. On the other hand, you should be wary of over-committing resources, as it may slow your system down.

Before getting more serious with Dask, you are welcome to read and watch the excellent Dask Best Practices and Dask DataFrames Best Practices resources, in order to learn about things to avoid, and beyond. For finding out if your compute workload scheduling is healthy, you can, for example, use Dask’s Dashboard Diagnostics.

Warning

Because the settings assigned in the example above fit together well, the to_sql() instruction will effectively run four insert operations, executed in parallel, and scheduled optimally on the available cluster resources.

However, if you do not choose those settings sensibly, you can easily misconfigure the resource scheduling system, and overload the underlying hardware or operating system, virtualized or not. This is why experimenting with different parameters, and a real dataset, is crucial.