dask.dataframe.DataFrame.reduction
DataFrame.reduction(chunk, aggregate=None, combine=None, meta=_NoDefault.no_default, token=None, split_every=None, chunk_kwargs=None, aggregate_kwargs=None, combine_kwargs=None, **kwargs)
Generic row-wise reductions.
Parameters
- chunk : callable
  Function to operate on each partition. Should return a pandas.DataFrame, pandas.Series, or a scalar.
- aggregate : callable, optional
  Function to operate on the concatenated result of chunk. If not specified, defaults to chunk. Used to do the final aggregation in a tree reduction.
  The input to aggregate depends on the output of chunk. If the output of chunk is a:
  - scalar: input is a Series, with one row per partition.
  - Series: input is a DataFrame, with one row per partition. Columns are the rows in the output series.
  - DataFrame: input is a DataFrame, with one row per partition. Columns are the columns in the output dataframes.
  Should return a pandas.DataFrame, pandas.Series, or a scalar.
- combine : callable, optional
  Function to operate on intermediate concatenated results of chunk in a tree-reduction. If not provided, defaults to aggregate. The input/output requirements should match those of aggregate described above.
- meta : pd.DataFrame, pd.Series, dict, iterable, tuple, optional
  An empty pd.DataFrame or pd.Series that matches the dtypes and column names of the output. This metadata is necessary for many algorithms in dask dataframe to work. For ease of use, some alternative inputs are also available. Instead of a DataFrame, a dict of {name: dtype} or an iterable of (name, dtype) can be provided (note that the order of the names should match the order of the columns). Instead of a Series, a tuple of (name, dtype) can be used. If not provided, dask will try to infer the metadata. This may lead to unexpected results, so providing meta is recommended. For more information, see dask.dataframe.utils.make_meta. A sketch of supplying meta explicitly appears in the Examples below.
- token : str, optional
  The name to use for the output keys.
- split_every : int, optional
  Group partitions into groups of this size while performing a tree-reduction. If set to False, no tree-reduction will be used, and all intermediates will be concatenated and passed to aggregate. Default is 8.
- chunk_kwargs : dict, optional
  Keyword arguments to pass on to chunk only.
- aggregate_kwargs : dict, optional
  Keyword arguments to pass on to aggregate only.
- combine_kwargs : dict, optional
  Keyword arguments to pass on to combine only.
- kwargs
  All remaining keywords will be passed to chunk, combine, and aggregate.
Examples
>>> import pandas as pd
>>> import dask.dataframe as dd
>>> df = pd.DataFrame({'x': range(50), 'y': range(50, 100)})
>>> ddf = dd.from_pandas(df, npartitions=4)
Count the number of rows in a DataFrame. To do this, count the number of rows in each partition, then sum the results:
>>> res = ddf.reduction(lambda x: x.count(),
...                     aggregate=lambda x: x.sum())
>>> res.compute()
x    50
y    50
dtype: int64
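The same reduction can be run with meta supplied explicitly rather than inferred. This is a minimal sketch: the output here is a Series of int64 counts, so an empty pd.Series carries the required dtype:

>>> res = ddf.reduction(lambda x: x.count(),
...                     aggregate=lambda x: x.sum(),
...                     meta=pd.Series(dtype='int64'))
>>> res.compute()
x    50
y    50
dtype: int64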
Count the number of rows in a Series with elements greater than or equal to a value (provided via a keyword).
>>> def count_greater(x, value=0):
...     return (x >= value).sum()
>>> res = ddf.x.reduction(count_greater, aggregate=lambda x: x.sum(),
...                       chunk_kwargs={'value': 25})
>>> res.compute()
25
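Alternatively, the threshold could be broadcast to every function through the remaining keywords (a sketch: because such keywords go to chunk, combine, and aggregate alike, the aggregating function must also accept value, which it simply ignores here):

>>> def agg_sum(x, value=0):
...     return x.sum()  # value is accepted but unused
>>> res = ddf.x.reduction(count_greater, aggregate=agg_sum, value=25)
>>> res.compute()
25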
Aggregate both the sum and count of a Series at the same time:
>>> def sum_and_count(x):
...     return pd.Series({'count': x.count(), 'sum': x.sum()},
...                      index=['count', 'sum'])
>>> res = ddf.x.reduction(sum_and_count, aggregate=lambda x: x.sum())
>>> res.compute()
count      50
sum      1225
dtype: int64
Doing the same, but for a DataFrame. Here chunk returns a DataFrame, meaning the input to aggregate is a DataFrame with an index with non-unique entries for both 'x' and 'y'. We group by the index and sum each group to get the final result.

>>> def sum_and_count(x):
...     return pd.DataFrame({'count': x.count(), 'sum': x.sum()},
...                         columns=['count', 'sum'])
>>> res = ddf.reduction(sum_and_count,
...                     aggregate=lambda x: x.groupby(level=0).sum())
>>> res.compute()
   count   sum
x     50  1225
y     50  3725
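Finally, a sketch of an explicit tree reduction over the same row counts: split_every=2 merges partitions in pairs, with combine summing the intermediate counts at each level and aggregate producing the final result:

>>> res = ddf.reduction(lambda x: x.count(),
...                     combine=lambda x: x.sum(),
...                     aggregate=lambda x: x.sum(),
...                     split_every=2)
>>> res.compute()
x    50
y    50
dtype: int64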