dask.dataframe.Series.map_overlap
dask.dataframe.Series.map_overlap¶
- Series.map_overlap(func, before, after, *args, **kwargs)¶
Apply a function to each partition, sharing rows with adjacent partitions.
This can be useful for implementing windowing functions such as
df.rolling(...).mean()ordf.diff().- Parameters
- funcfunction
Function applied to each partition.
- beforeint, timedelta or string timedelta
The rows to prepend to partition
ifrom the end of partitioni - 1.- afterint, timedelta or string timedelta
The rows to append to partition
ifrom the beginning of partitioni + 1.- args, kwargs
Positional and keyword arguments to pass to the function. Positional arguments are computed on a per-partition basis, while keyword arguments are shared across all partitions. The partition itself will be the first positional argument, with all other arguments passed after. Arguments can be
Scalar,Delayed, or regular Python objects. DataFrame-like args (both dask and pandas) will be repartitioned to align (if necessary) before applying the function; seealign_dataframesto control this behavior.- enforce_metadatabool, default True
Whether to enforce at runtime that the structure of the DataFrame produced by
funcactually matches the structure ofmeta. This will rename and reorder columns for each partition, and will raise an error if this doesn’t work, but it won’t raise if dtypes don’t match.- transform_divisionsbool, default True
Whether to apply the function onto the divisions and apply those transformed divisions to the output.
- align_dataframesbool, default True
Whether to repartition DataFrame- or Series-like args (both dask and pandas) so their divisions align before applying the function. This requires all inputs to have known divisions. Single-partition inputs will be split into multiple partitions.
If False, all inputs must have either the same number of partitions or a single partition. Single-partition inputs will be broadcast to every partition of multi-partition inputs.
- metapd.DataFrame, pd.Series, dict, iterable, tuple, optional
An empty
pd.DataFrameorpd.Seriesthat matches the dtypes and column names of the output. This metadata is necessary for many algorithms in dask dataframe to work. For ease of use, some alternative inputs are also available. Instead of aDataFrame, adictof{name: dtype}or iterable of(name, dtype)can be provided (note that the order of the names should match the order of the columns). Instead of a series, a tuple of(name, dtype)can be used. If not provided, dask will try to infer the metadata. This may lead to unexpected results, so providingmetais recommended. For more information, seedask.dataframe.utils.make_meta.
Notes
Given positive integers
beforeandafter, and a functionfunc,map_overlapdoes the following:Prepend
beforerows to each partitionifrom the end of partitioni - 1. The first partition has no rows prepended.Append
afterrows to each partitionifrom the beginning of partitioni + 1. The last partition has no rows appended.Apply
functo each partition, passing in any extraargsandkwargsif provided.Trim
beforerows from the beginning of all but the first partition.Trim
afterrows from the end of all but the last partition.
Examples
Given a DataFrame, Series, or Index, such as:
>>> import pandas as pd >>> import dask.dataframe as dd >>> df = pd.DataFrame({'x': [1, 2, 4, 7, 11], ... 'y': [1., 2., 3., 4., 5.]}) >>> ddf = dd.from_pandas(df, npartitions=2)
A rolling sum with a trailing moving window of size 2 can be computed by overlapping 2 rows before each partition, and then mapping calls to
df.rolling(2).sum():>>> ddf.compute() x y 0 1 1.0 1 2 2.0 2 4 3.0 3 7 4.0 4 11 5.0 >>> ddf.map_overlap(lambda df: df.rolling(2).sum(), 2, 0).compute() x y 0 NaN NaN 1 3.0 3.0 2 6.0 5.0 3 11.0 7.0 4 18.0 9.0
The pandas
diffmethod computes a discrete difference shifted by a number of periods (can be positive or negative). This can be implemented by mapping calls todf.diffto each partition after prepending/appending that many rows, depending on sign:>>> def diff(df, periods=1): ... before, after = (periods, 0) if periods > 0 else (0, -periods) ... return df.map_overlap(lambda df, periods=1: df.diff(periods), ... periods, 0, periods=periods) >>> diff(ddf, 1).compute() x y 0 NaN NaN 1 1.0 1.0 2 2.0 1.0 3 3.0 1.0 4 4.0 1.0
If you have a
DatetimeIndex, you can use apd.Timedeltafor time- based windows or anypd.Timedeltaconvertible string:>>> ts = pd.Series(range(10), index=pd.date_range('2017', periods=10)) >>> dts = dd.from_pandas(ts, npartitions=2) >>> dts.map_overlap(lambda df: df.rolling('2D').sum(), ... pd.Timedelta('2D'), 0).compute() 2017-01-01 0.0 2017-01-02 1.0 2017-01-03 3.0 2017-01-04 5.0 2017-01-05 7.0 2017-01-06 9.0 2017-01-07 11.0 2017-01-08 13.0 2017-01-09 15.0 2017-01-10 17.0 Freq: D, dtype: float64