DAG (Directed Acyclic Graph)

What is a DAG?

A directed acyclic graph is a directed graph with no directed cycles. That is, it is formed by a collection of vertices and directed edges, each edge connecting one vertex to another, such that there is no way to start at some vertex v and follow a sequence of edges that eventually loops back to v again. - Wikipedia

Why are we discussing DAGs in the context of Apache Spark?

When an "action" function (for example, count or reduce) is submitted to Spark, the engine performs the job using stage-oriented scheduling. The DAGScheduler is responsible for this scheduling and orchestration. It has the following responsibilities (see the sketch after this list):

  1. Creates a DAG of RDD operations to perform the job

  2. Finds a minimal schedule of stages to run the job

  3. Keeps track of which RDDs and stage outputs are materialized - that is, whose transformations have actually been run

  4. Determines the preferred locations to run each task, based on the current cache status

  5. Submits the stages as TaskSets to a low-level TaskScheduler. The TaskScheduler is responsible for sending the tasks to the cluster, running them, retrying on failures, and mitigating stragglers. It returns events to the DAGScheduler

  6. Handles failures due to shuffle output files being lost, in which case old stages may need to be resubmitted
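
As a quick way to see this in practice, an RDD's lineage - the DAG the scheduler works from - can be printed with toDebugString; the indentation in its output marks the shuffle boundaries that become separate stages. Below is a minimal sketch in spark-shell style, assuming an existing SparkContext named sc.

val words  = sc.parallelize(Seq("a", "b", "a", "c"))
val counts = words.map(w => (w, 1)).reduceByKey(_ + _) // reduceByKey introduces a shuffle

println(counts.toDebugString) // shows a ShuffledRDD on top of the mapped RDD
counts.count()                // the action hands the job over to the DAGScheduler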

Key Points

  • RDD operations are of two types - transformations and actions.

    • A transformation on an RDD returns another RDD. It builds up the data lineage. To spot a transformation, look at the return type of the function: if it is an RDD, the function is a transformation. Examples: map, filter, mapValues, etc.

    • An action kicks off a Spark job and terminates the data lineage. Examples: collect, count, take, sample, etc.

  • Transformations can be divided into two categories - narrow and wide

    • Narrow transformations are self-contained: to compute an output partition, Spark needs data from only a single input partition.

    • Wide transformations require Spark to shuffle data across the cluster.

  • A sequence of narrow transformations in a data lineage is chained into a single pipeline

  • The chain of operations in the lineage is grouped into stages

    • Number of stages = number of wide transformations + 1 (see the sketch after this list)

  • Stages generally execute sequentially, with the exception of joins: the two upstream stages feeding a join can execute in parallel.

  • Tasks within a stage run in parallel

    • The number of tasks depends on the number of partitions in the RDD. Remember, within a pipeline of narrow transformations the number of partitions does not change.
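
To make the stage-counting rule and the transformation/action split concrete, here is a minimal sketch (again spark-shell style, assuming an existing SparkContext named sc). It chains two wide transformations, so the resulting job has three stages, and nothing executes until the action is called.

val lineage = sc.parallelize(Seq("a", "b", "a", "c"))
  .map(w => (w, 1))               // narrow: pipelined with parallelize in the first stage
  .reduceByKey(_ + _)             // wide #1: shuffle boundary
  .map { case (w, n) => (n, w) }  // narrow: stays in the second stage (and drops the partitioner)
  .groupByKey()                   // wide #2: another shuffle boundary

println(lineage.toDebugString)    // the indentation marks the two shuffle boundaries

// map, reduceByKey and groupByKey are all transformations, so no job has run yet.
// The action below submits one job with 2 wide transformations + 1 = 3 stages,
// which can also be checked in the Spark UI's DAG visualization.
val result = lineage.collect()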

Below is a simple code snippet to verify the order of function execution within a stage. The code contains one wide operation, groupByKey. The filter and map before it are narrow operations and will be collapsed into a single pipeline, so each record in the RDD passes through both filter and map before the next record does.

val rdd = sc.parallelize(Array("A", "B", "A", "C", "0"), 1)
// Keep the number of partitions at 1 to see what happens within a single partition

rdd
  .filter { v =>
    println(s"In filter for $v")
    v != "0"
  }
  .map { v =>
    println(s"In map for $v")
    (v, 1)
  }
  .groupByKey()
  .mapValues { values =>
    println(s"In mapValues")
    values.sum
  }
  .collect()

Output:

In filter for A
In map for A
In filter for B
In map for B
In filter for A
In map for A
In filter for C
In map for C
In filter for 0
In mapValues
In mapValues
In mapValues

Observation:

  • Here we see that record A passes through both the filter and map functions before record B passes through them, confirming that the narrow transformations are pipelined per record within the stage.

  • Record "0" appears only in the filter output because it is filtered out before map, and the three "In mapValues" lines appear only after all the filter/map calls, since mapValues runs in the next stage, after the shuffle introduced by groupByKey.
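
Relatedly, the earlier claim that the number of partitions - and therefore the number of tasks per stage - does not change across narrow transformations can be checked with getNumPartitions. The sketch below assumes an existing SparkContext named sc; the partition counts (4 and 8) are arbitrary values chosen for illustration.

val base = sc.parallelize(1 to 100, 4)               // 4 partitions -> 4 tasks in the first stage

val narrow = base.filter(_ % 2 == 0).map(_ * 10)     // narrow transformations: no shuffle
println(narrow.getNumPartitions)                     // still 4

val wide = narrow.map(n => (n % 3, n)).groupByKey(8) // wide: shuffles into 8 partitions
println(wide.getNumPartitions)                       // 8 -> 8 tasks in the stage after the shuffle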