RAM blowing up on computation

Below is a runnable code snippet using dask and cupy that I'm having problems with. I run this on Google Colab with a GPU runtime enabled.

Basically my problem is that A and At are arrays that are too big for RAM, which is why I use Dask. I run operations on these too-big-for-RAM arrays, but I would like to obtain AtW1[:,k] (as a cupy array) without blowing up my RAM or GPU memory, because I need this value for further operations. How can I achieve this?

import dask.array as da
import cupy as cp

from dask_cuda import LocalCUDACluster
from dask.distributed import Client

cluster = LocalCUDACluster()
client = Client(cluster)

n_topics = 10
n_terms = 10000
n_docs = 250000
k = 0

A = da.zeros_like(cp.zeros(1), shape=(n_terms,n_docs), chunks=(1,n_docs))
At = A.transpose().rechunk((10, n_terms))

n_row = A.shape[0]
n_col = A.shape[1]

W1 = cp.random.random((n_row, n_topics))
H = cp.random.random((n_col, n_topics))
W1tW1 = W1.T.dot(W1)

# The problem starts here: I want to get AtW1[:,k] as a cupy array without blowing up my RAM
AtW1 = da.dot(At, W1).rechunk((n_docs, 1))
AtW1 = AtW1.persist()
val = AtW1[:,k].compute()
val

Edit: Here is another example with the same problem. How can I achieve the computation without changing the first part? The variables in the first part are only there so the dimensions of the arrays are clear and the example is runnable.

import cupy as cp
import dask.array as da
from dask.array.linalg import norm
from dask_cuda import LocalCUDACluster
from dask.distributed import Client

cluster = LocalCUDACluster()
client = Client(cluster)

####### These are given and cannot be changed #######
big = 250000
small = 10000
full = da.full_like(cp.full(1, 2.0), fill_value=2.0, shape=(small,big), chunks=(1,big))
a = cp.random.random((small, 10))
b = cp.transpose(cp.random.random((big, 10)))

###### How can I make this work? #######
ab = da.dot(da.from_array(a), da.from_array(b))
subtracted = full - ab
normalized = norm(subtracted, ord='fro')
val = normalized.compute()

Solution 1:

Although the idea of rechunking makes a lot of sense on paper, in practice rechunking needs to be done with great care, since it can only reshape work that can be split into blocks in the first place.

For example, compare the following two approaches:

A = da.zeros_like(cp.zeros(1), shape=(n_terms,n_docs), chunks=(1,n_docs))
At = A.transpose().rechunk((1, n_terms))

At_version2 = da.zeros_like(cp.zeros(1), shape=(n_docs, n_terms), chunks=(1,n_terms))

When requesting At[0,:], the amount of work dask does is going to be much, much larger than the work it would do for At_version2[0,:]. Why? Because the original data for At is defined in chunks of (1, n_docs), so to generate the first row of the transpose (which is the first column of A), dask has to evaluate every row chunk of the original array A.
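As a quick sanity check, you can look at how many tasks dask generates for each slice. This is just an illustrative sketch (the exact counts will vary with your dask version and chunking), using the standard __dask_graph__() method that every dask collection exposes:

# Sketch: compare how many tasks each slice generates.
# At[0,:] is the first column of A, so dask has to touch every one of
# A's n_terms row chunks; At_version2[0,:] lives in a single source chunk.
n_tasks_at = len(dict(At[0, :].__dask_graph__()))
n_tasks_v2 = len(dict(At_version2[0, :].__dask_graph__()))
print(n_tasks_at, n_tasks_v2)  # the first count should be far larger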

Following this logic, it's best to make sure that your data is provided in chunks that are compatible with the block multiplication needed to get the final result. This results in fewer tasks and lower memory requirements, because only a subset of the data has to be held in memory at any one time.

This is a modified version of the code that computes quickly:

import dask.array as da
import cupy as cp

from dask_cuda import LocalCUDACluster
from dask.distributed import Client

cluster = LocalCUDACluster()
client = Client(cluster)

n_topics = 10
n_terms = 10000
n_docs = 250000
k = 0

At_v2 = da.zeros_like(cp.zeros(1), shape=(n_docs, n_terms), chunks=(1000,n_terms))

W1 = cp.random.random((n_terms, n_topics))

result = da.dot(At_v2, W1)[:,0].compute()

Also note that running .persist will keep the whole array in memory, so if memory is a constraint you probably don't want to clog it up with .persist.
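If you do need to reuse parts of the product repeatedly, a middle ground is to persist only the columns you will actually revisit instead of the full array. A rough sketch, reusing the variables defined above (At_v2, W1, k):

# Sketch: persist only a narrow slice of the product instead of the whole thing.
# Retained memory is roughly n_docs * 2 values rather than n_docs * n_topics.
AtW1_cols = da.dot(At_v2, W1)[:, :2].persist()
val = AtW1_cols[:, k].compute()  # cheap once the slice is materialized; val is a cupy array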