What's this then? - Kai's Blog

Leveraging Dask and Xarray for 3-D Interpolation

Edit

(30-01-2025) Ok, I have made a grave mistake of only interpolating in one axis in each direction (whoops). I am still working on a solution that does NOT involve the use of for loops (which I hate) and will hopefully give an update regarding this in a future blog post.  

However, I believe the use of dask is important in speeding up data analysis. In fact, I have recently used dask arrays and parallelism to perform time-area averages of rotating Rayleigh-Benard Convection flow fields. Again, will give an update in another blog post soon.  

Till then, feel free to use my code :>  

TL;DR

Array too large, RAM go boom.  

Solution:

My Jupyter Notebook that performs 3-D interpolation of large CFD datasets can be found here  

Adapt to your own needs.


Problem

A common problem that arises during post-processing of numerical simulation data is the need to read, manipulate, process and write extremely large datasets (for my case, up to ~2GB for each timestep) quickly and in a memory efficient manner.  

As part of my PHD project, I am expected to carry out simulations of turbulent Rayleigh-Bénard Convection using the fourth-order accurate finite volume scheme numerical simulation code goldfish. Due to the lack of adaptive mesh refinement (AMR) implementation in goldfish, I often had to interpolate flow field data from a coarse mesh to a finer mesh.  

For moderate dimensions, e.g. 512×126×512 in cylindrical coordinates, this is usually no problem. However, I started to run into memory problems when interpolating to a grid with very large dimensions in the order of 1500×1000×1500 elements.  

With my measly 16GB of RAM on my local machine (upgrade soon), there is no way scipy and numpy can load 19GB of temperature data array onto physical memory. I desperately needed a convenient array chunking solution.  

From Dask till dawn

Allow me to introduce to you the best Python library I have ever used:  

Dask
 

This nifty Python library allows one to conveniently chop a large numpy.ndarray into a dask.array object, which holds the address of all the chunked numpy arrays. For example, in my interpolation ipynb, loading the temperature field from the netcdf4 flow file 'flow0037.nc':

import dask.array as da
import netCDF4

f_old = netCDF4.Dataset('flow0037.nc', 'r')
temp_coarse = da.from_array(f_old['temp'])

print(temp_coarse)

yields a Dask array object instead of a numpy array:

Output:
dask.array<array, shape=(386, 474, 132), dtype=float64, chunksize=(356, 356, 132), chunktype=numpy.MaskedArray>

Notice that:

  1. The array is now chunked into a chunk size (chunk space) of (356, 356, 132)
  2. Since the returned variable is an object, conversion into a numpy.ndarray will require the following line:
# Get numpy array back from Dask array object
temp_coarse.compute()
  1. The individual chunks are of type numpy.MaskedArray, which should be avoided as masked arrays cannot be pickled.  
     

The brilliant part about Dask arrays is the ability to perform most numpy.ndarray methods. For instance, we want to get the mean of an array. This can be done by using the .mean() method as follows:

test = da.zeros([12, 12, 12])
testmaxval = test.max()
print(test)
print(testmaxval)
print(testmaxval.compute())
Output:
dask.array<zeros_like, shape=(12, 12, 12), dtype=float64, chunksize=(12, 12, 12), chunktype=numpy.ndarray>
dask.array<max-aggregate, shape=(), dtype=float64, chunksize=(), chunktype=numpy.ndarray>
0.0

Finally, Dask also allows one to start their own distributed computing cluster. With bokeh installed, I can look at cool stats about my sucky AMD Ryzen 9 mini-ITX PC: Image

interpolate_mesh.ipynb code explaination

This Jupyter notebook was originally based on an IDL script written by my Ph.D. supervisor to interpolate temperature and velocity field from a coarse grid to a finer grid. I will be using a Python function translated from the SPLINE module from IDL to interpolate my flow field data.  

Import libraries

import numpy as np
from scipy.interpolate import griddata
import netCDF4
from dask.distributed import Client
import dask.array as da
import dask

Prerequisites are outlined in the Github repository.  

Start Dask client

As mentioned prior, this starts a distributed compute cluster on your local machine. n_workers allocates the number of workers to use (set this according to the available physical cores) and memory_limit limits the physical memory one worker can use.

client=Client(n_workers=16,
               threads_per_worker=2,
               memory_target_fraction=0.95,
               memory_limit='1GB')

With bokeh installed, one can monitor the workers by pasting the IP address of the local host into the browser ('localhost:8787/status' for me): Image  

