Commit 4944fbae authored by Kruyff,D.L.W. (Dylan)'s avatar Kruyff,D.L.W. (Dylan)
Browse files

Make prototype compatible for larger data size

parent 12614e4f
from .core import tensordot_lookup, concatenate_lookup, einsum_lookup
def register_cupy():
import cupy
concatenate_lookup.register(cupy.ndarray, cupy.concatenate)
tensordot_lookup.register(cupy.ndarray, cupy.tensordot)
def _cupy_einsum(*args, **kwargs):
# NB: cupy does not accept `order` or `casting` kwargs - ignore
kwargs.pop("casting", None)
kwargs.pop("order", None)
return cupy.einsum(*args, **kwargs)
def register_cupyx():
from cupyx.scipy.sparse import spmatrix
from cupy.sparse import hstack
from cupy.sparse import vstack
except ImportError as e:
raise ImportError(
"Stacking of sparse arrays requires at least CuPy version 8.0.0"
) from e
def _concat_cupy_sparse(L, axis=0):
if axis == 0:
return vstack(L)
elif axis == 1:
return hstack(L)
msg = (
"Can only concatenate cupy sparse matrices for axis in "
"{0, 1}. Got %s" % axis
raise ValueError(msg)
concatenate_lookup.register(spmatrix, _concat_cupy_sparse)
def register_sparse():
import sparse
concatenate_lookup.register(sparse.COO, sparse.concatenate)
tensordot_lookup.register(sparse.COO, sparse.tensordot)
def register_scipy_sparse():
import scipy.sparse
def _concatenate(L, axis=0):
if axis == 0:
return scipy.sparse.vstack(L)
elif axis == 1:
return scipy.sparse.hstack(L)
msg = (
"Can only concatenate scipy sparse matrices for axis in "
"{0, 1}. Got %s" % axis
raise ValueError(msg)
concatenate_lookup.register(scipy.sparse.spmatrix, _concatenate)
import numbers
import warnings
import tlz as toolz
from .. import base, utils
from ..delayed import unpack_collections
from ..highlevelgraph import HighLevelGraph
from ..blockwise import blockwise as core_blockwise
def blockwise(
""" Tensor operation: Generalized inner and outer products
A broad class of blocked algorithms and patterns can be specified with a
concise multi-index notation. The ``blockwise`` function applies an in-memory
function across multiple blocks of multiple inputs in a variety of ways.
Many dask.array operations are special cases of blockwise including
elementwise, broadcasting, reductions, tensordot, and transpose.
func : callable
Function to apply to individual tuples of blocks
out_ind : iterable
Block pattern of the output, something like 'ijk' or (1, 2, 3)
*args : sequence of Array, index pairs
Sequence like (x, 'ij', y, 'jk', z, 'i')
**kwargs : dict
Extra keyword arguments to pass to function
dtype : np.dtype
Datatype of resulting array.
concatenate : bool, keyword only
If true concatenate arrays along dummy indices, else provide lists
adjust_chunks : dict
Dictionary mapping index to function to be applied to chunk sizes
new_axes : dict, keyword only
New indexes and their dimension lengths
2D embarrassingly parallel operation from two arrays, x, and y.
>>> z = blockwise(operator.add, 'ij', x, 'ij', y, 'ij', dtype='f8') # z = x + y # doctest: +SKIP
Outer product multiplying x by y, two 1-d vectors
>>> z = blockwise(operator.mul, 'ij', x, 'i', y, 'j', dtype='f8') # doctest: +SKIP
z = x.T
>>> z = blockwise(np.transpose, 'ji', x, 'ij', dtype=x.dtype) # doctest: +SKIP
The transpose case above is illustrative because it does same transposition
both on each in-memory block by calling ``np.transpose`` and on the order
of the blocks themselves, by switching the order of the index ``ij -> ji``.
We can compose these same patterns with more variables and more complex
in-memory functions
z = X + Y.T
>>> z = blockwise(lambda x, y: x + y.T, 'ij', x, 'ij', y, 'ji', dtype='f8') # doctest: +SKIP
Any index, like ``i`` missing from the output index is interpreted as a
contraction (note that this differs from Einstein convention; repeated
indices do not imply contraction.) In the case of a contraction the passed
function should expect an iterable of blocks on any array that holds that
index. To receive arrays concatenated along contracted dimensions instead
pass ``concatenate=True``.
Inner product multiplying x by y, two 1-d vectors
>>> def sequence_dot(x_blocks, y_blocks):
... result = 0
... for x, y in zip(x_blocks, y_blocks):
... result +=
... return result
>>> z = blockwise(sequence_dot, '', x, 'i', y, 'i', dtype='f8') # doctest: +SKIP
Add new single-chunk dimensions with the ``new_axes=`` keyword, including
the length of the new dimension. New dimensions will always be in a single
>>> def f(x):
... return x[:, None] * np.ones((1, 5))
>>> z = blockwise(f, 'az', x, 'a', new_axes={'z': 5}, dtype=x.dtype) # doctest: +SKIP
New dimensions can also be multi-chunk by specifying a tuple of chunk
sizes. This has limited utility as is (because the chunks are all the
same), but the resulting graph can be modified to achieve more useful
results (see ``da.map_blocks``).
>>> z = blockwise(f, 'az', x, 'a', new_axes={'z': (5, 5)}, dtype=x.dtype) # doctest: +SKIP
If the applied function changes the size of each chunk you can specify this
with a ``adjust_chunks={...}`` dictionary holding a function for each index
that modifies the dimension size in that index.
>>> def double(x):
... return np.concatenate([x, x])
>>> y = blockwise(double, 'ij', x, 'ij',
... adjust_chunks={'i': lambda n: 2 * n}, dtype=x.dtype) # doctest: +SKIP
Include literals by indexing with None
>>> y = blockwise(add, 'ij', x, 'ij', 1234, None, dtype=x.dtype) # doctest: +SKIP
out = name
new_axes = new_axes or {}
# Input Validation
if len(set(out_ind)) != len(out_ind):
raise ValueError(
"Repeated elements not allowed in output index",
[k for k, v in toolz.frequencies(out_ind).items() if v > 1],
new = (
- {a for arg in args[1::2] if arg is not None for a in arg}
- set(new_axes or ())
if new:
raise ValueError("Unknown dimension", new)
from .core import Array, unify_chunks, normalize_arg
if align_arrays:
chunkss, arrays = unify_chunks(*args)
arginds = [(a, i) for (a, i) in toolz.partition(2, args) if i is not None]
chunkss = {}
# For each dimension, use the input chunking that has the most blocks;
# this will ensure that broadcasting works as expected, and in
# particular the number of blocks should be correct if the inputs are
# consistent.
for arg, ind in arginds:
for c, i in zip(arg.chunks, ind):
if i not in chunkss or len(c) > len(chunkss[i]):
chunkss[i] = c
arrays = args[::2]
for k, v in new_axes.items():
if not isinstance(v, tuple):
v = (v,)
chunkss[k] = v
arginds = zip(arrays, args[1::2])
numblocks = {}
dependencies = []
arrays = []
# Normalize arguments
argindsstr = []
for arg, ind in arginds:
if ind is None:
arg = normalize_arg(arg)
arg, collections = unpack_collections(arg)
if (
hasattr(arg, "ndim")
and hasattr(ind, "__len__")
and arg.ndim != len(ind)
raise ValueError(
"Index string %s does not match array dimension %d"
% (ind, arg.ndim)
numblocks[] = arg.numblocks
arg =
argindsstr.extend((arg, ind))
# Normalize keyword arguments
kwargs2 = {}
for k, v in kwargs.items():
v = normalize_arg(v)
v, collections = unpack_collections(v)
kwargs2[k] = v
# Finish up the name
if not out:
out = "%s-%s" % (
token or utils.funcname(func).strip("_"),
base.tokenize(func, out_ind, argindsstr, dtype, **kwargs),
graph = core_blockwise(
graph = HighLevelGraph.from_collections(
out, graph, dependencies=arrays + dependencies
chunks = [chunkss[i] for i in out_ind]
if adjust_chunks:
for i, ind in enumerate(out_ind):
if ind in adjust_chunks:
if callable(adjust_chunks[ind]):
chunks[i] = tuple(map(adjust_chunks[ind], chunks[i]))
elif isinstance(adjust_chunks[ind], numbers.Integral):
chunks[i] = tuple(adjust_chunks[ind] for _ in chunks[i])
elif isinstance(adjust_chunks[ind], (tuple, list)):
if len(adjust_chunks[ind]) != len(chunks[i]):
raise ValueError(
"Dimension {0} has {1} blocks, "
"adjust_chunks specified with "
"{2} blocks".format(
i, len(chunks[i]), len(adjust_chunks[ind])
chunks[i] = tuple(adjust_chunks[ind])
raise NotImplementedError(
"adjust_chunks values must be callable, int, or tuple"
chunks = tuple(chunks)
if meta is None:
from .utils import compute_meta
meta = compute_meta(func, dtype, *args[::2], **kwargs)
if meta is not None:
return Array(graph, out, chunks, meta=meta)
return Array(graph, out, chunks, dtype=dtype)
def atop(*args, **kwargs):
warnings.warn("The da.atop function has moved to da.blockwise")
return blockwise(*args, **kwargs)
""" A set of NumPy functions to apply per chunk """
from import Container, Iterable, Sequence
from functools import wraps
from tlz import concat
import numpy as np
from . import numpy_compat as npcompat
from ..core import flatten
from ..utils import ignoring
from numbers import Integral
from numpy import take_along_axis
except ImportError: # pragma: no cover
take_along_axis = npcompat.take_along_axis
def keepdims_wrapper(a_callable):
A wrapper for functions that don't provide keepdims to ensure that they do.
def keepdims_wrapped_callable(x, axis=None, keepdims=None, *args, **kwargs):
r = a_callable(x, axis=axis, *args, **kwargs)
if not keepdims:
return r
axes = axis
if axes is None:
axes = range(x.ndim)
if not isinstance(axes, (Container, Iterable, Sequence)):
axes = [axes]
r_slice = tuple()
for each_axis in range(x.ndim):
if each_axis in axes:
r_slice += (None,)
r_slice += (slice(None),)
r = r[r_slice]
return r
return keepdims_wrapped_callable
# Wrap NumPy functions to ensure they provide keepdims.
sum = np.sum
prod =
min = np.min
max = np.max
argmin = keepdims_wrapper(np.argmin)
nanargmin = keepdims_wrapper(np.nanargmin)
argmax = keepdims_wrapper(np.argmax)
nanargmax = keepdims_wrapper(np.nanargmax)
any = np.any
all = np.all
nansum = np.nansum
nanprod = np.nanprod
nancumprod = np.nancumprod
nancumsum = np.nancumsum
nanmin = np.nanmin
nanmax = np.nanmax
mean = np.mean
with ignoring(AttributeError):
nanmean = np.nanmean
var = np.var
with ignoring(AttributeError):
nanvar = np.nanvar
std = np.std
with ignoring(AttributeError):
nanstd = np.nanstd
def coarsen(reduction, x, axes, trim_excess=False, **kwargs):
""" Coarsen array by applying reduction to fixed size neighborhoods
reduction: function
Function like np.sum, np.mean, etc...
x: np.ndarray
Array to be coarsened
axes: dict
Mapping of axis to coarsening factor
>>> x = np.array([1, 2, 3, 4, 5, 6])
>>> coarsen(np.sum, x, {0: 2})
array([ 3, 7, 11])
>>> coarsen(np.max, x, {0: 3})
array([3, 6])
Provide dictionary of scale per dimension
>>> x = np.arange(24).reshape((4, 6))
>>> x
array([[ 0, 1, 2, 3, 4, 5],
[ 6, 7, 8, 9, 10, 11],
[12, 13, 14, 15, 16, 17],
[18, 19, 20, 21, 22, 23]])
>>> coarsen(np.min, x, {0: 2, 1: 3})
array([[ 0, 3],
[12, 15]])
You must avoid excess elements explicitly
>>> x = np.array([1, 2, 3, 4, 5, 6, 7, 8])
>>> coarsen(np.min, x, {0: 3}, trim_excess=True)
array([1, 4])
# Insert singleton dimensions if they don't exist already
for i in range(x.ndim):
if i not in axes:
axes[i] = 1
if trim_excess:
ind = tuple(
slice(0, -(d % axes[i])) if d % axes[i] else slice(None, None)
for i, d in enumerate(x.shape)
x = x[ind]
# (10, 10) -> (5, 2, 5, 2)
newshape = tuple(concat([(x.shape[i] // axes[i], axes[i]) for i in range(x.ndim)]))
return reduction(x.reshape(newshape), axis=tuple(range(1, x.ndim * 2, 2)), **kwargs)
def trim(x, axes=None):
""" Trim boundaries off of array
>>> x = np.arange(24).reshape((4, 6))
>>> trim(x, axes={0: 0, 1: 1})
array([[ 1, 2, 3, 4],
[ 7, 8, 9, 10],
[13, 14, 15, 16],
[19, 20, 21, 22]])
>>> trim(x, axes={0: 1, 1: 1})
array([[ 7, 8, 9, 10],
[13, 14, 15, 16]])
if isinstance(axes, Integral):
axes = [axes] * x.ndim
if isinstance(axes, dict):
axes = [axes.get(i, 0) for i in range(x.ndim)]
return x[tuple(slice(ax, -ax if ax else None) for ax in axes)]
def topk(a, k, axis, keepdims):
""" Chunk and combine function of topk
Extract the k largest elements from a on the given axis.
If k is negative, extract the -k smallest elements instead.
Note that, unlike in the parent function, the returned elements
are not sorted internally.
assert keepdims is True
axis = axis[0]
if abs(k) >= a.shape[axis]:
return a
a = np.partition(a, -k, axis=axis)
k_slice = slice(-k, None) if k > 0 else slice(-k)
return a[tuple(k_slice if i == axis else slice(None) for i in range(a.ndim))]
def topk_aggregate(a, k, axis, keepdims):
""" Final aggregation function of topk
Invoke topk one final time and then sort the results internally.
assert keepdims is True
a = topk(a, k, axis, keepdims)
axis = axis[0]
a = np.sort(a, axis=axis)
if k < 0:
return a
return a[
slice(None, None, -1) if i == axis else slice(None) for i in range(a.ndim)
def argtopk_preprocess(a, idx):
""" Preparatory step for argtopk
Put data together with its original indices in a tuple.
return a, idx
def argtopk(a_plus_idx, k, axis, keepdims):
""" Chunk and combine function of argtopk
Extract the indices of the k largest elements from a on the given axis.
If k is negative, extract the indices of the -k smallest elements instead.
Note that, unlike in the parent function, the returned elements
are not sorted internally.
assert keepdims is True
axis = axis[0]
if isinstance(a_plus_idx, list):
a_plus_idx = list(flatten(a_plus_idx))
a = np.concatenate([ai for ai, _ in a_plus_idx], axis)
idx = np.concatenate(
[np.broadcast_to(idxi, ai.shape) for ai, idxi in a_plus_idx], axis
a, idx = a_plus_idx
if abs(k) >= a.shape[axis]:
return a_plus_idx
idx2 = np.argpartition(a, -k, axis=axis)
k_slice = slice(-k, None) if k > 0 else slice(-k)
idx2 = idx2[tuple(k_slice if i == axis else slice(None) for i in range(a.ndim))]
return take_along_axis(a, idx2, axis), take_along_axis(idx, idx2, axis)
def argtopk_aggregate(a_plus_idx, k, axis, keepdims):
""" Final aggregation function of argtopk
Invoke argtopk one final time, sort the results internally, drop the data
and return the index only.
assert keepdims is True
a, idx = argtopk(a_plus_idx, k, axis, keepdims)
axis = axis[0]
idx2 = np.argsort(a, axis=