Spark Memory Management

Exercises for Spark Internals.

The goal of this exercise is to learn the following:

    • How to optimize the shuffle using local aggregation
    • Control parallelism by controlling partitions of RDD
    • How to optimize caching

Here is the test dataset. Download it, unzip it, and save it in a "stocks" folder in your HDFS home directory.

Controlling Partitions of RDD

    1. Identify the narrow and wide operations in the following statement. Then, from the Spark Web UI, find the number of stages and the number of tasks for the job it triggers.
val stocks = sc.textFile("stocks")
stocks
    .repartition(10)
    .filter(!_.startsWith("date"))
    .map(_.split(","))
    .map(tokens => (tokens(7), tokens(5).toDouble))
    .groupByKey()
    .mapValues(values => values.sum / values.size)
    .coalesce(1)
    .count
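One way to cross-check what the Web UI shows is to print the RDD lineage with toDebugString: each extra level of indentation in its output marks a shuffle boundary, and a shuffle boundary starts a new stage. The sketch below assumes a spark-shell session (where sc is predefined) and uses a few made-up rows in place of the stocks files; the rows and the names sample, pipeline and lineage are illustrative only.

```scala
// Assumes spark-shell, where `sc` is already defined; enter with :paste.
// Hypothetical rows standing in for the stocks files: a header line
// starting with "date", then 8 comma-separated fields per record.
val sample = sc.parallelize(Seq(
  "date,c1,c2,c3,c4,c5,c6,c7",
  "2016-01-04,1,1,1,1,100.0,1,AAA",
  "2016-01-05,1,1,1,1,300.0,1,AAA",
  "2016-01-04,1,1,1,1,50.0,1,BBB"
))

// Same chain of transformations as the exercise, without the final action.
val pipeline = sample
  .repartition(10)
  .filter(!_.startsWith("date"))
  .map(_.split(","))
  .map(tokens => (tokens(7), tokens(5).toDouble))
  .groupByKey()
  .mapValues(values => values.sum / values.size)
  .coalesce(1)

// Print the lineage; ShuffledRDD entries correspond to wide operations.
val lineage = pipeline.toDebugString
println(lineage)
```

Comparing the indentation steps in this output with the stage list in the Web UI is a quick sanity check on which operations were classified as wide.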
    1. Control the number of partitions while reading data from source
sc.textFile("stocks").partitions.size // Expected value is 2 for this dataset: by default textFile requests a minimum of min(defaultParallelism, 2) partitions.
sc.textFile("stocks", 7).partitions.size // Expected value 7
    1. Verify that the number of partitions of a shuffled RDD is the same as that of its parent when no numPartitions argument is passed to the wide operation and the default level of parallelism is not set.
val stocks = sc.textFile("stocks")
stocks
    .repartition(11)
    .filter(!_.startsWith("date"))
    .map(_.split(","))
    .map(tokens => (tokens(7), tokens(5).toDouble))
    .groupByKey()
    .partitions.size // Expected value is 11 since the parent RDD before groupByKey operation has 11 partitions created through repartitioning.
    1. Control the number of partitions using the numPartitions argument of the wide operation
val stocks = sc.textFile("stocks")
stocks
    .filter(!_.startsWith("date"))
    .map(_.split(","))
    .map(tokens => (tokens(7), tokens(5).toDouble))
    .groupByKey(18)
    .partitions.size // Expected value is 18
    1. Control the number of partitions using the spark.default.parallelism property

Launch Spark with the default parallelism set to 13.

$ spark-shell --conf spark.default.parallelism=13 --verbose
val stocks = sc.textFile("stocks", 7)
stocks
    .filter(!_.startsWith("date"))
    .map(_.split(","))
    .map(tokens => (tokens(7), tokens(5).toDouble))
    .groupByKey()
    .partitions.size // Expected value is 13
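The last three exercises follow one rule. When a wide operation gets no numPartitions argument and its parents carry no partitioner of their own, Spark uses spark.default.parallelism if that property is set, and otherwise falls back to the largest partition count among the parent RDDs. A minimal check of that rule, assuming a spark-shell session and a made-up pair RDD (pairs, expected and grouped are illustrative names):

```scala
// Assumes spark-shell (`sc` predefined); enter with :paste.
// Hypothetical key/value pairs spread over 5 partitions.
val pairs = sc.parallelize(Seq(("AAA", 1.0), ("BBB", 2.0), ("AAA", 3.0)), 5)

// spark.default.parallelism wins when set; otherwise the parent's count is used.
val expected = sc.getConf
  .getOption("spark.default.parallelism")
  .map(_.toInt)
  .getOrElse(pairs.partitions.size)

val grouped = pairs.groupByKey()
println(s"expected = $expected, actual = ${grouped.partitions.size}")

// An explicit numPartitions argument overrides both.
println(pairs.groupByKey(18).partitions.size)
```

Running it once in a plain spark-shell and once in a shell started with --conf spark.default.parallelism=13 should show the two branches of the rule.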
    1. Control the number of partitions using the repartition and coalesce functions. Note: use repartition to increase the number of partitions and coalesce to reduce it; without a shuffle, coalesce cannot increase the partition count.
val stocks = sc.textFile("stocks")
stocks.repartition(12).partitions.size // Expected value is 12
stocks.repartition(12).coalesce(3).partitions.size // Expected value is 3; coalescing the 2-partition stocks RDD directly would leave it at 2
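The note above reflects how coalesce works: without a shuffle it can only merge existing partitions, so asking for more partitions than the RDD already has leaves the count unchanged, while repartition always shuffles. A small sketch, assuming spark-shell and a made-up RDD of numbers with 4 partitions (all names here are illustrative):

```scala
// Assumes spark-shell (`sc` predefined); enter with :paste.
val numbers = sc.parallelize(1 to 100, 4)            // hypothetical data, 4 partitions

val merged    = numbers.coalesce(2)                  // narrow: merges partitions
val unchanged = numbers.coalesce(8)                  // cannot grow without a shuffle
val grown     = numbers.coalesce(8, shuffle = true)  // shuffles, so it can grow
val same      = numbers.repartition(8)               // repartition always shuffles

println(Seq(merged, unchanged, grown, same).map(_.partitions.size))
// prints List(2, 4, 8, 8)
```

Because coalesce without a shuffle is a narrow operation, it is the cheaper choice when the goal is only to reduce the number of partitions.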

Reduce Shuffle Using reduceByKey function

Reduce the amount of data shuffled by using the reduceByKey function, which aggregates locally on each partition before the global aggregation.

First, sum the trading volumes for each stock symbol using the groupByKey operation.

val stocks = sc.textFile("stocks")
stocks
    .filter(!_.startsWith("date"))
    .map(_.split(","))
    .map(tokens => (tokens(7), tokens(5).toDouble))
    .groupByKey(15)
    .mapValues(values => values.sum)
    .collect

Produces shuffle write/read: 14.3 MB

Solve the same problem using reduceByKey function.

val stocks = sc.textFile("stocks")
stocks
    .filter(!_.startsWith("date"))
    .map(_.split(","))
    .map(tokens => (tokens(7), tokens(5).toDouble))
    .reduceByKey((x, y) => x + y)
    .collect

Produces shuffle write/read: 12.7 KB.

Conclusion: reduceByKey drastically reduces the amount of data shuffled over the network, from 14.3 MB with groupByKey to 12.7 KB here.
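The same local-aggregation idea also covers the per-symbol average from the first exercise, which used groupByKey. A sketch, assuming spark-shell and a few made-up (symbol, value) pairs: carrying a (sum, count) pair per key lets reduceByKey combine partial results on each partition before the shuffle. The names trades and averages are illustrative.

```scala
// Assumes spark-shell (`sc` predefined); enter with :paste.
// Hypothetical (symbol, value) pairs standing in for the parsed stocks rows.
val trades = sc.parallelize(Seq(("AAA", 10.0), ("AAA", 30.0), ("BBB", 5.0)))

val averages = trades
  .mapValues(v => (v, 1L))                                          // value -> (sum, count)
  .reduceByKey { case ((s1, c1), (s2, c2)) => (s1 + s2, c1 + c2) }  // partial sums merge map-side first
  .mapValues { case (sum, count) => sum / count }                   // final average per key
  .collect()
  .toMap

println(averages)
```

Only one (sum, count) pair per key and partition crosses the network, instead of every individual value.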

Caching Behaviour

Launch spark-shell with 2 GB of driver memory.

$ spark-shell --verbose --driver-memory 2g

Load stocks dataset and cache the dataset in memory. Run a full scan operation using count and capture the size of the RDD in memory.

scala> import org.apache.spark.storage.StorageLevel
scala> val stocks = sc.textFile("stocks")
scala> stocks.cache().count // Same as stocks.persist(StorageLevel.MEMORY_ONLY)

Capture the size of the RDD in memory. It is listed in the "Size in Memory" column on the Storage tab of the Spark Web UI.

scala> stocks.unpersist() // Uncache the stocks RDD from memory
scala> stocks.persist(StorageLevel.MEMORY_ONLY_SER).count  //Capture the size in memory

Create a DataFrame, cache it under both MEMORY_ONLY and MEMORY_ONLY_SER, and compare the sizes.

scala> val df = spark.read.format("csv").option("header", "true").load("stocks")
scala> df.persist(StorageLevel.MEMORY_ONLY).count // Capture the size in memory
scala> df.unpersist() // Uncache the DataFrame from memory
scala> df.persist(StorageLevel.MEMORY_ONLY_SER).count // Capture the size in memory
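The cached sizes can also be read from the shell instead of the Web UI. The sketch below is a minimal illustration, assuming spark-shell; it relies on SparkContext.getRDDStorageInfo, which Spark marks as a developer API, and on a made-up RDD of strings (sample, its name and its contents are illustrative). The reported numbers are refreshed asynchronously, so a freshly cached RDD may take a moment to appear in the result.

```scala
// Assumes spark-shell (`sc` predefined); enter with :paste.
import org.apache.spark.storage.StorageLevel

// Hypothetical data: 100,000 short strings.
val sample = sc.parallelize(1 to 100000).map(i => s"row-$i").setName("sample")

sample.persist(StorageLevel.MEMORY_ONLY_SER).count()   // materialize the cache

// Developer API: one RDDInfo per cached RDD, including its in-memory size.
sc.getRDDStorageInfo
  .filter(_.id == sample.id)
  .foreach(info => println(s"${info.name}: ${info.memSize} bytes, " +
    s"${info.numCachedPartitions} of ${info.numPartitions} partitions cached"))

sample.unpersist()   // release the cache before trying another storage level
```

Repeating the run with StorageLevel.MEMORY_ONLY in place of MEMORY_ONLY_SER gives the second number for the comparison.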

Summary of the findings

Conclusion:

  • For RDDs, the serialized and deserialized storage levels differ hugely in cached size.
  • The actual file size in uncompressed format is about 121 MB, which is close to the size of the serialized RDD.
  • The serialized and deserialized storage levels have little to no impact on the cached size of a DataFrame.