API
Top level user functions:
Bag(dsk, name, npartitions) | Parallel collection of Python objects
Bag.all([split_every]) | Are all elements truthy?
Bag.any([split_every]) | Are any of the elements truthy?
Bag.compute(**kwargs) | Compute this dask collection
Bag.concat() | Concatenate nested lists into one long list
Bag.count([split_every]) | Count the number of elements
Bag.distinct() | Distinct elements of collection
Bag.filter(predicate) | Filter elements in collection by a predicate function
Bag.fold(binop[, combine, initial, split_every]) | Parallelizable reduction
Bag.foldby(key, binop[, initial, combine, ...]) | Combined reduction and groupby
Bag.frequencies([split_every]) | Count number of occurrences of each distinct element
Bag.groupby(grouper[, method, npartitions, ...]) | Group collection by key function
Bag.join(other, on_self[, on_other]) | Join collection with another collection
Bag.map(func, **kwargs) | Map a function across all elements in collection
Bag.map_partitions(func, **kwargs) | Apply function to every partition within collection
Bag.max([split_every]) | Maximum element
Bag.mean() | Arithmetic mean
Bag.min([split_every]) | Minimum element
Bag.pluck(key[, default]) | Select item from all tuples/dicts in collection
Bag.product(other) | Cartesian product between two bags
Bag.reduction(perpartition, aggregate[, ...]) | Reduce collection with reduction operators
Bag.random_sample(prob[, random_state]) | Return elements from bag with probability of prob
Bag.remove(predicate) | Remove elements in collection that match predicate
Bag.repartition(npartitions) | Coalesce bag into fewer partitions
Bag.std([ddof]) | Standard deviation
Bag.sum([split_every]) | Sum all elements
Bag.take(k[, npartitions, compute]) | Take the first k elements
Bag.to_dataframe([columns]) | Create Dask DataFrame from a Dask Bag
Bag.to_delayed() | Convert bag to list of dask Delayed
Bag.to_textfiles(path[, name_function, ...]) | Write bag to disk, one filename per partition, one line per element
Bag.topk(k[, key, split_every]) | K largest elements in collection
Bag.var([ddof]) | Variance
Bag.visualize([filename, format, optimize_graph]) | Render the computation of this object’s task graph using graphviz
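These methods are lazy and chain together; nothing runs until compute() (or take()) is called. A small illustrative sketch, where the file path and field name are hypothetical:

>>> import json
>>> import dask.bag as db
>>> b = db.read_text('data/records.*.json').map(json.loads)  # hypothetical files
>>> b.pluck('name').frequencies().topk(3, key=lambda kv: kv[1]).compute()  # doctest: +SKIP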
Create Bags
from_sequence(seq[, partition_size, npartitions]) | Create a dask Bag from a Python sequence
from_delayed(values) | Create bag from many dask.delayed objects
read_text(urlpath[, blocksize, compression, ...]) | Read lines from text files
from_url(urls) | Create a dask.bag from a url
range(n, npartitions) | Numbers from zero to n
concat(bags) | Concatenate many bags together, unioning all elements
zip(*bags) | Partition-wise bag zip
Turn Bags into other things
Bag.to_textfiles(path[, name_function, ...]) | Write bag to disk, one filename per partition, one line per element
Bag.to_dataframe([columns]) | Create Dask DataFrame from a Dask Bag
Bag methods
class dask.bag.Bag(dsk, name, npartitions)
Parallel collection of Python objects

Examples

Create Bag from sequence

>>> import dask.bag as db
>>> b = db.from_sequence(range(5))
>>> list(b.filter(lambda x: x % 2 == 0).map(lambda x: x * 10))
[0, 20, 40]

Create Bag from filename or globstring of filenames

>>> b = db.read_text('/path/to/mydata.*.json.gz').map(json.loads)

Create manually (expert use)

>>> dsk = {('x', 0): (range, 5),
...        ('x', 1): (range, 5),
...        ('x', 2): (range, 5)}
>>> b = Bag(dsk, 'x', npartitions=3)

>>> sorted(b.map(lambda x: x * 10))
[0, 0, 0, 10, 10, 10, 20, 20, 20, 30, 30, 30, 40, 40, 40]

>>> b.fold(lambda x, y: x + y).compute()
30
accumulate(binop, initial='__no__default__')
Repeatedly apply binary function to a sequence, accumulating results.

This assumes that the bag is ordered. While this is typically the case, not all Dask.bag functions preserve this property.

Examples

>>> from operator import add
>>> b = from_sequence([1, 2, 3, 4, 5], npartitions=2)
>>> b.accumulate(add).compute()
[1, 3, 6, 10, 15]

Accumulate also takes an optional argument that will be used as the first value.

>>> b.accumulate(add, initial=-1).compute()
[-1, 0, 2, 5, 9, 14]
all(split_every=None)
Are all elements truthy?

any(split_every=None)
Are any of the elements truthy?
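For example, a minimal sketch (not taken from the original docstrings):

>>> import dask.bag as db
>>> b = db.from_sequence([True, True, False])
>>> b.all().compute()
False
>>> b.any().compute()
True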
concat()
Concatenate nested lists into one long list

>>> b = from_sequence([[1], [2, 3]])
>>> list(b)
[[1], [2, 3]]

>>> list(b.concat())
[1, 2, 3]

count(split_every=None)
Count the number of elements
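For example (an illustrative sketch):

>>> import dask.bag as db
>>> b = db.from_sequence(range(5))
>>> b.count().compute()
5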
distinct()
Distinct elements of collection

Unordered without repeats.

>>> b = from_sequence(['Alice', 'Bob', 'Alice'])
>>> sorted(b.distinct())
['Alice', 'Bob']

filter(predicate)
Filter elements in collection by a predicate function

>>> def iseven(x):
...     return x % 2 == 0

>>> import dask.bag as db
>>> b = db.from_sequence(range(5))
>>> list(b.filter(iseven))
[0, 2, 4]
fold(binop, combine=None, initial='__no__default__', split_every=None)
Parallelizable reduction

Fold is like the builtin function reduce except that it works in parallel. Fold takes two binary operator functions, one to reduce each partition of our dataset and another to combine results between partitions:

binop: Binary operator to reduce within each partition
combine: Binary operator to combine results from binop
Sequentially this would look like the following:
>>> intermediates = [reduce(binop, part) for part in partitions]
>>> final = reduce(combine, intermediates)
If only one function is given then it is used for both functions, binop and combine, as in the following example to compute the sum:

>>> def add(x, y):
...     return x + y

>>> b = from_sequence(range(5))
>>> b.fold(add).compute()
10
In full form we provide both binary operators as well as their default arguments
>>> b.fold(binop=add, combine=add, initial=0).compute()
10
More complex binary operators are also doable
>>> def add_to_set(acc, x):
...     ''' Add new element x to set acc '''
...     return acc | set([x])
>>> b.fold(add_to_set, set.union, initial=set()).compute()
{0, 1, 2, 3, 4}
See also: Bag.foldby
foldby(key, binop, initial='__no__default__', combine=None, combine_initial='__no__default__')
Combined reduction and groupby
Foldby provides a combined groupby and reduce for efficient parallel split-apply-combine tasks.
The computation
>>> b.foldby(key, binop, init)
is equivalent to the following:
>>> def reduction(group):
...     return reduce(binop, group, init)

>>> b.groupby(key).map(lambda kv: (kv[0], reduction(kv[1])))
But uses minimal communication and so is much faster.
>>> b = from_sequence(range(10))
>>> iseven = lambda x: x % 2 == 0
>>> add = lambda x, y: x + y
>>> dict(b.foldby(iseven, add))
{True: 20, False: 25}
Key Function
The key function determines how to group the elements in your bag. In the common case where your bag holds dictionaries, the key function often pulls out one of the dictionary's values.

>>> def key(x):
...     return x['name']

This case is so common that it is special cased, and if you provide a key that is not a callable function then dask.bag will turn it into one automatically. The following are equivalent:

>>> b.foldby(lambda x: x['name'], ...)
>>> b.foldby('name', ...)
Binops

It can be tricky to construct the right binary operators to perform analytic queries. The foldby method accepts two binary operators, binop and combine. A binary operator's two inputs and output must have the same type.

Binop takes a running total and a new element and produces a new total:

>>> def binop(total, x):
...     return total + x['amount']

Combine takes two totals and combines them:

>>> def combine(total1, total2):
...     return total1 + total2

Each of these binary operators may have a default first value for total, before any other value is seen. For addition binary operators like above this is often 0 or the identity element for your operation.

>>> b.foldby('name', binop, 0, combine, 0)
See also: toolz.reduceby, pyspark.combineByKey
frequencies(split_every=None)
Count number of occurrences of each distinct element

>>> b = from_sequence(['Alice', 'Bob', 'Alice'])
>>> dict(b.frequencies())
{'Alice': 2, 'Bob': 1}
groupby(grouper, method=None, npartitions=None, blocksize=1048576, max_branch=None)
Group collection by key function

This requires a full dataset read, serialization and shuffle. This is expensive. If possible you should use foldby.

Parameters:

grouper: function
Function on which to group elements
method: str
Either ‘disk’ for an on-disk shuffle or ‘tasks’ to use the task scheduling framework. Use ‘disk’ if you are on a single machine and ‘tasks’ if you are on a distributed cluster.
npartitions: int
If using the disk-based shuffle, the number of output partitions
blocksize: int
If using the disk-based shuffle, the size of shuffle blocks
max_branch: int
If using the task-based shuffle, the amount of splitting each partition undergoes. Increase this for fewer copies but more scheduler overhead.
See also: Bag.foldby
Examples
>>> b = from_sequence(range(10))
>>> iseven = lambda x: x % 2 == 0
>>> dict(b.groupby(iseven))
{True: [0, 2, 4, 6, 8], False: [1, 3, 5, 7, 9]}
join(other, on_self, on_other=None)
Join collection with another collection

Other collection must be an Iterable, and not a Bag.

>>> people = from_sequence(['Alice', 'Bob', 'Charlie'])
>>> fruit = ['Apple', 'Apricot', 'Banana']
>>> list(people.join(fruit, lambda x: x[0]))
[('Apple', 'Alice'), ('Apricot', 'Alice'), ('Banana', 'Bob')]
map(func, **kwargs)
Map a function across all elements in collection

>>> import dask.bag as db
>>> b = db.from_sequence(range(5))
>>> list(b.map(lambda x: x * 10))
[0, 10, 20, 30, 40]

Keyword arguments are passed through to func. These can be either dask.bag.Item or normal python objects.

Examples

>>> import dask.bag as db
>>> b = db.from_sequence(range(1, 101), npartitions=10)
>>> def div(num, den=1):
...     return num / den

Using a python object:

>>> hi = b.max().compute()
>>> hi
100
>>> b.map(div, den=hi).take(5)
(0.01, 0.02, 0.03, 0.04, 0.05)

Using an Item:

>>> b.map(div, den=b.max()).take(5)
(0.01, 0.02, 0.03, 0.04, 0.05)
Note that while both versions give the same output, the second forms a single graph, and then computes everything at once, and in some cases may be more efficient.
map_partitions(func, **kwargs)
Apply function to every partition within collection

Note that this requires you to understand how dask.bag partitions your data and so is somewhat internal.

>>> b.map_partitions(myfunc)

Keyword arguments are passed through to func. These can be either dask.bag.Item or normal python objects.

Examples

>>> import dask.bag as db
>>> b = db.from_sequence(range(1, 101), npartitions=10)
>>> def div(nums, den=1):
...     return [num / den for num in nums]

Using a python object:

>>> hi = b.max().compute()
>>> hi
100
>>> b.map_partitions(div, den=hi).take(5)
(0.01, 0.02, 0.03, 0.04, 0.05)

Using an Item:

>>> b.map_partitions(div, den=b.max()).take(5)
(0.01, 0.02, 0.03, 0.04, 0.05)
Note that while both versions give the same output, the second forms a single graph, and then computes everything at once, and in some cases may be more efficient.
max(split_every=None)
Maximum element

mean()
Arithmetic mean

min(split_every=None)
Minimum element
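A combined sketch of these three reductions (illustrative, not from the original docstrings):

>>> import dask.bag as db
>>> b = db.from_sequence([1, 2, 3, 4])
>>> b.max().compute()
4
>>> b.min().compute()
1
>>> b.mean().compute()
2.5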
pluck(key, default='__no__default__')
Select item from all tuples/dicts in collection

>>> b = from_sequence([{'name': 'Alice', 'credits': [1, 2, 3]},
...                    {'name': 'Bob', 'credits': [10, 20]}])
>>> list(b.pluck('name'))
['Alice', 'Bob']
>>> list(b.pluck('credits').pluck(0))
[1, 10]
product(other)
Cartesian product between two bags
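For example (a small sketch; assuming the result is a bag of (left, right) tuples, sorted here because element order depends on partitioning):

>>> import dask.bag as db
>>> a = db.from_sequence([1, 2])
>>> b = db.from_sequence(['x', 'y'])
>>> sorted(a.product(b))
[(1, 'x'), (1, 'y'), (2, 'x'), (2, 'y')]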
random_sample(prob, random_state=None)
Return elements from bag with probability of prob.

Parameters:

prob : float
A float between 0 and 1, representing the probability that each element will be returned.
random_state : int or random.Random, optional
If an integer, will be used to seed a new random.Random object. If provided, results in deterministic sampling.

Examples

>>> import dask.bag as db
>>> b = db.from_sequence(range(5))
>>> list(b.random_sample(0.5, 42))
[1, 4]
>>> list(b.random_sample(0.5, 42))
[1, 4]
reduction(perpartition, aggregate, split_every=None, out_type=<class 'dask.bag.core.Item'>, name=None)
Reduce collection with reduction operators

Parameters:

perpartition: function
reduction to apply to each partition
aggregate: function
reduction to apply to the results of all partitions
split_every: int (optional)
Group partitions into groups of this size while performing the reduction. Defaults to 8.
out_type: {Bag, Item}
The out type of the result, Item if a single element, Bag if a list of elements. Defaults to Item.
Examples
>>> b = from_sequence(range(10))
>>> b.reduction(sum, sum).compute()
45
remove(predicate)
Remove elements in collection that match predicate

>>> def iseven(x):
...     return x % 2 == 0

>>> import dask.bag as db
>>> b = db.from_sequence(range(5))
>>> list(b.remove(iseven))
[1, 3]
repartition(npartitions)
Coalesce bag into fewer partitions
Examples
>>> b.repartition(5) # set to have 5 partitions
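A slightly fuller sketch (illustrative):

>>> import dask.bag as db
>>> b = db.from_sequence(range(100), npartitions=10)
>>> b.npartitions
10
>>> b.repartition(5).npartitions
5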
std(ddof=0)
Standard deviation
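For example (a sketch; with the default ddof=0 this is the population standard deviation):

>>> import dask.bag as db
>>> b = db.from_sequence([2, 4, 4, 4, 5, 5, 7, 9])
>>> b.std().compute()
2.0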
str
String processing functions

Examples

>>> import dask.bag as db
>>> b = db.from_sequence(['Alice Smith', 'Bob Jones', 'Charlie Smith'])
>>> list(b.str.lower())
['alice smith', 'bob jones', 'charlie smith']

>>> list(b.str.match('*Smith'))
['Alice Smith', 'Charlie Smith']

>>> list(b.str.split(' '))
[['Alice', 'Smith'], ['Bob', 'Jones'], ['Charlie', 'Smith']]
sum(split_every=None)
Sum all elements
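For example (an illustrative sketch):

>>> import dask.bag as db
>>> b = db.from_sequence(range(5))
>>> b.sum().compute()
10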
take(k, npartitions=1, compute=True)
Take the first k elements

Parameters:

k : int
The number of elements to return
npartitions : int, optional
Elements are only taken from the first npartitions, with a default of 1. If there are fewer than k rows in the first npartitions, a warning will be raised and any found rows returned. Pass -1 to use all partitions.
compute : bool, optional
Whether to compute the result, default is True.
>>> b = from_sequence(range(10))
>>> b.take(3) # doctest: +SKIP
(0, 1, 2)
to_dataframe(columns=None)
Create Dask DataFrame from a Dask Bag

Bag should contain tuples, dict records, or scalars.

Index will not be particularly meaningful. Use reindex afterwards if necessary.

Parameters:

columns : pandas.DataFrame or list, optional
If a pandas.DataFrame, it should mirror the column names and dtypes of the output dataframe. If a list, it provides the desired column names. If not provided or a list, a single element from the first partition will be computed, triggering a potentially expensive call to compute. Providing a list is only useful for selecting a subset of columns; to avoid an internal compute call you must provide a pandas.DataFrame, as dask requires dtype knowledge ahead of time.

Examples

>>> import dask.bag as db
>>> b = db.from_sequence([{'name': 'Alice',   'balance': 100},
...                       {'name': 'Bob',     'balance': 200},
...                       {'name': 'Charlie', 'balance': 300}],
...                      npartitions=2)
>>> df = b.to_dataframe()

>>> df.compute()
   balance     name
0      100    Alice
1      200      Bob
0      300  Charlie
to_delayed()
Convert bag to list of dask Delayed
Returns list of Delayed, one per partition.
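For example (a sketch; the partition boundaries shown follow from_sequence's even chunking, and each Delayed evaluates to the list of elements in its partition):

>>> import dask.bag as db
>>> b = db.from_sequence(range(6), npartitions=3)
>>> parts = b.to_delayed()
>>> len(parts)
3
>>> parts[0].compute()
[0, 1]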
to_textfiles(path, name_function=None, compression='infer', encoding='utf-8', compute=True)
Write bag to disk, one filename per partition, one line per element
Paths: This will create one file for each partition in your bag. You can specify the filenames in a variety of ways.
Use a globstring
>>> b.to_textfiles('/path/to/data/*.json.gz')
The * will be replaced by the increasing sequence 0, 1, 2, ...

/path/to/data/0.json.gz
/path/to/data/1.json.gz
Use a globstring and a name_function= keyword argument. The name_function function should expect an integer and produce a string. Strings produced by name_function must preserve the order of their respective partition indices.

>>> from datetime import date, timedelta
>>> def name(i):
...     return str(date(2015, 1, 1) + i * timedelta(days=1))

>>> name(0)
'2015-01-01'
>>> name(15)
'2015-01-16'

>>> b.to_textfiles('/path/to/data/*.json.gz', name_function=name)

/path/to/data/2015-01-01.json.gz
/path/to/data/2015-01-02.json.gz
...

You can also provide an explicit list of paths.

>>> paths = ['/path/to/data/alice.json.gz', '/path/to/data/bob.json.gz', ...]
>>> b.to_textfiles(paths)
Compression: Filenames with extensions corresponding to known compression algorithms (gz, bz2) will be compressed accordingly.
Bag Contents: The bag calling to_textfiles must be a bag of text strings. For example, a bag of dictionaries could be written to JSON text files by mapping json.dumps on to the bag first, and then calling to_textfiles:

>>> b_dict.map(json.dumps).to_textfiles("/path/to/data/*.json")
topk(k, key=None, split_every=None)
K largest elements in collection

Optionally ordered by some key function

>>> b = from_sequence([10, 3, 5, 7, 11, 4])
>>> list(b.topk(2))
[11, 10]

>>> list(b.topk(2, lambda x: -x))
[3, 4]
unzip(n)
Transform a bag of tuples to n bags of their elements.

Examples

>>> b = from_sequence([(i, i + 1, i + 2) for i in range(10)])
>>> first, second, third = b.unzip(3)
>>> isinstance(first, Bag)
True
>>> first.compute()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Note that this is equivalent to:
>>> first, second, third = (b.pluck(i) for i in range(3))
var(ddof=0)
Variance
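For example (a sketch; ddof=0 by default, giving the population variance):

>>> import dask.bag as db
>>> b = db.from_sequence([1, 2, 3, 4, 5])
>>> b.var().compute()
2.0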
Other functions

dask.bag.from_sequence(seq, partition_size=None, npartitions=None)
Create a dask Bag from a Python sequence
This sequence should be relatively small in memory. Dask Bag works best when it handles loading your data itself. Commonly we load a sequence of filenames into a Bag and then use .map to open them.

Parameters:

seq: Iterable
A sequence of elements to put into the dask
partition_size: int (optional)
The length of each partition
npartitions: int (optional)
The number of desired partitions
It is best to provide either partition_size or npartitions (though not both).

See also: read_text - Create bag from text files
Examples
>>> b = from_sequence(['Alice', 'Bob', 'Chuck'], partition_size=2)
dask.bag.from_delayed(values)
Create bag from many dask.delayed objects

These objects will become the partitions of the resulting Bag. They should evaluate to a list or some other concrete sequence.

Parameters:

values: list of delayed values
An iterable of dask Delayed objects, each evaluating to a list.
Returns: Bag
See also: dask.delayed
Examples
>>> x, y, z = [delayed(load_sequence_from_file)(fn)
...            for fn in filenames]
>>> b = from_delayed([x, y, z])
dask.bag.read_text(urlpath, blocksize=None, compression='infer', encoding='utf-8', errors='strict', linedelimiter='\n', collection=True, storage_options=None)
Read lines from text files

Parameters:

urlpath: string or list
Absolute or relative filepath, URL (may include protocols like s3://), globstring, or a list of the aforementioned strings.
blocksize: None or int
Size to cut up larger files. Streams by default.
compression: string
Compression format like ‘gzip’ or ‘xz’. Defaults to ‘infer’
encoding: string
errors: string
linedelimiter: string
collection: bool, optional
Return a dask.bag.Bag if True, or a list of delayed values if False
storage_options: dict
Extra options that make sense to a particular storage connection, e.g. host, port, username, password, etc.
Returns: dask.bag.Bag if collection is True or list of Delayed lists otherwise
See also: from_sequence - Build bag from Python sequence
Examples
>>> b = read_text('myfiles.1.txt')
>>> b = read_text('myfiles.*.txt')
>>> b = read_text('myfiles.*.txt.gz')
>>> b = read_text('s3://bucket/myfiles.*.txt')
>>> b = read_text('s3://key:secret@bucket/myfiles.*.txt')
>>> b = read_text('hdfs://namenode.example.com/myfiles.*.txt')
Parallelize a large file by providing the number of uncompressed bytes to load into each partition.
>>> b = read_text('largefile.txt', blocksize=1e7)
dask.bag.from_url(urls)
Create a dask.bag from a url

Examples

>>> a = from_url('http://raw.githubusercontent.com/dask/dask/master/README.rst')
>>> a.npartitions
1

>>> a.take(8)
('Dask\n',
 '====\n',
 '\n',
 '|Build Status| |Coverage| |Doc Status| |Gitter|\n',
 '\n',
 'Dask provides multi-core execution on larger-than-memory datasets using blocked\n',
 'algorithms and task scheduling. It maps high-level NumPy and list operations\n',
 'on large datasets on to graphs of many operations on small in-memory datasets.\n')

>>> b = from_url(['http://github.com', 'http://google.com'])
>>> b.npartitions
2
dask.bag.range(n, npartitions)
Numbers from zero to n

Examples

>>> import dask.bag as db
>>> b = db.range(5, npartitions=2)
>>> list(b)
[0, 1, 2, 3, 4]
dask.bag.concat(bags)
Concatenate many bags together, unioning all elements

>>> import dask.bag as db
>>> a = db.from_sequence([1, 2, 3])
>>> b = db.from_sequence([4, 5, 6])
>>> c = db.concat([a, b])

>>> list(c)
[1, 2, 3, 4, 5, 6]
dask.bag.zip(*bags)
Partition-wise bag zip
All passed bags must have the same number of partitions.
NOTE: corresponding partitions should have the same length; if they do not, the “extra” elements from the longer partition(s) will be dropped. If you have this case, chances are that what you really need is a data alignment mechanism like pandas’s, and not a missing value filler like zip_longest.
Examples
Correct usage:
>>> import dask.bag as db
>>> evens = db.from_sequence(range(0, 10, 2), partition_size=4)
>>> odds = db.from_sequence(range(1, 10, 2), partition_size=4)
>>> pairs = db.zip(evens, odds)
>>> list(pairs)
[(0, 1), (2, 3), (4, 5), (6, 7), (8, 9)]
Incorrect usage:
>>> numbers = db.range(20)
>>> fizz = numbers.filter(lambda n: n % 3 == 0)
>>> buzz = numbers.filter(lambda n: n % 5 == 0)
>>> fizzbuzz = db.zip(fizz, buzz)
>>> list(fizzbuzz)
[(0, 0), (3, 5), (6, 10), (9, 15), (12, 20), (15, 25), (18, 30)]
When what you really wanted was more along the lines of the following:
>>> list(fizzbuzz)
[(0, 0), (3, None), (None, 5), (6, None), (None, 10), (9, None), (12, None), (15, 15), (18, None), (None, 20), (None, 25), (None, 30)]