dask.dataframe.from_map
dask.dataframe.from_map(func, *iterables, args=None, meta=None, divisions=None, label=None, token=None, enforce_metadata=True, **kwargs)
Create a DataFrame collection from a custom function map
WARNING: The from_map API is experimental, and stability is not yet guaranteed. Use at your own risk!
Parameters
- func : callable
    Function used to create each partition. If func satisfies the DataFrameIOFunction protocol, column projection will be enabled (a sketch of this protocol appears after the read_parquet example below).
- *iterables : Iterable objects
    Iterable objects to map to each output partition. All iterables must be the same length. This length determines the number of partitions in the output collection (only one element of each iterable will be passed to func for each partition).
- args : list or tuple, optional
    Positional arguments to broadcast to each output partition. Note that these arguments will always be passed to func after the iterables positional arguments.
- meta : pd.DataFrame, pd.Series, dict, iterable, tuple, optional
    An empty pd.DataFrame or pd.Series that matches the dtypes and column names of the output. This metadata is necessary for many algorithms in dask dataframe to work. For ease of use, some alternative inputs are also available. Instead of a DataFrame, a dict of {name: dtype} or an iterable of (name, dtype) can be provided (note that the order of the names should match the order of the columns). Instead of a Series, a tuple of (name, dtype) can be used. If not provided, dask will try to infer the metadata. This may lead to unexpected results, so providing meta is recommended (a short sketch follows this parameter list). For more information, see dask.dataframe.utils.make_meta.
- divisions : tuple, str, optional
    Partition boundaries along the index. For a tuple, see https://docs.dask.org/en/latest/dataframe-design.html#partitions. For the string ‘sorted’, the delayed values will be computed to find the index values; this assumes that the indexes are mutually sorted. If None, index information will not be used.
- label : str, optional
    String to use as the function-name label in the output collection-key names.
- token : str, optional
    String to use as the “token” in the output collection-key names.
- enforce_metadata : bool, default True
    Whether to enforce at runtime that the structure of the DataFrame produced by func actually matches the structure of meta. This will rename and reorder columns for each partition, and will raise an error if this does not work, but it will not raise if the dtypes do not match.
- **kwargs :
    Keyword arguments to broadcast to each output partition. These same arguments will be passed to func for every output partition.
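As a rough illustration of the args and meta options described above (the make_part helper, the value column, and the broadcast multiplier are made up for this sketch and are not part of the API):

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> def make_part(x, multiplier):
...     # one element of the iterable plus the value broadcast via ``args``
...     return pd.DataFrame({"value": [x * multiplier]})
>>> ddf = dd.from_map(make_part, [1, 2, 3], args=[10], meta={"value": "int64"})
>>> ddf.npartitions
3

Because meta is given as a dict of {name: dtype}, dask does not need to call make_part eagerly to infer the output structure.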
See also
dask.dataframe.from_delayed
dask.layers.DataFrameIOLayer
Examples
>>> import pandas as pd
>>> import dask.dataframe as dd
>>> func = lambda x, size=0: pd.Series([x] * size)
>>> inputs = ["A", "B"]
>>> dd.from_map(func, inputs, size=2).compute()
0    A
1    A
0    B
1    B
dtype: object
This API can also be used as an alternative to other file-based IO functions, like read_parquet (which are already just from_map wrapper functions):

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> paths = ["0.parquet", "1.parquet", "2.parquet"]
>>> dd.from_map(pd.read_parquet, paths).head()
                       name
timestamp
2000-01-01 00:00:00   Laura
2000-01-01 00:00:01  Oliver
2000-01-01 00:00:02   Alice
2000-01-01 00:00:03  Victor
2000-01-01 00:00:04     Bob
Since from_map allows you to map an arbitrary function to any number of iterable objects, it can be a very convenient means of implementing functionality that may be missing from other DataFrame-creation methods. For example, if you happen to have apriori knowledge about the number of rows in each of the files in a dataset, you can generate a DataFrame collection with a global RangeIndex:

>>> import pandas as pd
>>> import numpy as np
>>> import dask.dataframe as dd
>>> paths = ["0.parquet", "1.parquet", "2.parquet"]
>>> file_sizes = [86400, 86400, 86400]
>>> def func(path, row_offset):
...     # Read parquet file and set RangeIndex offset
...     df = pd.read_parquet(path)
...     return df.set_index(
...         pd.RangeIndex(row_offset, row_offset + len(df))
...     )
>>> def get_ddf(paths, file_sizes):
...     offsets = [0] + list(np.cumsum(file_sizes))
...     return dd.from_map(
...         func, paths, offsets[:-1], divisions=offsets
...     )
>>> ddf = get_ddf(paths, file_sizes)
>>> ddf.index
Dask Index Structure:
npartitions=3
0         int64
86400       ...
172800      ...
259200      ...
dtype: int64
Dask Name: myfunc, 6 tasks