Reading coarse mesh coordinates from netcdf4 files

goldfish stores the coordinates of grid points once in 'geometry.nc' while flow files containing simulation parameters (Ra,Pr,Ro,gamma), current timestep, and temperature and velocity fields are stored in 'flowxxxx.nc'.  

Here, it is important to use the Dask.array.from_array() function to load the Dask arrays. We want to minimize 'transfer' operations as much as possible, as anytime not spent on computation is a waste. Refer to here for best practices on loading data.

g_new = netCDF4.Dataset('geometry.nc', 'r')
zu_fine = da.from_array(g_new['z_uz'])
zp_fine = da.from_array(g_new['z_p'])
dz_fine = da.from_array(g_new['delta_z_p'])
rp_fine = da.from_array(g_new['r_p'])
rw_fine = da.from_array(g_new['r_ur'])
dr_fine = da.from_array(g_new['delta_r_p'])
dphi_fine = da.from_array(g_new['delta_phi'])

dimz_fine = len(g_new.dimensions['dim_z'])
dimphi_fine = len(g_new.dimensions['dim_phi'])
dimr_fine = len(g_new.dimensions['dim_r'])

height_fine = da.max(zp_fine)
phi_fine = da.arange(dimphi_fine) * dphi_fine - np.pi - dphi_fine - dphi_fine / 2.0

Rinse and repeat this to read in the fine grid coordinates and the coarse grid flow fields.  

SPLINE

The spline(X,Y,T,sigma=1.0) is a 1-D cubic interpolation function mirroring that of the SPLINE module in IDL and takes in 3 arguments:

This Python function is translated by StackOverflow user Swike in his answer here.  

Parallelizing independent functions

A 3-dimensional grid will have, of course, 3 columns of elements for the xyz direction respectively. goldfish runs numerical simulations in the cylindrical coordinate system, as such grid coordinates are stored in ZRϕ format.  

In either case it is obvious that parallelizing the interpolation in all 3 directions will speed up progress immensely. Consider the following test case which interpolates a 10×10×10 grid into a 100×100×100 grid:

import time
testc=da.linspace(0,1,num=10)
testf=da.linspace(0,1,num=100)
testrng = da.random.default_rng()

start_time = time.time()
test = da.zeros([100, 100, 100])

testvals = testrng.standard_normal(10)
test[:, 0, 0] = spline(testc,testvals,testf)

testvals = testrng.standard_normal(10)
test[0, :, 0] = spline(testc,testvals,testf)

testvals = testrng.standard_normal(10)
test[0, 0, :] = spline(testc,testvals,testf)
end_time = time.time()

print(end_time-start_time)

This took 28.775 seconds.  

To speed this up, we want to utilize Dask's delayed library which allows us to parallelize the interpolation:

testc=da.linspace(0,1,num=10)
testf=da.linspace(0,1,num=100)
testrng = da.random.default_rng()
start_time = time.time()

@dask.delayed
def testfunc(cc,vals,ff):
    return spline(cc,vals,ff)

@dask.delayed
def output(arr1,arr2,arr3):
    test=da.zeros([100,100,100])
    test[:,0,0]=arr1
    test[0,:,0]=arr2
    test[0,0,:]=arr3
    return test
    
task1 =testfunc(testc,testrng.standard_normal(10),testf)
task2 =testfunc(testc,testrng.standard_normal(10),testf)
task3 =testfunc(testc,testrng.standard_normal(10),testf)
combineheuhue = output(task1,task2,task3)

# Combine the tasks and compute them in parallel
results = combineheuhue.compute()
end_time = time.time()

print(end_time-start_time)

This took a blinding 0.188 seconds!!
I cannot understate how shocked I was.  

Interpolating my huge flow files from a 386×474×132 grid to a 1538×1538×1028 grid took me less than 1 second:

@dask.delayed
def splinewrapper(finearr,data,coarsearr):
    return spline(finearr,data,coarsearr)

@dask.delayed
def output(arrz,arrr,arrphi):
    arrout=da.zeros([dimz_fine,dimr_fine,dimphi_fine])
    arrout[0,:,0]=arrz
    arrout[0,0,:]=arrr
    arrout[:,0,0]=arrphi
    return arrout


tempr =splinewrapper(rp_coarse,temp_coarse[0,:,0],rp_fine)
tempphi =splinewrapper(phi_coarse,temp_coarse[0,0,:],phi_fine)
tempz =splinewrapper(zu_coarse,temp_coarse[:,0,0],zu_fine)
tempf = output(tempr,tempphi,tempz)

