Spark Jobs — Induce Parallelism

Ani
4 min read · May 27, 2022

Calculus, the electrical battery, the telephone, the steam engine, the radio — all these groundbreaking innovations were hit upon by multiple inventors working in parallel with no knowledge of one another. — Steven Johnson


Spark, my all-time favourite ETL framework, and for good reason: it is one of the best we have witnessed. Low cost, high performance, fault tolerant baby!

What I am about to cover here has probably been written up by plenty of people in different forums (some may even be typing it right now), but I thought of penning it down in my own style.

I see Spark as every data engineer's (the real ones') bestie; it has been serving big data ETL for a long time now. Spark is a framework that evaluates a command lazily until an action is called on it (not lecturing). It then analyses that action (the job) and splits it into multiple parts.

Now let us see how it does the processing (now lecturing). When we do a spark-submit, Spark creates multiple jobs and runs them one by one. Yes, you guessed it right: it runs them sequentially, and the stages inside each job also run in sequence. The one and only thing Spark runs in parallel is the tasks within each stage. A task is the lowest level of granularity in a Spark job: each task takes a JVM thread and works on a single partition.
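To make that concrete, here is a tiny sketch of when work actually happens (the row count and the output path are made up for illustration):

// Transformations are lazy: this line runs no tasks at all.
val evens = spark.range(1000000).filter("id % 2 = 0")

// count() is an action: it triggers a job, which Spark splits into stages,
// and each stage into one task per partition (one JVM thread per task).
val howMany = evens.count()

// A second action is a second job; by default the two jobs run one after
// the other within the same application.
evens.write.mode("overwrite").parquet("/tmp/evens_demo")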

Well then where is the problem here?

Suppose you are dealing with a situation where you need to load a table with a strategy like the one below:

spark.sql("select * from tableA inner join tableB on tableA.id = tableB.id").write.saveAsTable("join1")

spark.sql("select * from tableC inner join tableD on tableC.id = tableD.id").write.saveAsTable("join2")

spark.sql("select * from join1 inner join join2 on join1.id = join2.id").write.saveAsTable("result")

In the above code block, join1 and join2 could run in parallel, but because of Spark's sequential execution model, join2 waits for join1 to finish first.

Let us look at a practical example. Firing a few independent saveAsTable commands one after another, like the ones above, creates a handful of jobs each, and all of them run sequentially. The Spark UI confirms it: the three jobs start one after another.


Now let us do some threading. When we work with the JVM, we have the flexibility to decide how we use threads for parallel work. For example, say you have an 8-core CPU: at most 8 threads can execute at the same time, and depending on availability anywhere from 0 to 8 of them may be free (you can create more threads; they will simply share the cores).

Here is the catch! Let me do the same thing, but this time I am opening, say, 15 threads. Each thread submits its own work to Spark, so whenever a thread is free it picks up its jobs and executes them.

import org.apache.spark.sql.functions._

val range = 15
for (w <- 0 until range) {   // 15 iterations, one thread each
  val thread = new Thread {
    override def run(): Unit = {

      // Every thread submits the same set of writes, purely to generate a burst of jobs.
      val df = spark.range(100000).toDF("id")

      // typedLit builds the array-literal column reliably for Scala collections
      df.withColumn("range", typedLit((1 to 10).toArray))
        .write.mode("overwrite").format("parquet").saveAsTable("number")

      // customerDf and employeeDf are DataFrames prepared earlier in the session
      customerDf.write.format("parquet").mode("overwrite").save("/tmp/anirban/simpsonsDF_files")

      employeeDf.createOrReplaceTempView("employee")

      spark.table("employee").write.mode("overwrite").format("parquet").saveAsTable("emp")
    }
  }
  thread.start()
}

Let’s see the Spark UI to confirm this.

Wow! You can see that Spark created so many jobs in parallel and triggered them across the 15 threads. Zoom into the Jobs page and see the magic.

The event timeline makes it even more obvious: almost all of the jobs overlap in time.
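If you would rather not squint at the UI, the status tracker gives the same signal from the shell. A small sketch; run it while the threaded writes are still in flight:

// Reports the jobs and stages that are active at this very moment;
// with the threaded writes running you should see several at once.
val tracker = spark.sparkContext.statusTracker
println(s"Active jobs right now: ${tracker.getActiveJobIds().length}")
println(s"Active stages right now: ${tracker.getActiveStageIds().length}")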

This is truly useful when you need to perform parallel operations in Spark and save time, provided you have adequate resources available to do so.
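To close the loop on the join1/join2 example from the beginning, here is a minimal sketch of the same idea using scala.concurrent.Future over a small fixed thread pool instead of raw threads (it assumes tableA through tableD already exist; the pool size of 4 is arbitrary). The effect on the scheduler is the same: the two independent writes become concurrent jobs, and only the dependent join waits for both.

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// A bounded pool: at most 4 writes are submitted to Spark at the same time.
implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

// The two independent joins are kicked off concurrently.
val j1 = Future {
  spark.sql("select * from tableA inner join tableB on tableA.id = tableB.id")
    .write.saveAsTable("join1")
}
val j2 = Future {
  spark.sql("select * from tableC inner join tableD on tableC.id = tableD.id")
    .write.saveAsTable("join2")
}

// Only the dependent join waits for both of them to finish.
Await.result(Future.sequence(Seq(j1, j2)), Duration.Inf)

spark.sql("select * from join1 inner join join2 on join1.id = join2.id")
  .write.saveAsTable("result")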

For any kind of help regarding career counselling, resume building, design discussions, or to know more about the latest data engineering trends and technologies, reach out to me at anigos.

P.S.: I don’t charge money.


Ani

Big Data Architect — Passionate about designing robust distributed systems