Fast Your Task With Dask

#datascience #dask #introductory

Prithviraj Patil Sept 22 2020 · 4 min read

Why Dask?

Most analysts use Pandas, NumPy, and Scikit-Learn to analyze data, but these tools are not designed to scale beyond a single machine, and they take more and more time to execute as datasets grow. Dask provides a solution to this: it scales Pandas, Scikit-Learn, and NumPy workflows natively, with minimal changes to your code.

"Dask provides advanced parallelism for analytics, enabling performance at scale for the tools you love."

- https://dask.pydata.org/en/latest/ 

Dask is easy to adopt because you can use what you already know, without needing to learn a new big data tool like Hadoop or Spark. Dask figures out how to break up large computations and route the pieces efficiently onto distributed hardware.

You might say that a cluster is not always the right choice, and you would be right.

Debugging errors and analyzing performance are simpler and more pleasant on a single machine. For that reason, Dask lets you swap out the cluster for single-machine schedulers, which are surprisingly lightweight, require no setup, and can run entirely within the same process as the user's session.
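As a quick illustration (a minimal sketch, not taken from the original examples below), the scheduler can be chosen per call or set globally without touching the rest of the code:

import dask
import dask.array as da

# a small array, just to have something to compute
a = da.random.random((1000, 1000), chunks=(100, 100))

# run this one computation on the single-machine synchronous scheduler,
# which executes in the current process and plays nicely with debuggers
a.mean().compute(scheduler="synchronous")

# or pick a default single-machine scheduler for everything that follows
dask.config.set(scheduler="threads")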

The use cases give you an overview of how people use Dask in practice. Readers can go through them and find new ways to apply Dask.

User interfaces

High-Level

- Arrays: Parallel NumPy
- Bags: Parallel lists
- DataFrames: Parallel Pandas
- Machine Learning: Parallel Scikit-Learn

Low-Level

- Delayed: Parallel function evaluation
- Futures: Real-time parallel function evaluation

This should help you decide which user interface best suits your needs.
We will look at some of these interfaces by way of introduction.

Install and set up Dask, and get ready to get fast with Dask.
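If you do not have Dask yet, the typical installation is a single command (shown here with pip; conda users can install the dask package the same way):

pip install "dask[complete]"    # or: conda install dask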

Arrays

Dask arrays coordinate many NumPy arrays, arranged as chunks within a grid. A Dask array cuts the large array into small arrays, which allows us to compute on arrays larger than memory while using all the cores of the system.

Let's look at some code -

Create a NumPy array of 10,000 rows by 10,000 columns:

import numpy as np
x = np.random.random((10000, 10000))
x

Create a Dask array of 10,000 rows by 10,000 columns, divided into 1,000-by-1,000 NumPy chunks:

import dask.array as da
a = da.random.random((10000, 10000), chunks=(1000, 1000))
a

Perform an operation on the NumPy array:

%%time
y = x + x.T
z = y[::2, 5000:].mean(axis=1)
z

The time required is: Wall time: 2.91 s

Perform the same operation on the Dask array. Note that nothing is actually executed yet; Dask only runs the work when compute() is called.

y = a + a.T
z = y[::2, 5000:].mean(axis=1)
z
%%time
z.compute()

The time required is: Wall time: 1.21 s

As you can see, the Dask array takes less time to do the computation, and the gains can grow much larger for bigger arrays, because Dask divides the whole array into NumPy chunks and executes on them in parallel.
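To see how Dask has split the array, you can inspect the chunk metadata. This is a small sketch reusing the array a created above; the attribute names assume a reasonably recent Dask version:

a.chunks                   # chunk sizes along each axis
a.numblocks                # (10, 10): the grid of underlying NumPy blocks
a.blocks[0, 0].compute()   # materialize a single 1000 x 1000 NumPy block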

Bags

Dask Bags coordinate many Python lists or iterators, each of which forms a partition of a larger collection. Bags help us operate on structured or semi-structured data such as JSON files, text data, and so on.

import random
import dask.bag as db

A random list of 20 numbers is generated and then divided into 3 partitions:

x = list(random.randrange(1, 50, 1) for i in range(20))
a = db.from_sequence(x, npartitions=3)
a

dask.bag<from_se..., npartitions=3>

The function is applied to these 3 partitions (3 lists) in parallel, so the time required is:

%%time
a.map(lambda x:x**4).compute()

Wall time: 771 ms

a.map(lambda x: x**4).visualize()  # draws the task graph (requires the graphviz package)

From the above example we can see that there is not much change in the Python code: we simply divide the long list into partitions and then perform operations on them in parallel.
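Bags are especially handy for semi-structured text such as newline-delimited JSON. The following is a hypothetical sketch: the file pattern data/records-*.json and the id field are placeholders, not part of the example above.

import json
import dask.bag as db

# read many JSON-lines files in parallel; each file becomes one or more partitions
records = db.read_text('data/records-*.json').map(json.loads)

# filter and aggregate lazily, then trigger the work with compute()
records.filter(lambda r: r.get('id', 0) % 2 == 0).count().compute()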

DataFrames

We have seen that a Dask array is a collection of many small NumPy arrays; similarly, a Dask DataFrame is a combination of many small Pandas DataFrames. These smaller DataFrames may live on a single machine or on multiple machines, which lets us work with datasets larger than the memory of the system. Each operation on the Dask DataFrame is performed in parallel on these smaller DataFrames.

Import the essential libraries and build a Pandas DataFrame:

import pandas as pd
import random
import dask.dataframe as dd

# a list of 1,000,000 random numbers is created
numbersData = [random.randint(0, 9999) for _ in range(1000000)]
df = pd.DataFrame(numbersData, columns=['number'])

Now the DataFrame is ready with one column, "number", so we will apply a function that checks whether each value is even or odd. The time required using the Pandas DataFrame is:

%time result = df.apply(lambda x: x % 2 == 0)

Wall time: 24.9 ms

Now we use a Dask DataFrame, which splits the same data into 4 small Pandas DataFrames:

df = dd.from_pandas(df, npartitions=4)

Perform the same operation as above. There is a small change in the code; everything else stays the same.

%time result = df.map_partitions(lambda x: x % 2 == 0)

Wall time: 9.93 ms
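In practice you usually would not start from an in-memory Pandas DataFrame. Here is a hypothetical sketch: the file pattern data/2020-*.csv is a placeholder, and it assumes the files contain a "number" column.

import dask.dataframe as dd

# build a Dask DataFrame directly from many CSV files;
# each file (or block of a large file) becomes one partition
ddf = dd.read_csv('data/2020-*.csv')
ddf.npartitions                  # how many small Pandas DataFrames back it
ddf['number'].mean().compute()   # aggregations run partition by partition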

Dask Delayed

Start a Dask client (optional). It provides a dashboard that is useful for gaining insight into the computation; you can click on the dashboard link after executing the following command.

from dask.distributed import Client, progress
client = Client(threads_per_worker=4, n_workers=1)
client
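If you prefer not to hunt for the link in the notebook output, the dashboard URL is also available as an attribute of the client created above:

client.dashboard_link   # e.g. http://127.0.0.1:8787/status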

Now, just for demonstration purposes, a few basic functions are written. Each sleeps for one second to simulate real work.

import time

def decrement(x):
    time.sleep(1)
    return x - 1

def increment(x):
    time.sleep(1)
    return x + 1

def multiply(x, y):
    time.sleep(1)
    return x * y

Run them like normal functions

%%time
y = decrement(20)
x = increment(20)
z = multiply(x, y)
z

Wall time: 3 s

The code above executed the functions one after another, in sequence. But decrement(20) and increment(20) clearly do not depend on each other, so we could run them in parallel.

Now let's make our functions lazy using Dask.

import dask
decrement = dask.delayed(decrement)
increment = dask.delayed(increment)
multiply = dask.delayed(multiply)

Now our functions are lazy, so each call is recorded in a task graph and can run in parallel on the available hardware.

y = decrement(20)
x = increment(20)
z = multiply(x, y)
z

Delayed('multiply-e1f5f50c-e2f5-4b41-9e9c-ffa03a85334f')

Let's visualize and then execute the task graph:

z.visualize()
%%time
z.compute()

Wall time: 2.06 s

From the above we can see that Dask required less time because it executed the independent tasks in parallel: increment and decrement ran at the same time (about one second) and then multiply ran for another second, giving roughly 2 seconds instead of the 3 seconds of the sequential run.
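Finally, the Futures interface listed in the overview deserves a quick look. This is a minimal sketch using the client started earlier; the helper functions are redefined here because the originals above were wrapped with dask.delayed.

def inc(x):
    return x + 1

def dec(x):
    return x - 1

# submit() starts the work immediately in the background and returns a future
fx = client.submit(inc, 20)
fy = client.submit(dec, 20)
fz = client.submit(lambda a, b: a * b, fx, fy)   # futures can be passed as arguments
fz.result()                                      # blocks until the answer (399) is ready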

I have illustrated that adding parallel processing to your data science workflow is straightforward with Dask.
If you are working with Dask, share your experience in the comment box! Happy Dasking...

Referred Resources

https://docs.dask.org/en/latest
https://towardsdatascience.com
https://www.data-blogger.com
https://www.analyticsvidhya.com