# Combine the tasks and compute them in parallel
temp_fine = tempf.compute()

# Print the shapes of the computed arrays
print(temp_fine.shape)

"What the actual f-?" was all that was going through my head at that time. It impressed me even more that the Dask library was first released in 2015 and that the Dask team is sponsored by NASA, among many others.  

Writing to netcdf4

Now that all of my flow fields have been interpolated to a finer mesh, I will want to write my flow files to disk. This can be done easily by utilizing the xarray library which can work directly with HDF5, NETCDF4, and Zarr file formats:

#Write everything, need to use Xarrays for this
import xarray as xr

flow_fine = xr.Dataset().chunk(chunks={"auto"})
flow_fine['Ra']=flow_coarse.Ra
flow_fine['Pr']=flow_coarse.Pr
flow_fine['Ro']=flow_coarse.Ro
flow_fine['gamma']=flow_coarse.gamma
flow_fine['time']=flow_coarse.time
flow_fine['timestep']=flow_coarse.timestep
flow_fine['temp'] = xr.DataArray(temp_fine)
flow_fine['uz'] = xr.DataArray(uz_fine)
flow_fine['uphi'] = xr.DataArray(uphi_fine)
flow_fine['ur'] = xr.DataArray(ur_fine) 

flow_fine.to_netcdf("flow0000.nc", format="NETCDF4",engine="netcdf4",mode="w")

One problem though, the interpolated dataset is 70GB in size:

print('Flow file summary:')
<xarray.Dataset> Size: 78GB
Dimensions:   (dim_0: 1538, dim_1: 1538, dim_2: 1028)
Dimensions without coordinates: dim_0, dim_1, dim_2
Data variables:
    Ra        float64 8B ...
    Pr        float64 8B ...
    Ro        float64 8B ...
    gamma     float64 8B ...
    time      float64 8B ...
    timestep  int32 4B ...
    temp      (dim_0, dim_1, dim_2) float64 19GB dask.array<chunksize=(255, 255, 255), meta=np.ndarray>
    uz        (dim_0, dim_1, dim_2) float64 19GB dask.array<chunksize=(255, 255, 255), meta=np.ndarray>
    uphi      (dim_0, dim_1, dim_2) float64 19GB dask.array<chunksize=(255, 255, 255), meta=np.ndarray>
    ur        (dim_0, dim_1, dim_2) float64 19GB dask.array<chunksize=(255, 255, 255), meta=np.ndarray>

We can utilize xarray's bundled zlib compression engine to reduce the file size that is written to disk:

encoding = {"temp": {"zlib": True, "complevel": 9},
            "ur": {"zlib": True, "complevel": 9},
            "uphi": {"zlib": True, "complevel": 9},
            "uz": {"zlib": True, "complevel": 9}}
flow_fine.to_netcdf("flow0000.nc", format="NETCDF4",engine="netcdf4",mode="w",encoding=encoding)

Here, Dask's distributed compute comes in handy as the compression can now be done parallel:

Image

The final flow field produced is only 125MB in size, but given that my compression level setting is high, and that most of my flow file is made up of sparse matrices, this is par for course. &nbdp;

Final thoughts

Machine learning has revolutionized a great many things, and chiefly among those are the multitude of convenient and powerful open-sourced Python libraries that are created to perform data analysis.  

If one wanted to do 3-D interpolation of very large datasets as a scientist or engineer not 10-years ago, the only options would have been either MATLAB, IDL or FORTRAN, two of which requires a paid license.  

MATLAB is bulky, bloated and resource heavy (try running a parpool instance and see how long it takes to initialise). IDL's exceedingly poor documentation, clunky syntax and vague error messages makes it frustrating to use. Imagine having to debug your code and all you get from IDL is 'Killed'. Dota 2 was free to play since 2013 and the IDL devs can't even be bothered to make a decent API documentation.  

FORTRAN while being open-sourced, requires a Linux operating system and an MPI implementation. Sure I had HDF5, NETCDF4 and OpenMPinstalled on my Linux VM, but guess how long it took me to build from source? A whole day. For context, developing (plagiarizing) theinterpolate_mesh.ipynb` notebook took me 2 days.  

After stumbling upon Dask, I have also developed a newfound appreciation for all the developers, maintainers and software engineers in the Scientific Python community. For nearly zero pay, they have managed to produce countless free and powerful libraries for the wider scientific community.

#HPC