Hotwire Tech Blog

Scribes from Hotwire Engineering

Overview

We will look into running jobs on a Spark cluster and tuning the configuration of a simple example to achieve significantly lower runtimes. We will also touch on the trade-off between the number of tasks per executor and the number of executors per node, given a cluster's node configuration.

Spark Cluster Abstraction

This diagram (source) shows the key components of the cluster. The Spark driver is the program you write. The driver program requests executors from the Master (the Worker nodes let the Master know how many resources they each have). The Master allocates the resources and provides the driver with the required number of executors. As you can tell from the flowchart, for a given cluster some of the key variables we can set are the number of executors per worker, the number of tasks per executor and the memory per executor.
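
To make these knobs concrete, here is a minimal sketch (not from the original post; the values are illustrative assumptions) of setting them programmatically when building the SparkContext. The same properties can also be set from the spark-submit command line, as we will see later.

# Minimal sketch: the three knobs discussed above, set via SparkConf.
# The values (6 executors, 4 cores, 4g memory) are illustrative assumptions.
from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("Randomizer")
        .set("spark.executor.instances", "6")   # number of executors
        .set("spark.executor.cores", "4")       # tasks run in parallel per executor
        .set("spark.executor.memory", "4g"))    # memory per executor

sc = SparkContext(conf=conf)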

AWS EMR Setup

You can use the AWS CLI or the Web Console to create the cluster. For the purposes of this experiment we used the following nodes:

  • 2 m3.xlarge core nodes
  • 2 r3.8xlarge task nodes
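
If you prefer to script the cluster creation instead of using the CLI or the Web Console, here is a rough boto3 sketch of a comparable cluster. The region, EMR release label, key pair name and master instance type below are assumptions, not values from our setup.

import boto3

# Hedged sketch of creating a comparable EMR cluster with boto3.
# Region, release label, key pair name and master instance type are assumptions.
emr = boto3.client("emr", region_name="us-west-2")
response = emr.run_job_flow(
    Name="spark-tuning-experiments",
    ReleaseLabel="emr-4.6.0",
    Applications=[{"Name": "Spark"}],
    Instances={
        "InstanceGroups": [
            {"Name": "master", "InstanceRole": "MASTER",
             "InstanceType": "m3.xlarge", "InstanceCount": 1},
            {"Name": "core", "InstanceRole": "CORE",
             "InstanceType": "m3.xlarge", "InstanceCount": 2},
            {"Name": "task", "InstanceRole": "TASK",
             "InstanceType": "r3.8xlarge", "InstanceCount": 2},
        ],
        "Ec2KeyName": "my-key-pair",
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
)
print response["JobFlowId"]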

Run time will vary depending on the configuration and number of nodes.

A Simple Example

We are going to use a simple example to play around with the Spark configuration. Let us start with the code we would write to sum a billion random numbers. This is what it would look like if you ran it on your laptop without any parallelization:

from random import random
from datetime import datetime
if __name__ == "__main__":
    start = datetime.now() # to time the task
    sum = 0.0
    for i in xrange(1000000000):
        sum += random() # sum the random numbers
    stop = datetime.now() # to time the task
    print "Sum :", sum
    print "Time taken: ", (stop-start).total_seconds()

If you run it on your laptop, it might take around 140-150 seconds (depending on the specifications of your laptop).

Now let’s write a PySpark script to run on the Spark cluster we just created.

from random import random
from operator import add
from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext(appName = "Randomizer")
    def f(_):
        return random()

    # This will split the task of summing 1B numbers into 100 tasks.
    # Spark decides how to distribute the task of running func f
    # (which is just calling the random() function)
    count = sc.parallelize(xrange(1000000000), 100).map(f).reduce(add)
    print "Sum: ", count

To run this with Spark, SSH in to your master node and type:

spark-submit pyspark_random_sum.py

You will see a lot of YARN (the scheduler) output, and towards the end you will see the result, which looks something like this:

16/05/06 17:50:09 INFO DAGScheduler: Job 0 finished: reduce at 
/home/hadoop/<username>/pyspark_random_sum.py:13, 
took 22.375376 s Sum: 499962099.419

Tracking Spark Jobs

Now that you can submit Spark jobs, we want to know how the resources are being used. How many executors are being created? How many tasks are executed in parallel by each executor?

Hadoop Job Tracker

After starting an SSH tunnel to your master node and setting up FoxyProxy in your browser (as shown below when you click on “Enable Web Connection”), type in the address <your_master_node_ip>:8088 (for example, in this case 10.139.21.110:8088):

[Screenshot: “Enable Web Connection” instructions in the EMR console]

You will see the Hadoop Job Tracker as shown here:

[Screenshot: Hadoop Job Tracker]

As you can see, the top row, Cluster Metrics, provides a summary of the nodes we have. Some key metrics are the number of VCores and the amount of memory present and used. Below that we can see the list of Applications, which are the jobs that have run on this cluster. In this case the most recent one at the top (Job _0069) is the program we just ran.

Spark Job Tracker

The quickest way to get to the job tracker of the application is to click on ApplicationMaster or History for the Spark job you started. In this case, if you click it for the Randomizer application, you will end up on a page like this:

[Screenshot: Spark job dashboard]

This page gives a summary of the running job. If you have more complicated PySpark programs, you might see multiple active and inactive jobs. For this experiment, we can see that a total of 100 tasks were assigned, and the progress bar on the right shows how much of the job has completed. You can venture deeper to get more details on the tasks and executors spawned for your job by clicking on the Description as shown in the image above. This will land you on a page like this:

[Screenshot: Spark stage details with Summary Metrics and the executor list]

Summary Metrics shows the time taken by each of the tasks (100 in this case) and by GC (Garbage Collection). This is followed by the list of executors spawned on each node; each row gives a summary of the time taken and the number of tasks executed by that executor. There is even a visualization of the timeline of the executors spawned and the tasks executed by each, which you can see by clicking on Event Timeline:

[Screenshot: Event Timeline visualization]

If you want to check the history of Spark jobs without going through the Hadoop job tracker, you can go directly to port 18080, for example 10.139.21.110:18080. On this page you can navigate to the pages shown above for any job you ran earlier.

[Screenshot: Spark History Server]
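
As an aside, the history server exposes the same information as JSON through Spark's monitoring REST API, so you can also pull it programmatically. Here is a small sketch; the host and port below simply mirror the example address above, so adjust them for your own cluster:

import json
import urllib2

# List the applications known to the Spark history server via its REST API.
resp = urllib2.urlopen("http://10.139.21.110:18080/api/v1/applications")
for app in json.loads(resp.read()):
    print app["id"], "-", app["name"]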

Configuring Spark

Viewing Default Spark Settings

From any of the Spark application pages you can access the settings that were used for that particular job. Even if you haven’t tinkered with any of the settings or parameters, you can see the defaults by clicking on Environment as shown here:

[Screenshot: Spark dashboard, Environment tab]

You will then get a page with the list of environment variables set for this Spark job, as shown here:

[Screenshot: Environment variables for the Spark job]

You can find key variables like spark.executor.cores used for the job. The term “core” is misleading here; it actually means the maximum number of tasks each executor will run concurrently. This page does not contain an exhaustive list of all the configuration variables; as you will see in the next section, when you set other variables they will appear in this list.
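
If you would rather check these values programmatically than through the UI, here is a small sketch (not from the original post) that prints the effective configuration from inside a job, mirroring what the Environment page shows:

from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext(appName="ConfDump")
    # Print every configuration property Spark has resolved for this job.
    for key, value in sorted(sc.getConf().getAll()):
        print key, "=", value
    # A single property can be read with a default fallback.
    print "spark.executor.cores =", sc.getConf().get("spark.executor.cores", "not set")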

Configuring Using the spark-submit Command

Many of the Spark configurations can be set using the spark-submit command. Run spark-submit --help to see the list of options. If you need to set a configuration you don’t find there, you can use --conf PROP=VALUE, where PROP can be any Spark configuration. This documentation shows how to use it, along with the list of configurations that can be set. For example, to request six executors:

spark-submit --num-executors 6 pyspark_random_sum.py

When we visit the Environment page for this job, we will see this:

[Screenshot: Environment page showing spark.executor.instances]

You can see that a new entry has appeared for the spark.executor.instances variable.

Let’s Experiment!

We ran a few Spark jobs using the simple example to benchmark the runtime. First we ran a Spark job to sum a single random number, to find out the overhead of running a Spark job (and, because Spark runs on Hadoop, the overhead of running a Hadoop job). Then we ramped up the number of random numbers we wanted to sum and tried different numbers of tasks, which are set in the sc.parallelize() call in the code.
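
We simply edited the task count in sc.parallelize() between runs; if you prefer, the script can be parameterized so that the element count and the task count are passed on the command line. Here is a hedged sketch of that variant (not the exact script we used), invoked for example as spark-submit pyspark_random_sum.py 1000000000 500:

import sys
from random import random
from operator import add
from pyspark import SparkContext

if __name__ == "__main__":
    # Defaults match the original example: 1B numbers split into 100 tasks.
    n_numbers = int(sys.argv[1]) if len(sys.argv) > 1 else 1000000000
    n_tasks = int(sys.argv[2]) if len(sys.argv) > 2 else 100
    sc = SparkContext(appName="Randomizer")
    total = sc.parallelize(xrange(n_numbers), n_tasks).map(lambda _: random()).reduce(add)
    print "Sum: ", total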

Disclaimer: The runtimes are subject to change depending on other jobs running on the cluster and/or the configuration of the cluster.

The command used to perform this set of experiments was:

spark-submit pyspark_random_sum.py

In other words, default Spark settings were used, except for the number-of-tasks parameter in sc.parallelize(). Here are the experiments we ran.

Total Random Numbers Summed | Number of Tasks | Hadoop Time (s) | Spark Time (s)
1                           | 1               | 13              | 7

This is roughly our best-case scenario, the lowest runtime we can achieve; for summing more than one number, we probably can’t do any better than this. Let’s try something bigger.

Total Random Numbers Summed | Number of Tasks | Hadoop Time (s) | Spark Time (s)
10,000                      | 1000            | 18              | 11

More tasks should definitely help solve larger problems, right?

Total Random Numbers Summed | Number of Tasks | Hadoop Time (s) | Spark Time (s)
10,000                      | 10,000          | 29              | 21

Shouldn’t more tasks reduce the time? There is a cost to splitting a job into many tasks, and at this point the scheduling overhead outweighs the benefit. So let us try fewer tasks.

Total Random Numbers Summed | Number of Tasks | Hadoop Time (s) | Spark Time (s)
10,000                      | 100             | 15              | 8

Wow, that is close to the time it took to sum 1 number. Let’s use even fewer tasks.

Total Random Numbers Summed | Number of Tasks | Hadoop Time (s) | Spark Time (s)
10,000                      | 10              | 15              | 8

Looks like we have plateaued, but it seems that 10 tasks would be enough.

Let’s try with an even larger set of random numbers:

Total Random Numbers Summed | Number of Tasks | Hadoop Time (s) | Spark Time (s)
1M                          | 10              | 13              | 7
10M                         | 10              | 14              | 7

So far so good.

Total Random Numbers Summed | Number of Tasks | Hadoop Time (s) | Spark Time (s)
100M                        | 10              | 18              | 11
1B                          | 10              | 37              | 30

Now, can we reduce the time for summing 1 billion random numbers? Let us try increasing the number of tasks.

Total Random Numbers Summed | Number of Tasks | Hadoop Time (s) | Spark Time (s)
1B                          | 100             | 40              | 32

No good; maybe more tasks?

Total Random Numbers Summed | Number of Tasks | Hadoop Time (s) | Spark Time (s)
1B                          | 500             | 26              | 19

Seems like we are getting somewhere.

Total Random Numbers Summed | Number of Tasks | Hadoop Time (s) | Spark Time (s)
1B                          | 1000            | 24              | 18

Not much of an improvement there.

Here we managed to experiment with one parameter of the Spark job, namely the total number of tasks. You can experiment with other parameters like --num-executors (the total number of executors), --executor-cores (the number of tasks per executor) and many more. So we tried a different number of executors, and when we ran:

spark-submit --num-executors 10 pyspark_random_sum.py

We got:

Total Random Numbers Summed | Number of Tasks | Hadoop Time (s) | Spark Time (s)
1B                          | 500             | 23              | 9

That is a huge improvement for summing a billion numbers. By default, 21 executors had been used; that is Spark’s dynamic allocation, which is enabled by default. When we explicitly set configurations like the number of executors, dynamic allocation is turned off.

Lessons

Here are a few things we learnt after this exercise:

  1. Given a large, embarrassingly parallel problem, we can use Spark to bring down the runtime significantly. We went from a program that takes around 150 seconds on a laptop, to 21 seconds on Spark (with default settings), and finally down to 9 seconds after playing with two parameters. Depending on the problem, the number and type of nodes and the way we write the Spark code can make a huge difference too.
  2. The key parameters that can influence the performance of your Spark job are the number of executors, the number of tasks per executor and the amount of memory per executor (we did not need to experiment with memory, as our example was not memory intensive).
  3. Each Spark application needs its own tuning to achieve high performance. By default, Spark’s dynamic allocation sets the number of executors depending on the amount of parallelization you want. For our example, with fewer tasks (100) Spark used 10 executors, but with more tasks (500) it used 21.
  4. While starting an EMR cluster, you can enable maximizeResourceAllocation.
    config.json
    [{"classification":"spark",
    "properties":{"maximizeResourceAllocation":"true"}}]

    More details here (scroll down to “To set maximizeResourceAllocation”). We haven’t benchmarked this yet, but let us know if you have already tried it out. A boto3 form of the same configuration is sketched below.
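
For reference, if you script the cluster creation with boto3 as sketched earlier, the same classification can be passed through the Configurations parameter of run_job_flow (which expects capitalized keys). This is only a sketch; we have not benchmarked it:

# Hedged sketch: the same classification expressed for boto3's run_job_flow,
# whose Configurations parameter uses capitalized "Classification"/"Properties" keys.
spark_configurations = [
    {
        "Classification": "spark",
        "Properties": {"maximizeResourceAllocation": "true"},
    }
]
# Pass Configurations=spark_configurations to emr.run_job_flow() in the earlier sketch.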