Leveraging Dask and Xarray for 3-D Interpolation
Edit (30-01-2025)
Ok, I have made the grave mistake of only interpolating along one axis in each direction (whoops). I am still working on a solution that does NOT involve the use of for loops (which I hate) and will hopefully give an update on this in a future blog post.
However, I believe the use of dask is important in speeding up data analysis. In fact, I have recently used dask arrays and parallelism to perform time-area averages of rotating Rayleigh-Bénard convection flow fields. Again, I will give an update in another blog post soon.
Till then, feel free to use my code :>
TL;DR
Array too large, RAM go boom.
Solution:
Use Dask to:
- start your own distributed cluster
- parallelize code
- chunk arrays
- perform numpy-like operations
Use Xarray to:
- compress data (with Dask, the compression becomes parallelized)
- write data to NETCDF4 format
My Jupyter notebook that performs 3-D interpolation of large CFD datasets can be found here. Adapt it to your own needs.
Problem
A common problem that arises during post-processing of numerical simulation data is the need to read, manipulate, process, and write extremely large datasets (in my case, up to ~2GB per timestep) quickly and in a memory-efficient manner.
As part of my Ph.D. project, I am expected to carry out simulations of turbulent Rayleigh-Bénard convection using goldfish, a fourth-order-accurate finite volume simulation code. Due to the lack of an adaptive mesh refinement (AMR) implementation in goldfish, I often have to interpolate flow field data from a coarse mesh onto a finer one.
For moderate grid dimensions in cylindrical coordinates, this is usually no problem. However, I started to run into memory problems when interpolating onto a grid with a very large number of elements.
With my measly 16GB of RAM on my local machine (upgrade soon), there is no way scipy and numpy can load a 19GB temperature array into physical memory. I desperately needed a convenient array chunking solution.
From Dask till dawn
Allow me to introduce you to the best Python library I have ever used:
This nifty Python library allows one to conveniently chop a large numpy.ndarray into a dask.array object, which holds references to all of the chunked numpy arrays. For example, in my interpolation notebook, loading the temperature field from the netcdf4 flow file 'flow0037.nc':
import dask.array as da
import netCDF4
f_old = netCDF4.Dataset('flow0037.nc', 'r')
temp_coarse = da.from_array(f_old['temp'])
print(temp_coarse)
yields a Dask array object instead of a numpy array:
Output:
dask.array<array, shape=(386, 474, 132), dtype=float64, chunksize=(356, 356, 132), chunktype=numpy.MaskedArray>
Notice that:
- The array is now chunked with a chunk size of (356, 356, 132).
- Since the returned variable is a Dask object, converting it back into a numpy.ndarray requires the following line:
# Get the numpy array back from the Dask array object
temp_coarse.compute()
- This is because Dask does everything lazily, i.e. objects remain as objects and functions remain as functions until .compute() is called.
- The individual chunks are of type numpy.MaskedArray, which should be avoided since masked arrays cannot be pickled (a possible workaround is sketched below).
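As a side note, one way to sidestep the MaskedArray chunk type is to disable netCDF4's automatic masking before wrapping the variable, and to pass explicit chunks to da.from_array. This is only a sketch: the chunk sizes below are an example, and it assumes you do not rely on fill-value masking.
import dask.array as da
import netCDF4

f_old = netCDF4.Dataset('flow0037.nc', 'r')
# Return plain numpy arrays instead of numpy.MaskedArray when slicing
f_old.set_auto_mask(False)

# Explicit chunk sizes (example values) instead of the default chunking
temp_coarse = da.from_array(f_old['temp'], chunks=(128, 128, 132))
print(temp_coarse)  # chunktype should now be numpy.ndarray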
The brilliant part about Dask arrays is that they support most numpy.ndarray methods. For instance, if we want the maximum of an array, we can use the .max() method as follows:
test = da.zeros([12, 12, 12])
testmaxval = test.max()
print(test)
print(testmaxval)
print(testmaxval.compute())
Output:
dask.array<zeros_like, shape=(12, 12, 12), dtype=float64, chunksize=(12, 12, 12), chunktype=numpy.ndarray>
dask.array<max-aggregate, shape=(), dtype=float64, chunksize=(), chunktype=numpy.ndarray>
0.0
Finally, Dask also allows one to start their own distributed computing cluster. With bokeh installed, I can look at cool stats about my sucky AMD Ryzen 9 mini-ITX PC:
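A minimal sketch of how that looks (the actual notebook configures the Client more carefully further down):
from dask.distributed import Client

# Start a local distributed cluster with default settings
client = Client()

# URL of the Bokeh dashboard, typically http://127.0.0.1:8787/status
print(client.dashboard_link)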
interpolate_mesh.ipynb code explanation
This Jupyter notebook was originally based on an IDL script written by my Ph.D. supervisor to interpolate temperature and velocity fields from a coarse grid onto a finer grid. I will be using a Python function translated from the SPLINE module of IDL to interpolate my flow field data.
Import libraries
import numpy as np
from scipy.interpolate import griddata
import netCDF4
from dask.distributed import Client
import dask.array as da
import dask
Prerequisites are outlined in the GitHub repository.
Start Dask client
As mentioned earlier, this starts a distributed compute cluster on your local machine. n_workers sets the number of workers to use (set this according to the available physical cores) and memory_limit caps the physical memory each worker can use.
client = Client(n_workers=16,
                threads_per_worker=2,
                memory_target_fraction=0.95,
                memory_limit='1GB')
With bokeh installed, one can monitor the workers by pasting the address of the local dashboard into the browser ('localhost:8787/status' for me):
Reading mesh coordinates from netcdf4 files
goldfish stores the coordinates of the grid points once in 'geometry.nc', while the flow files containing the simulation parameters (Ra, Pr, Ro, gamma), the current timestep, and the temperature and velocity fields are stored as 'flowxxxx.nc'.
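To double-check what a given flow file actually contains, one can simply list its dimensions and variables (a quick sketch):
import netCDF4

# Inspect the contents of one coarse flow file
with netCDF4.Dataset('flow0037.nc', 'r') as f:
    print(f.dimensions)
    print(f.variables.keys())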
Here, it is important to use the dask.array.from_array() function to load the variables as Dask arrays. We want to minimize 'transfer' operations as much as possible, since any time not spent on computation is a waste. Refer here for best practices on loading data.
g_new = netCDF4.Dataset('geometry.nc', 'r')
zu_fine = da.from_array(g_new['z_uz'])
zp_fine = da.from_array(g_new['z_p'])
dz_fine = da.from_array(g_new['delta_z_p'])
rp_fine = da.from_array(g_new['r_p'])
rw_fine = da.from_array(g_new['r_ur'])
dr_fine = da.from_array(g_new['delta_r_p'])
dphi_fine = da.from_array(g_new['delta_phi'])
dimz_fine = len(g_new.dimensions['dim_z'])
dimphi_fine = len(g_new.dimensions['dim_phi'])
dimr_fine = len(g_new.dimensions['dim_r'])
height_fine = da.max(zp_fine)
phi_fine = da.arange(dimphi_fine) * dphi_fine - np.pi - dphi_fine - dphi_fine / 2.0
Rinse and repeat this to read in the coarse grid coordinates and the coarse flow fields.
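For completeness, the coarse-side read looks the same; here is a sketch using the imports from above (the coarse geometry filename is a placeholder, only 'flow0037.nc' is an actual file used earlier):
# Coarse geometry (hypothetical filename) and the coarse flow file from earlier
g_old = netCDF4.Dataset('geometry_coarse.nc', 'r')  # placeholder path
zu_coarse = da.from_array(g_old['z_uz'])
rp_coarse = da.from_array(g_old['r_p'])
dphi_coarse = da.from_array(g_old['delta_phi'])
dimphi_coarse = len(g_old.dimensions['dim_phi'])
phi_coarse = da.arange(dimphi_coarse) * dphi_coarse - np.pi - dphi_coarse - dphi_coarse / 2.0

f_old = netCDF4.Dataset('flow0037.nc', 'r')
temp_coarse = da.from_array(f_old['temp'])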
SPLINE
The spline(X, Y, T, sigma=1.0) function is a 1-D cubic interpolation routine mirroring the SPLINE module of IDL and takes three arguments (plus an optional tension parameter sigma):
- X: array containing the coarse grid coordinates in one direction,
- Y: flow field data along that direction, which must have the same shape as X,
- T: array containing the target (fine) grid coordinates.
This Python function was translated by StackOverflow user Swike in their answer here.
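If you just want something runnable without copying the translated function, a plain cubic spline from scipy can stand in for it. This is only a sketch: it ignores the tension parameter sigma, so it is not a faithful reproduction of IDL's SPLINE.
import numpy as np
from scipy.interpolate import CubicSpline

def spline(X, Y, T, sigma=1.0):
    # Stand-in for the translated IDL SPLINE; the tension parameter sigma is ignored
    X = np.asarray(X)  # must be strictly increasing
    Y = np.asarray(Y)
    return CubicSpline(X, Y)(np.asarray(T))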
Parallelizing independent functions
A 3-dimensional grid will, of course, have three coordinate arrays, one for each direction. goldfish runs its numerical simulations in the cylindrical coordinate system, so the grid coordinates are stored in (z, r, phi) order.
Either way, it is obvious that parallelizing the interpolation in all three directions will speed things up immensely. Consider the following test case, which interpolates 10-point coordinate arrays onto 100-point ones:
import time

testc = da.linspace(0, 1, num=10)
testf = da.linspace(0, 1, num=100)
testrng = da.random.default_rng()

start_time = time.time()
test = da.zeros([100, 100, 100])
testvals = testrng.standard_normal(10)
test[:, 0, 0] = spline(testc, testvals, testf)
testvals = testrng.standard_normal(10)
test[0, :, 0] = spline(testc, testvals, testf)
testvals = testrng.standard_normal(10)
test[0, 0, :] = spline(testc, testvals, testf)
end_time = time.time()
print(end_time - start_time)
This took 28.775 seconds.
To speed this up, we can use Dask's delayed interface, which allows us to parallelize the interpolation:
testc = da.linspace(0, 1, num=10)
testf = da.linspace(0, 1, num=100)
testrng = da.random.default_rng()

start_time = time.time()

@dask.delayed
def testfunc(cc, vals, ff):
    return spline(cc, vals, ff)

@dask.delayed
def output(arr1, arr2, arr3):
    test = da.zeros([100, 100, 100])
    test[:, 0, 0] = arr1
    test[0, :, 0] = arr2
    test[0, 0, :] = arr3
    return test

task1 = testfunc(testc, testrng.standard_normal(10), testf)
task2 = testfunc(testc, testrng.standard_normal(10), testf)
task3 = testfunc(testc, testrng.standard_normal(10), testf)
combineheuhue = output(task1, task2, task3)

# Combine the tasks and compute them in parallel
results = combineheuhue.compute()
end_time = time.time()
print(end_time - start_time)
This took a blinding 0.188 seconds!!
I cannot overstate how shocked I was.
Interpolating my huge flow fields from the coarse grid onto the fine grid took me less than 1 second:
@dask.delayed
def splinewrapper(coarsearr, data, finearr):
    return spline(coarsearr, data, finearr)

@dask.delayed
def output(arrr, arrphi, arrz):
    arrout = da.zeros([dimz_fine, dimr_fine, dimphi_fine])
    arrout[0, :, 0] = arrr
    arrout[0, 0, :] = arrphi
    arrout[:, 0, 0] = arrz
    return arrout

tempr = splinewrapper(rp_coarse, temp_coarse[0, :, 0], rp_fine)
tempphi = splinewrapper(phi_coarse, temp_coarse[0, 0, :], phi_fine)
tempz = splinewrapper(zu_coarse, temp_coarse[:, 0, 0], zu_fine)
tempf = output(tempr, tempphi, tempz)

# Combine the tasks and compute them in parallel
temp_fine = tempf.compute()

# Print the shape of the computed array
print(temp_fine.shape)
"What the actual f-?" was all that was going through my head at that time. It impressed me even more that the Dask
library was first released in 2015 and that the Dask
team is sponsored by NASA, among many others.
Writing to netcdf4
Now that all of my flow fields have been interpolated onto the finer mesh, I want to write the flow files to disk. This can be done easily with the xarray library, which can work directly with the HDF5, NETCDF4, and Zarr file formats:
# Write everything; we need xarray for this
import xarray as xr
flow_fine = xr.Dataset().chunk(chunks="auto")
flow_fine['Ra']=flow_coarse.Ra
flow_fine['Pr']=flow_coarse.Pr
flow_fine['Ro']=flow_coarse.Ro
flow_fine['gamma']=flow_coarse.gamma
flow_fine['time']=flow_coarse.time
flow_fine['timestep']=flow_coarse.timestep
flow_fine['temp'] = xr.DataArray(temp_fine)
flow_fine['uz'] = xr.DataArray(uz_fine)
flow_fine['uphi'] = xr.DataArray(uphi_fine)
flow_fine['ur'] = xr.DataArray(ur_fine)
flow_fine.to_netcdf("flow0000.nc", format="NETCDF4", engine="netcdf4", mode="w")
One problem though: the interpolated dataset is ~78GB in size:
print('Flow file summary:')
print(flow_fine)
Output:
<xarray.Dataset> Size: 78GB
Dimensions: (dim_0: 1538, dim_1: 1538, dim_2: 1028)
Dimensions without coordinates: dim_0, dim_1, dim_2
Data variables:
Ra float64 8B ...
Pr float64 8B ...
Ro float64 8B ...
gamma float64 8B ...
time float64 8B ...
timestep int32 4B ...
temp (dim_0, dim_1, dim_2) float64 19GB dask.array<chunksize=(255, 255, 255), meta=np.ndarray>
uz (dim_0, dim_1, dim_2) float64 19GB dask.array<chunksize=(255, 255, 255), meta=np.ndarray>
uphi (dim_0, dim_1, dim_2) float64 19GB dask.array<chunksize=(255, 255, 255), meta=np.ndarray>
ur (dim_0, dim_1, dim_2) float64 19GB dask.array<chunksize=(255, 255, 255), meta=np.ndarray>
We can enable zlib compression through the netCDF4 backend (specified via xarray's encoding argument) to reduce the file size written to disk:
encoding = {"temp": {"zlib": True, "complevel": 9},
"ur": {"zlib": True, "complevel": 9},
"uphi": {"zlib": True, "complevel": 9},
"uz": {"zlib": True, "complevel": 9}}
flow_fine.to_netcdf("flow0000.nc", format="NETCDF4",engine="netcdf4",mode="w",encoding=encoding)
Here, Dask's distributed compute comes in handy, as the compression can now be done in parallel:
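A sketch of one way to kick off the compressed write lazily and watch it run across the workers; this assumes the Client started earlier is still alive (to_netcdf(compute=False) returns a delayed task):
from dask.distributed import progress

# Build the compressed write as a lazy task instead of writing eagerly
write_job = flow_fine.to_netcdf("flow0000.nc", format="NETCDF4", engine="netcdf4",
                                mode="w", encoding=encoding, compute=False)

# Submit it to the running cluster; progress() shows a bar while the
# dashboard shows the chunks being compressed and written in parallel
future = client.compute(write_job)
progress(future)
future.result()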
The final flow file produced is only a fraction of the uncompressed size; given that my compression level is set to the maximum and that most of my flow field consists of sparse arrays, this is par for the course.
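As a quick sanity check, the compressed file can be reopened lazily with Dask chunks; the chunk sizes below are just an example:
import xarray as xr

# Lazily reopen the compressed flow file; nothing is loaded until it is computed
flow_check = xr.open_dataset("flow0000.nc", engine="netcdf4",
                             chunks={"dim_0": 256, "dim_1": 256, "dim_2": 256})
print(flow_check["temp"])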
Final thoughts
Machine learning has revolutionized a great many things, chief among them the multitude of convenient and powerful open-source Python libraries created to perform data analysis.
If a scientist or engineer wanted to do 3-D interpolation of very large datasets not ten years ago, the only options would have been MATLAB, IDL, or FORTRAN, two of which require a paid license.
MATLAB is bulky, bloated, and resource-heavy (try running a parpool instance and see how long it takes to initialise). IDL's exceedingly poor documentation, clunky syntax, and vague error messages make it frustrating to use. Imagine having to debug your code and all you get from IDL is 'Killed'. Dota 2 has been free to play since 2013, and the IDL devs can't even be bothered to write decent API documentation.
FORTRAN, while being open-source, requires a Linux operating system and an MPI implementation. Sure, I had HDF5, NETCDF4, and OpenMPI installed on my Linux VM, but guess how long it took me to build them from source? A whole day. For context, developing (plagiarizing) the interpolate_mesh.ipynb notebook took me 2 days.
After stumbling upon Dask, I have also developed a newfound appreciation for all the developers, maintainers, and software engineers in the scientific Python community. For nearly zero pay, they have managed to produce countless free and powerful libraries for the wider scientific community.