Why Dask?
Most analysts use Pandas, NumPy, and Scikit-Learn to analyze data, but these tools are not designed to scale beyond a single machine, and they become slow on larger datasets. Dask provides a solution to this: it scales Pandas, NumPy, and Scikit-Learn workflows natively, with minimal changes to your code.
Dask provides advanced parallelism for analytics, enabling performance at scale for the tools you love.
Dask is easy to adopt because you can keep using what you already know, without needing to learn a new big-data tool like Hadoop or Spark. Dask figures out how to break up large computations and route the pieces efficiently onto distributed hardware.
You might say that a cluster is not always the right choice, and you would be right: debugging errors and analyzing performance are simpler and more pleasant on a single machine. For that reason, Dask lets you swap out the cluster for single-machine schedulers, which are surprisingly lightweight, require no setup, and can run entirely within the same process as the user's session.
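As a small sketch of this idea (assuming Dask is installed), the same computation can be routed to different single-machine schedulers just by passing a `scheduler` keyword to `compute()`:

```python
import dask.array as da

# a small array split into 16 chunks
a = da.ones((1000, 1000), chunks=(250, 250))

# default multi-threaded scheduler on a single machine
total = a.sum().compute(scheduler='threads')

# single-threaded scheduler: runs in the current process,
# which makes debugging with pdb or print statements easy
same_total = a.sum().compute(scheduler='synchronous')

assert total == same_total == 1_000_000
```

The `synchronous` scheduler is especially handy while developing, since errors surface with ordinary Python tracebacks.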
The use cases below give an overview of how people use Dask in practice. Reading through them may suggest new ways to apply Dask to your own work.
- Arrays: Parallel NumPy
- Bags: Parallel lists
- DataFrames: Parallel Pandas
- Machine Learning: Parallel Scikit-Learn
- Delayed: Parallel function evaluation
- Futures: Real-time parallel function evaluation
This document will help you decide which interface best suits your needs. We will look at some of these interfaces by way of introduction.
Dask arrays coordinate many NumPy arrays, arranged as chunks within a grid: Dask cuts the large array into small arrays. This lets us compute on arrays larger than memory while using all the cores of the system.
Let's look at some code -
Create a NumPy array of 10000 rows by 10000 columns:

```python
import numpy as np

x = np.random.random((10000, 10000))
x
```
Create a Dask array of 10000 rows by 10000 columns, divided into 1000-by-1000 NumPy chunks:

```python
import dask.array as da

a = da.random.random((10000, 10000), chunks=(1000, 1000))
a
```
Perform an operation on the NumPy array:

```python
%%time
y = x + x.T
z = y[::2, 5000:].mean(axis=1)
z
```
Time required: Wall time: 2.91 s
Perform the same operation on the Dask array. Note that Dask is lazy: no work happens until you call compute(), so we add it here to trigger (and time) the execution.

```python
%%time
y = a + a.T
z = y[::2, 5000:].mean(axis=1)
z.compute()
```
Time required: Wall time: 1.21 s
As you can see, the Dask array takes less time for the same computation, because it divides the whole array into NumPy sub-arrays and executes on them in parallel. For larger arrays the speedup can be much greater, depending on the chunk size and the number of available cores.
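To make the chunking concrete, here is a minimal sketch (with small sizes chosen just for illustration) showing that a Dask array is a grid of NumPy blocks and that compute() returns an ordinary NumPy result:

```python
import numpy as np
import dask.array as da

x = np.arange(16).reshape(4, 4)        # a small NumPy array
a = da.from_array(x, chunks=(2, 2))    # same data, cut into four 2x2 blocks

# .chunks describes the block sizes along each axis
assert a.chunks == ((2, 2), (2, 2))

# the same kind of expression as above, evaluated block-by-block in parallel
result = (a + a.T).mean(axis=1).compute()
assert np.array_equal(result, (x + x.T).mean(axis=1))
```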
Dask bags coordinate many Python lists or iterators, each of which forms a partition of a larger collection. Bags help with operations on structured or semi-structured data such as JSON files, text data, etc.
```python
import random
import dask.bag as db
```
A random list of 20 numbers is generated and then divided into 3 partitions:
```python
x = list(random.randrange(1, 50, 1) for i in range(20))
a = db.from_sequence(x, npartitions=3)
```
The mapped function is applied to these 3 partitions (3 lists) in parallel, so the time required is:
```python
%%time
a.map(lambda x: x**4).compute()
```
Wall time: 771 ms
From the example above we can see that there is not much change in the Python code: we simply divide a long list into partitions and then perform operations on them in parallel.
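Bags also work well on record-like data such as parsed JSON. A small sketch (the field names here are made up for illustration) using `filter` and `pluck`:

```python
import dask.bag as db

records = [{'name': 'a', 'value': 3},
           {'name': 'b', 'value': 7},
           {'name': 'c', 'value': 5}]

b = db.from_sequence(records, npartitions=2)

# filter and pluck are applied to each partition in parallel
names = b.filter(lambda r: r['value'] > 4).pluck('name').compute()
assert names == ['b', 'c']
```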
Just as a Dask array is a collection of many small NumPy arrays, a Dask DataFrame is a combination of many small Pandas DataFrames. These smaller DataFrames may live on a single machine or on multiple machines, allowing us to store datasets larger than the system's memory. Each operation on the Dask DataFrame is performed in parallel across these smaller DataFrames.
Import the essential libraries:

```python
import pandas as pd
import random
from timeit import timeit
import dask.dataframe as dd
```
```python
# list of 1000000 numbers is created
numbersData = [random.randint(0, 9999) for _ in range(1000000)]
df = pd.DataFrame(numbersData, columns=['number'])
```
Now the DataFrame is ready with one column, "number", so we will apply a function that checks whether each value is even or odd. The time required for this with the Pandas DataFrame is:
```python
%time result = df.apply(lambda x: x % 2 == 0)
```
Wall time: 24.9 ms
Now we create a Dask DataFrame, which contains 4 small Pandas DataFrames:
```python
df = dd.from_pandas(df, npartitions=4)
```
We perform the same operation as above. There is a small change in the code; everything else stays the same.
Wall time: 9.93 ms
Start a Dask client (optional). It provides a dashboard that is useful for gaining insight into the computation; you can click the dashboard link after executing the following command:
```python
from dask.distributed import Client, progress

client = Client(threads_per_worker=4, n_workers=1)
client
```
Now, for demonstration purposes, some basic functions are written:
```python
import time

def decrement(x):
    time.sleep(1)
    return x - 1

def increment(x):
    time.sleep(1)
    return x + 1

def multiply(x, y):
    time.sleep(1)
    return x * y
```
Run them like normal functions:

```python
%%time
y = decrement(20)
x = increment(20)
z = multiply(x, y)
z
```
Wall time: 3 s
The code above executed the functions one after another, in sequence. As we can clearly see, decrement(20) and increment(20) do not depend on each other, so we can run them in parallel.
Now let's make our functions lazy using Dask:
```python
import dask

decrement = dask.delayed(decrement)
increment = dask.delayed(increment)
multiply = dask.delayed(multiply)
```
Now our functions are lazy: the tasks are recorded in a graph and run in parallel on the hardware. Since nothing executes until compute() is called, we add it here to trigger (and time) the run.

```python
%%time
y = decrement(20)
x = increment(20)
z = multiply(x, y)
z.compute()
```
Time required: Wall time: 2.06 s

You can also call z.visualize() to render the task graph and see which tasks run side by side.
From the above we can see that Dask required less time because it executed the independent tasks in parallel, whereas plain Python ran them sequentially.
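The same pattern scales to many tasks. A small sketch (assuming Dask is installed) that builds a graph of several delayed calls and computes them all in one go:

```python
import dask

@dask.delayed
def inc(x):
    return x + 1

# nothing runs yet: this just records five tasks in a graph
tasks = [inc(i) for i in range(5)]

# sum is itself delayed, so the whole graph executes on compute()
total = dask.delayed(sum)(tasks)
assert total.compute() == 15   # (0+1) + (1+1) + ... + (4+1)
```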
I have illustrated how straightforward it is to add parallel processing to your data-science workflow with Dask.
If you are working with Dask, share your experience in the comment box! Happy Dasking...