Commit b41c932e authored by Kruyff,D.L.W. (Dylan)

Redo lost changes

parent 8d4ead19
@@ -41,6 +41,7 @@ from ..base import tokenize, dont_optimize, DaskMethodsMixin
from ..bytes import open_files
from ..context import globalmethod
from ..core import quote, istask, get_dependencies, reverse_dict, flatten
from ..sizeof import sizeof
from ..delayed import Delayed, unpack_collections
from ..highlevelgraph import HighLevelGraph
from ..multiprocessing import get as mpget
@@ -56,6 +57,8 @@ from ..utils import (
ensure_bytes,
ensure_unicode,
key_split,
parse_bytes,
iter_chunks,
)
from . import chunk
@@ -104,7 +107,7 @@ def lazify(dsk):
def inline_singleton_lists(dsk, keys, dependencies=None):
""" Inline lists that are only used once.
"""Inline lists that are only used once.
>>> d = {'b': (list, 'a'),
... 'c': (f, 'b', 1)} # doctest: +SKIP
@@ -172,7 +175,7 @@ def to_textfiles(
last_endline=False,
**kwargs
):
""" Write dask Bag to disk, one filename per partition, one line per element.
"""Write dask Bag to disk, one filename per partition, one line per element.
**Paths**: This will create one file for each partition in your bag. You
can specify the filenames in a variety of ways.
@@ -271,7 +274,7 @@ def finalize_item(results):
class StringAccessor(object):
""" String processing functions
"""String processing functions
Examples
--------
@@ -308,7 +311,7 @@ class StringAccessor(object):
raise
def match(self, pattern):
""" Filter strings by those that match a pattern.
"""Filter strings by those that match a pattern.
Examples
--------
@@ -363,7 +366,7 @@ class Item(DaskMethodsMixin):
@staticmethod
def from_delayed(value):
""" Create bag item from a dask.delayed value.
"""Create bag item from a dask.delayed value.
See ``dask.bag.from_delayed`` for details
"""
@@ -410,7 +413,7 @@ class Item(DaskMethodsMixin):
class Bag(DaskMethodsMixin):
""" Parallel collection of Python objects
"""Parallel collection of Python objects
Examples
--------
@@ -606,7 +609,7 @@ class Bag(DaskMethodsMixin):
self.dask, self.name, self.npartitions = state
def filter(self, predicate):
""" Filter elements in collection by a predicate function.
"""Filter elements in collection by a predicate function.
>>> def iseven(x):
... return x % 2 == 0
@@ -625,7 +628,7 @@ class Bag(DaskMethodsMixin):
return type(self)(graph, name, self.npartitions)
def random_sample(self, prob, random_state=None):
""" Return elements from bag with probability of ``prob``.
"""Return elements from bag with probability of ``prob``.
Parameters
----------
@@ -660,7 +663,7 @@ class Bag(DaskMethodsMixin):
return type(self)(graph, name, self.npartitions)
def remove(self, predicate):
""" Remove elements in collection that match predicate.
"""Remove elements in collection that match predicate.
>>> def iseven(x):
... return x % 2 == 0
@@ -722,7 +725,7 @@ class Bag(DaskMethodsMixin):
return map_partitions(func, self, *args, **kwargs)
def pluck(self, key, default=no_default):
""" Select item from all tuples/dicts in collection.
"""Select item from all tuples/dicts in collection.
>>> b = from_sequence([{'name': 'Alice', 'credits': [1, 2, 3]},
... {'name': 'Bob', 'credits': [10, 20]}])
@@ -817,7 +820,7 @@ class Bag(DaskMethodsMixin):
def fold(
self, binop, combine=None, initial=no_default, split_every=None, out_type=Item
):
""" Parallelizable reduction
"""Parallelizable reduction
Fold is like the builtin function ``reduce`` except that it works in
parallel. Fold takes two binary operator functions, one to reduce each
@@ -881,7 +884,7 @@ class Bag(DaskMethodsMixin):
)
def frequencies(self, split_every=None, sort=False):
""" Count number of occurrences of each distinct element.
"""Count number of occurrences of each distinct element.
>>> b = from_sequence(['Alice', 'Bob', 'Alice'])
>>> dict(b.frequencies()) # doctest: +SKIP
@@ -899,7 +902,7 @@ class Bag(DaskMethodsMixin):
return result
def topk(self, k, key=None, split_every=None):
""" K largest elements in collection
"""K largest elements in collection
Optionally ordered by some key function
@@ -925,7 +928,7 @@ class Bag(DaskMethodsMixin):
)
def distinct(self, key=None):
""" Distinct elements of collection
"""Distinct elements of collection
Unordered without repeats.
@@ -953,7 +956,7 @@ class Bag(DaskMethodsMixin):
def reduction(
self, perpartition, aggregate, split_every=None, out_type=Item, name=None
):
""" Reduce collection with reduction operators.
"""Reduce collection with reduction operators.
Parameters
----------
@@ -1070,7 +1073,7 @@ class Bag(DaskMethodsMixin):
return self.var(ddof=ddof).apply(math.sqrt)
def join(self, other, on_self, on_other=None):
""" Joins collection with another collection.
"""Joins collection with another collection.
Other collection must be one of the following:
@@ -1162,7 +1165,7 @@ class Bag(DaskMethodsMixin):
combine_initial=no_default,
split_every=None,
):
""" Combined reduction and groupby.
"""Combined reduction and groupby.
Foldby provides a combined groupby and reduce for efficient parallel
split-apply-combine tasks.
@@ -1337,7 +1340,7 @@ class Bag(DaskMethodsMixin):
return type(self)(graph, e, 1)
def take(self, k, npartitions=1, compute=True, warn=True):
""" Take the first k elements.
"""Take the first k elements.
Parameters
----------
@@ -1391,7 +1394,7 @@ class Bag(DaskMethodsMixin):
return b
def flatten(self):
""" Concatenate nested lists into one long list.
"""Concatenate nested lists into one long list.
>>> b = from_sequence([[1], [2, 3]])
>>> list(b)
@@ -1420,7 +1423,7 @@ class Bag(DaskMethodsMixin):
max_branch=None,
shuffle=None,
):
""" Group collection by key function
"""Group collection by key function
This requires a full dataset read, serialization and shuffle.
This is expensive. If possible you should use ``foldby``.
@@ -1473,7 +1476,7 @@ class Bag(DaskMethodsMixin):
raise NotImplementedError(msg)
def to_dataframe(self, meta=None, columns=None):
""" Create Dask Dataframe from a Dask Bag.
"""Create Dask Dataframe from a Dask Bag.
Bag should contain tuples, dict records, or scalars.
@@ -1566,65 +1569,42 @@ class Bag(DaskMethodsMixin):
dsk = self.__dask_optimize__(dsk, keys)
return [Delayed(k, dsk) for k in keys]
def repartition(self, npartitions):
""" Changes the number of partitions of the bag.
def repartition(self, npartitions=None, partition_size=None):
"""Repartition Bag across new divisions.
This can be used to reduce or increase the number of partitions
of the bag.
Parameters
----------
npartitions : int, optional
Number of partitions of output.
partition_size : int or string, optional
Max number of bytes of memory for each partition. Use numbers or
strings like 5MB.
.. warning::
This keyword argument triggers computation to determine
the memory size of each partition, which may be expensive.
Notes
-----
Exactly one of ``npartitions`` or ``partition_size`` should be specified.
A ``ValueError`` will be raised when that is not the case.
Examples
--------
>>> b.repartition(5) # set to have 5 partitions # doctest: +SKIP
"""
new_name = "repartition-%d-%s" % (npartitions, tokenize(self, npartitions))
if npartitions == self.npartitions:
return self
elif npartitions < self.npartitions:
ratio = self.npartitions / npartitions
new_partitions_boundaries = [
int(old_partition_index * ratio)
for old_partition_index in range(npartitions + 1)
]
dsk = {}
for new_partition_index in range(npartitions):
value = (
list,
(
toolz.concat,
[
(self.name, old_partition_index)
for old_partition_index in range(
new_partitions_boundaries[new_partition_index],
new_partitions_boundaries[new_partition_index + 1],
)
],
),
)
dsk[new_name, new_partition_index] = value
else: # npartitions > self.npartitions
ratio = npartitions / self.npartitions
split_name = "split-%s" % tokenize(self, npartitions)
dsk = {}
last = 0
j = 0
for i in range(self.npartitions):
new = last + ratio
if i == self.npartitions - 1:
k = npartitions - j
else:
k = int(new - last)
dsk[(split_name, i)] = (split, (self.name, i), k)
for jj in range(k):
dsk[(new_name, j)] = (operator.getitem, (split_name, i), jj)
j += 1
last = new
graph = HighLevelGraph.from_collections(new_name, dsk, dependencies=[self])
return Bag(graph, name=new_name, npartitions=npartitions)
if sum([partition_size is not None, npartitions is not None]) != 1:
raise ValueError(
"Please provide exactly one ``npartitions`` or ``partition_size`` keyword arguments"
)
if npartitions is not None:
return repartition_npartitions(self, npartitions)
elif partition_size is not None:
return repartition_size(self, partition_size)
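# Usage sketch for the new signature (illustrative only; the bag contents are
# made up, and it assumes a dask version that already includes this change):
#
#     import dask.bag as db
#     b = db.from_sequence(range(1000), npartitions=10)
#     b.repartition(npartitions=4)         # concatenate down to 4 partitions
#     b.repartition(partition_size="1MB")  # computes sizes, then splits/concatenates
#     # passing both keywords (or neither) raises ValueError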
def accumulate(self, binop, initial=no_default):
""" Repeatedly apply binary function to a sequence, accumulating results.
"""Repeatedly apply binary function to a sequence, accumulating results.
This assumes that the bag is ordered. While this is typically the case
not all Dask.bag functions preserve this property.
@@ -1688,7 +1668,7 @@ def collect(grouper, group, p, barrier_token):
def from_sequence(seq, partition_size=None, npartitions=None):
""" Create a dask Bag from Python sequence.
"""Create a dask Bag from Python sequence.
This sequence should be relatively small in memory. Dask Bag works
best when it handles loading your data itself. Commonly we load a
@@ -1766,7 +1746,7 @@ def from_url(urls):
def dictitems(d):
""" A pickleable version of dict.items
"""A pickleable version of dict.items
>>> dictitems({'x': 1})
[('x', 1)]
@@ -1775,7 +1755,7 @@ def dictitems(d):
def concat(bags):
""" Concatenate many bags together, unioning all elements.
"""Concatenate many bags together, unioning all elements.
>>> import dask.bag as db
>>> a = db.from_sequence([1, 2, 3])
@@ -1801,7 +1781,7 @@ def reify(seq):
def from_delayed(values):
""" Create bag from many dask Delayed objects.
"""Create bag from many dask Delayed objects.
These objects will become the partitions of the resulting Bag. They should
evaluate to a ``list`` or some other concrete sequence.
@@ -1870,7 +1850,7 @@ def merge_frequencies(seqs):
def bag_range(n, npartitions):
""" Numbers from zero to n
"""Numbers from zero to n
Examples
--------
@@ -1893,7 +1873,7 @@ def bag_range(n, npartitions):
def bag_zip(*bags):
""" Partition-wise bag zip
"""Partition-wise bag zip
All passed bags must have the same number of partitions.
@@ -2347,7 +2327,10 @@ def groupby_tasks(b, grouper, hash=hash, max_branch=32):
name = "shuffle-" + token
end = dict(
((name, i), (list, (dict.items, (groupby, grouper, (pluck, 1, j)))),)
(
(name, i),
(list, (dict.items, (groupby, grouper, (pluck, 1, j)))),
)
for i, j in enumerate(join)
)
@@ -2475,7 +2458,7 @@ def random_state_data_python(n, random_state=None):
def split(seq, n):
""" Split apart a sequence into n equal pieces.
"""Split apart a sequence into n equal pieces.
>>> split(range(10), 3)
[[0, 1, 2], [3, 4, 5], [6, 7, 8, 9]]
@@ -2498,3 +2481,129 @@ def to_dataframe(seq, columns, dtypes):
seq = list(seq)
res = pd.DataFrame(seq, columns=list(columns))
return res.astype(dtypes, copy=False)
def repartition_npartitions(bag, npartitions):
"""Changes the number of partitions of the bag.
This can be used to reduce or increase the number of partitions
of the bag.
"""
if npartitions == bag.npartitions:
return bag
new_name = "repartition-%d-%s" % (npartitions, tokenize(bag, npartitions))
if bag.npartitions > npartitions:
ratio = bag.npartitions / npartitions
new_partitions_boundaries = [
int(old_partition_index * ratio)
for old_partition_index in range(npartitions + 1)
]
return _repartition_from_boundaries(bag, new_partitions_boundaries, new_name)
else: # npartitions > bag.npartitions
div, mod = divmod(npartitions, bag.npartitions)
nsplits = [div] * bag.npartitions
nsplits[-1] += mod
return _split_partitions(bag, nsplits, new_name)
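# Worked example of the arithmetic above, with assumed partition counts.
# Shrinking 10 -> 4 partitions: the ratio is 2.5 and the boundaries come out as
# [0, 2, 5, 7, 10], so the new partitions concatenate 2, 3, 2 and 3 old ones.
# Growing 3 -> 10 partitions: divmod(10, 3) == (3, 1), so nsplits == [3, 3, 4].
assert [int(i * (10 / 4)) for i in range(4 + 1)] == [0, 2, 5, 7, 10]
assert divmod(10, 3) == (3, 1)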
def total_mem_usage(partition):
from copy import deepcopy
from itertools import chain
# if repartition is called multiple times prior to calling compute(), the partitions
# will be itertools.chain objects. Copy the object to avoid consuming the iterable.
if isinstance(partition, chain):
partition = reify(deepcopy(partition))
return sizeof(partition)
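# Minimal sketch (assumed values) of why the deepcopy above matters: measuring
# a chain directly would consume it, so the copy is reified and measured while
# the original partition stays usable for downstream tasks.
from copy import deepcopy
from itertools import chain

partition = chain([1, 2], [3])
assert list(deepcopy(partition)) == [1, 2, 3]  # the copy is consumed by the measurement
assert list(partition) == [1, 2, 3]            # the original iterable is left intact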
def repartition_size(bag, size):
"""
Repartition bag so that new partitions have approximately `size` memory usage each
"""
if isinstance(size, str):
size = parse_bytes(size)
size = int(size)
mem_usages = bag.map_partitions(total_mem_usage).compute()
# 1. split each partition that is larger than partition size
nsplits = [1 + mem_usage // size for mem_usage in mem_usages]
if any((nsplit > 1 for nsplit in nsplits)):
split_name = "repartition-split-{}".format(tokenize(bag, size))
bag = _split_partitions(bag, nsplits, split_name)
# update mem_usages to account for the split partitions
split_mem_usages = []
for n, usage in zip(nsplits, mem_usages):
split_mem_usages.extend([usage / n] * n)
mem_usages = split_mem_usages
# 2. now that no partition is larger than size, concatenate them up to size
assert all((mem_usage <= size for mem_usage in mem_usages))
new_npartitions = list(map(len, iter_chunks(mem_usages, size)))
new_partitions_boundaries = accumulate(operator.add, new_npartitions)
new_name = "repartition-{}".format(tokenize(bag, size))
return _repartition_from_boundaries(bag, new_partitions_boundaries, new_name)
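# Rough walkthrough of the two phases with assumed byte sizes and a 100-byte
# target: oversized partitions are split first, then consecutive pieces are
# packed back together up to the target by ``iter_chunks``.
size = 100
mem_usages = [50, 50, 120, 30]
nsplits = [1 + usage // size for usage in mem_usages]
assert nsplits == [1, 1, 2, 1]  # only the 120-byte partition gets split (into two)
split_usages = [usage / n for usage, n in zip(mem_usages, nsplits) for _ in range(n)]
assert split_usages == [50, 50, 60, 60, 30]
# iter_chunks then groups consecutive usages whose running sum stays <= 100,
# roughly [[50, 50], [60], [60, 30]], giving new_npartitions == [2, 1, 2] and
# cumulative boundaries [2, 3, 5] for _repartition_from_boundaries.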
def _split_partitions(bag, nsplits, new_name):
"""Split a Dask bag into new partitions
Parameters
----------
bag: Dask bag
nsplits: List[int]
Number of pieces to split each input partition into.
The length of ``nsplits`` must equal ``bag.npartitions``.
new_name: str
See Also
--------
repartition_npartitions
repartition_size
"""
if len(nsplits) != bag.npartitions:
raise ValueError("nsplits should have len={}".format(bag.npartitions))
dsk = {}
split_name = "split-{}".format(tokenize(bag, nsplits))
j = 0
for i, k in enumerate(nsplits):
if k == 1:
dsk[new_name, j] = (bag.name, i)
j += 1
else:
dsk[split_name, i] = (split, (bag.name, i), k)
for jj in range(k):
dsk[new_name, j] = (operator.getitem, (split_name, i), jj)
j += 1
graph = HighLevelGraph.from_collections(new_name, dsk, dependencies=[bag])
return Bag(graph, name=new_name, npartitions=sum(nsplits))
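# Small assumed-value sketch of the split step: with nsplits == [1, 2] the
# first partition is passed straight through as (new_name, 0) while the second
# is cut by the module-level ``split`` helper into (new_name, 1) and
# (new_name, 2), for sum(nsplits) == 3 output partitions in total.
from dask.bag.core import split  # the helper defined earlier in this module

assert split(list(range(4)), 2) == [[0, 1], [2, 3]]
assert sum([1, 2]) == 3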
def _repartition_from_boundaries(bag, new_partitions_boundaries, new_name):
if not isinstance(new_partitions_boundaries, list):
new_partitions_boundaries = list(new_partitions_boundaries)
if new_partitions_boundaries[0] > 0:
new_partitions_boundaries.insert(0, 0)
if new_partitions_boundaries[-1] < bag.npartitions:
new_partitions_boundaries.append(bag.npartitions)
num_new_partitions = len(new_partitions_boundaries) - 1
dsk = {}
for new_partition_index in range(num_new_partitions):
value = (
list,
(
toolz.concat,
[
(bag.name, old_partition_index)
for old_partition_index in range(
new_partitions_boundaries[new_partition_index],
new_partitions_boundaries[new_partition_index + 1],
)
],
),
)
dsk[new_name, new_partition_index] = value
graph = HighLevelGraph.from_collections(new_name, dsk, dependencies=[bag])
return Bag(graph, name=new_name, npartitions=num_new_partitions)
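# Sketch of the tasks this builds for assumed boundaries [0, 2, 3] on a bag
# whose partitions are keyed ("x", 0), ("x", 1), ("x", 2): the first output
# partition concatenates old partitions 0 and 1, the second reuses partition 2
# alone ("new" and "x" are made-up names for the illustration).
import toolz

expected = {
    ("new", 0): (list, (toolz.concat, [("x", 0), ("x", 1)])),
    ("new", 1): (list, (toolz.concat, [("x", 2)])),
}
assert len(expected) == len([0, 2, 3]) - 1  # num_new_partitions = len(boundaries) - 1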
@@ -26,6 +26,7 @@ from dask.bag.core import (
inline_singleton_lists,
optimize,
from_delayed,
total_mem_usage,
)
from dask.bag.utils import assert_eq
from dask.delayed import Delayed
@@ -647,7 +648,7 @@ def test_read_text_large():
def test_read_text_encoding():
with tmpfile() as fn:
with open(fn, "wb") as f:
f.write((u"你好!" + os.linesep).encode("gb18030") * 100)
f.write(("你好!" + os.linesep).encode("gb18030") * 100)
b = db.read_text(fn, blocksize=100, encoding="gb18030")
c = db.read_text(fn, encoding="gb18030")
assert len(b.dask) > 5
@@ -683,11 +684,11 @@ def test_from_s3():
pytest.importorskip("s3fs")
five_tips = (
u"total_bill,tip,sex,smoker,day,time,size\n",
u"16.99,1.01,Female,No,Sun,Dinner,2\n",
u"10.34,1.66,Male,No,Sun,Dinner,3\n",
u"21.01,3.5,Male,No,Sun,Dinner,3\n",
u"23.68,3.31,Male,No,Sun,Dinner,2\n",
"total_bill,tip,sex,smoker,day,time,size\n",
"16.99,1.01,Female,No,Sun,Dinner,2\n",
"10.34,1.66,Male,No,Sun,Dinner,3\n",
"21.01,3.5,Male,No,Sun,Dinner,3\n",
"23.68,3.31,Male,No,Sun,Dinner,2\n",
)
# test compressed data
@@ -947,7 +948,7 @@ def test_to_textfiles_name_function_warn():
def test_to_textfiles_encoding():
b = db.from_sequence([u"汽车", u"苹果", u"天气"], npartitions=2)
b = db.from_sequence(["汽车", "苹果", "天气"], npartitions=2)
for ext, myopen in ext_open:
with tmpdir() as dir:
c = b.to_textfiles(
@@ -960,7 +961,7 @@ def test_to_textfiles_encoding():
text = f.read()
if hasattr(text, "decode"):
text = text.decode("gb18030")
assert u"天气" in text
assert "天气" in text
f.close()
@@ -1011,12 +1012,12 @@ def test_string_namespace():
def test_string_namespace_with_unicode():
b = db.from_sequence([u"Alice Smith", u"Bob Jones", "Charlie Smith"], npartitions=2)
b = db.from_sequence(["Alice Smith", "Bob Jones", "Charlie Smith"], npartitions=2)
assert list(b.str.lower()) == ["alice smith", "bob jones", "charlie smith"]
def test_str_empty_split():
b = db.from_sequence([u"Alice Smith", u"Bob Jones", "Charlie Smith"], npartitions=2)
b = db.from_sequence(["Alice Smith", "Bob Jones", "Charlie Smith"], npartitions=2)
assert list(b.str.split()) == [
["Alice", "Smith"],
["Bob", "Jones"],
@@ -1065,7 +1066,7 @@ def test_bag_class_extend():
def test_gh715():
bin_data = u"\u20ac".encode("utf-8")
bin_data = "\u20ac".encode("utf-8")
with tmpfile() as fn:
with open(fn, "wb") as f:
f.write(bin_data)
@@ -1137,12 +1138,15 @@ def test_from_delayed_iterator():
delayed_records = delayed(lazy_records, pure=False)
bag = db.from_delayed([delayed_records(5) for _ in range(5)])
assert db.compute(
bag.count(),
bag.pluck("operations").count(),
bag.pluck("operations").flatten().count(),
scheduler="sync",
) == (25, 25, 50)
assert (
db.compute(
bag.count(),
bag.pluck("operations").count(),
bag.pluck("operations").flatten().count(),
scheduler="sync",
)
== (25, 25, 50)
)