Build Custom Arrays
Here we have a serial, blocked computation that finds the mean of all positive elements in a large on-disk array:
import h5py

x = h5py.File('myfile.hdf5')['/x']            # Trillion element array on disk

sums = []
counts = []
for i in range(1000000):                      # One million times
    chunk = x[1000000*i:1000000*(i + 1)]      # Pull out chunk
    positive = chunk[chunk > 0]               # Filter out negative elements
    sums.append(positive.sum())               # Sum chunk
    counts.append(positive.size)              # Count chunk

result = sum(sums) / sum(counts)              # Aggregate results
Below is the same code, parallelized using dask.delayed:
import h5py
from dask import delayed

x = delayed(h5py.File('myfile.hdf5')['/x'])   # Trillion element array on disk

sums = []
counts = []
for i in range(1000000):                      # One million times
    chunk = x[1000000*i:1000000*(i + 1)]      # Pull out chunk
    positive = chunk[chunk > 0]               # Filter out negative elements
    sums.append(positive.sum())               # Sum chunk
    counts.append(positive.size)              # Count chunk

result = delayed(sum)(sums) / delayed(sum)(counts)  # Aggregate results
result.compute()                              # Perform the computation
Only 3 lines had to change to make this computation parallel instead of serial.
- Wrap the original array in delayed. This makes all the slices on it return Delayed objects.
- Wrap both calls to sum with delayed.
- Call the compute method on the result.
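These three changes can be tried without a trillion-element file on hand. The following is a minimal, self-contained sketch of the same pattern on a small in-memory NumPy array; the array size, chunk size, and variable names here are illustrative rather than part of the original example:

import numpy as np
from dask import delayed

data = np.random.standard_normal(1_000_000)   # Small in-memory stand-in for the on-disk array
x = delayed(data)                             # Wrapped: all slices now return Delayed objects

sums = []
counts = []
for i in range(10):                           # Ten chunks of 100,000 elements each
    chunk = x[100_000*i:100_000*(i + 1)]      # Lazy slice
    positive = chunk[chunk > 0]               # Lazy filter
    sums.append(positive.sum())               # Lazy sum
    counts.append(positive.size)              # Lazy count

result = delayed(sum)(sums) / delayed(sum)(counts)
print(result.compute())                       # Prints roughly 0.80

The mean of the positive half of a standard normal distribution is about 0.80, which gives a quick sanity check that the graph executed correctly.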
While the for loop above still runs through all one million iterations, it only builds up a graph of the computation that needs to happen, without actually doing any computing.
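One way to see this directly is to inspect the result before calling compute. A short sketch (the visualize call needs the optional graphviz dependency, and the filename and scheduler shown are just example choices):

print(type(result))                            # dask.delayed.Delayed -- a graph, not a number
result.visualize('mean-graph.png')             # Optional: render the task graph (requires graphviz)

answer = result.compute()                      # Only now are chunks read, filtered, and reduced
# answer = result.compute(scheduler='threads') # A scheduler can also be chosen explicitly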