Distributed chained computing with Dask on a high failure-rate cluster?

Solution 1:

This might not be the solution you are looking for, but one option is to divide the task sequence into batches small enough that each batch is likely to complete before a failure (or is quick to re-do from scratch if it doesn't).

Something like this perhaps:

import dask.bag as db
from toolz import partition_all

n_per_chunk = 100  # just a guess; the best number depends on the case
tasks = list(partition_all(n_per_chunk, my_ids))

results = []
for t in tasks:
    # process only the current chunk of ids, so a failure costs
    # at most one chunk's worth of re-computation
    summed_image = (
        db
        .from_sequence(t)
        .map(gen_image_from_ids)
        .reduction(sum, sum)  # sum within each partition, then sum the partials
        .compute()
    )
    results.append(summed_image)

summed_image = sum(results) # final result

There are other things to keep in mind here regarding restarting the workflow on failure (or potentially launching the smaller batches in parallel); a rough retry sketch is below, but hopefully this gives you a starting point for a workable solution.
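
If a worker dies mid-batch, the per-batch .compute() call may fail outright, so one option is to wrap each batch in a simple retry. This is a minimal sketch, assuming a failed batch surfaces as an exception and that a few attempts are enough (max_retries is a made-up parameter); gen_image_from_ids and my_ids are the same names as above:

import dask.bag as db
from toolz import partition_all

def sum_chunk(chunk, max_retries=3):
    # compute the partial sum for one chunk, re-running the whole
    # chunk from scratch if the computation fails
    for attempt in range(max_retries):
        try:
            return (
                db
                .from_sequence(chunk)
                .map(gen_image_from_ids)
                .reduction(sum, sum)
                .compute()
            )
        except Exception:
            if attempt == max_retries - 1:
                raise  # give up after the last attempt

n_per_chunk = 100
tasks = list(partition_all(n_per_chunk, my_ids))
results = [sum_chunk(t) for t in tasks]
summed_image = sum(results)  # final result, as before

This keeps each retry cheap: only the failed batch is recomputed, not the whole reduction.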