Using Hive Partitioning with Dask
It is sometimes useful to write your dataset with a hive-like directory scheme. For example, if your dataframe contains 'year' and 'semester' columns, a hive-based directory structure might look something like the following:
output-path/
├── year=2022/
│ ├── semester=fall/
│ │ └── part.0.parquet
│ └── semester=spring/
│ ├── part.0.parquet
│ └── part.1.parquet
└── year=2023/
└── semester=fall/
└── part.1.parquet
The use of this self-describing structure implies that all rows within the 'output-path/year=2022/semester=fall/' directory will contain the value 2022 in the 'year' column and the value 'fall' in the 'semester' column.
The primary advantage of generating a hive-partitioned dataset is that certain IO filters can be applied by read_parquet() without the need to parse any file metadata. In other words, the following command will typically be faster when the dataset is already hive-partitioned on the 'year' column.
>>> dd.read_parquet("output-path", filters=[("year", ">", 2022)])
Writing Parquet Data with Hive Partitioning
Dask’s to_parquet() function will produce a hive-partitioned directory scheme automatically when the partition_on option is used.
>>> df.to_parquet("output-path", partition_on=["year", "semester"])
>>> os.listdir("output-path")
["year=2022", "year=2023"]
>>> os.listdir("output-path/year=2022")
["semester=fall", "semester=spring"]
>>> os.listdir("output-path/year=2022/semester=spring")
['part.0.parquet', 'part.1.parquet']
It is important to recognize that Dask will not aggregate the data files written within each of the leaf directories. This is because each DataFrame partition is written independently during the execution of the to_parquet() task graph. In order to write out the data for partition i, the partition-i write task will perform a groupby operation on columns ["year", "semester"], and each distinct group will then be written to the corresponding directory using the file name 'part.{i}.parquet'. Therefore, it is possible for a hive-partitioned write to produce a large number of files in every leaf directory (one for each DataFrame partition).
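As a rough illustration, each partition-write task behaves something like the sketch below. Here partition_i and i are hypothetical stand-ins for the pandas DataFrame backing partition i and its index; the real implementation also handles remote filesystems and metadata.

>>> import os
>>> i = 0  # hypothetical partition index
>>> for (year, semester), group in partition_i.groupby(["year", "semester"]):
...     path = f"output-path/year={year}/semester={semester}"
...     os.makedirs(path, exist_ok=True)
...     # every distinct group in this partition gets its own file
...     group.to_parquet(f"{path}/part.{i}.parquet")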
If your application requires you to produce a single parquet file for each hive partition, one possible solution is to sort or shuffle on the partitioning columns before calling to_parquet().
>>> partition_on = ["year", "semester"]
>>> df.shuffle(on=partition_on).to_parquet("output-path", partition_on=partition_on)
Using a global shuffle like this is extremely expensive and should be avoided whenever possible. However, it is also guaranteed to produce the minimum number of files, which may be worth the sacrifice at times.
Reading Parquet Data with Hive Partitioning
In most cases, read_parquet() will process hive-partitioned data automatically. By default, all hive-partitioned columns will be interpreted as categorical columns.
>>> ddf = dd.read_parquet("output-path", columns=["year", "semester"])
>>> ddf
Dask DataFrame Structure:
year semester
npartitions=4
category[known] category[known]
... ...
... ...
... ...
... ...
Dask Name: read-parquet, 1 graph layer
>>> ddf.compute()
year semester
0 2022 fall
1 2022 fall
2 2022 fall
3 2022 spring
4 2022 spring
5 2022 spring
6 2023 fall
7 2023 fall
Defining a Custom Partitioning Schema
When utilizing engine='pyarrow', it is possible to specify a custom schema for the hive-partitioned columns. The columns will then be read using the specified types rather than as categoricals.
>>> import pyarrow as pa
>>> schema = pa.schema([("year", pa.int16()), ("semester", pa.string())])
>>> ddf2 = dd.read_parquet(
... path,
... columns=["year", "semester"],
... dataset={"partitioning": {"flavor": "hive", "schema": schema}}
... )
>>> ddf2
Dask DataFrame Structure:
year semester
npartitions=4
int16 object
... ...
... ...
... ...
... ...
If any of your hive-partitioned columns contain null values, you must specify the partitioning schema in this way.
Although it is not required, we also recommend that you specify the partitioning schema if you need to partition on high-cardinality columns. This is because the default 'category' dtype will track the known categories in a way that can significantly increase the overall memory footprint of your Dask collection. In fact, read_parquet() already clears the “known categories” of other columns for this same reason (see Categoricals).
Best Practices
Although hive partitioning can sometimes improve read performance by simplifying filtering, it can also lead to degraded performance and errors in other cases.
Avoid High Cardinality
A good rule of thumb is to avoid partitioning on float columns, or any column containing many unique values (i.e. high cardinality).
The most common cause of poor user experience with hive partitioning is high cardinality in the partitioning column(s). For example, if you try to partition on a column with millions of unique values, then to_parquet() will need to generate millions of directories. The management of these directories is likely to put strain on the file system, and the need for many small files within each directory is sure to compound the issue.
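A quick cardinality check before writing can help you catch this early. The sketch below assumes a hypothetical 'user_id' column, and the reported count is purely illustrative.

>>> # check uniqueness before using a column with partition_on
>>> df["user_id"].nunique().compute()
4500000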
Use Simple Data Types for Partitioning
Since hive-partitioned data is “self-describing,” we suggest that you avoid partitioning on complex data types, and opt for integer- or string-based data types whenever possible. If a data type cannot be easily inferred from the string value used to define the directory name, the IO engine may struggle to parse the values.
For example, directly partitioning on a column with a datetime64 dtype might produce directory names like the following:
output-path/
├── date=2022-01-01 00:00:00/
├── date=2022-02-01 00:00:00/
├── ...
└── date=2022-12-01 00:00:00/
These directory names will not be correctly interpreted as datetime64 values, and are even considered illegal on Windows systems. For more reliable behavior, we recommend that such a column be decomposed into one or more “simple” columns. For example, one could easily use 'date' to construct 'year', 'month', and 'day' columns (as needed).
Aggregate Files at Read Time
Warning

The aggregate_files argument is currently listed as experimental. However, there are currently no plans to remove the argument or change its behavior in a future release.
Since hive partitioning will typically produce a large number of small files, read_parquet() performance will usually benefit from proper usage of the aggregate_files argument. Take the following dataset for example:
dataset-path/
├── region=1/
│   ├── section=a/
│   │   ├── 01.parquet
│   │   ├── 02.parquet
│   │   └── 03.parquet
│   └── section=b/
│       ├── 04.parquet
│       └── 05.parquet
└── region=2/
    └── section=a/
        ├── 06.parquet
        ├── 07.parquet
        └── 08.parquet
If we set aggregate_files=True for this case, we are telling Dask that any of the parquet data files may be aggregated into the same output DataFrame partition. If, instead, we specify the name of a partitioning column (e.g. 'region' or 'section'), we allow the aggregation of any two files sharing a file path up to, and including, the corresponding directory name. For example, if aggregate_files is set to 'section', 04.parquet and 05.parquet may be aggregated together, but 03.parquet and 04.parquet cannot be. If, however, aggregate_files is set to 'region', 04.parquet may be aggregated with 05.parquet, and 03.parquet may be aggregated with 04.parquet.
Using aggregate_files will typically improve performance by making it more likely for DataFrame partitions to approach the size specified by the blocksize argument. In contrast, the default behavior may produce a large number of partitions that are much smaller than blocksize.