Spark Memory Management
Exercises for Spark Internals.
The goal of these exercises is to learn the following:
- How to optimize the shuffle using local aggregation
- How to control parallelism by controlling the partitions of an RDD
- How to optimize caching
Here is the test dataset. Download it, unzip it, and save it in a "stocks" folder in your HDFS home directory.
Controlling Partitions of RDD
Identify the narrow and wide operations in the following statement. Then, from the Spark Web UI, find the number of stages and the number of tasks for the resulting job.
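The original statement is not reproduced here, so the sketch below is a hypothetical stand-in of the same shape (the column layout of the stocks data is an assumption). map and filter are narrow operations; reduceByKey is a wide operation that introduces a stage boundary.

```scala
// Hypothetical statement; column indices are assumptions about the dataset.
val stocks = sc.textFile("stocks")
val totals = stocks.map(_.split(","))        // narrow
  .map(cols => (cols(1), cols(7).toLong))    // narrow
  .reduceByKey(_ + _)                        // wide: shuffle boundary
totals.count()                               // action: triggers the job
```

In the Web UI this shows up as two stages: the task count of the first stage equals the number of input partitions, and that of the second equals the number of shuffle partitions.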
Control the number of partitions while reading data from the source.
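A minimal sketch using textFile's minPartitions argument (the value 8 is arbitrary):

```scala
// Ask for at least 8 partitions when reading from the source.
val stocks = sc.textFile("stocks", 8)
println(stocks.partitions.size)   // at least 8, depending on input splits
```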
Verify that the number of partitions of the shuffled RDD is the same as that of the parent RDD when the numPartitions argument is not passed to the wide operation and the default level of parallelism is not set.
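A sketch of the check (the key column index is an assumption):

```scala
// Parent RDD with an explicit minimum partition count.
val parent = sc.textFile("stocks", 6)
val pairs  = parent.map(line => (line.split(",")(1), 1L))

// With no numPartitions argument and spark.default.parallelism unset,
// the shuffled RDD inherits the parent's partition count.
val shuffled = pairs.groupByKey()
println(parent.partitions.size)    // e.g. 6
println(shuffled.partitions.size)  // same as the parent
```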
Control the number of partitions using the numPartitions argument of the wide operation.
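For example, groupByKey accepts the partition count directly:

```scala
val pairs = sc.textFile("stocks").map(line => (line.split(",")(1), 1L))
val shuffled = pairs.groupByKey(20)  // explicit numPartitions
println(shuffled.partitions.size)    // 20
```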
Control the number of partitions using the default level of parallelism property.
Launch spark-shell with default parallelism set to 13.
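A sketch of the check; the launch command goes in a terminal, the rest in the shell:

```scala
// Launch the shell with the property set (run from a terminal):
//   spark-shell --conf spark.default.parallelism=13
val pairs = sc.textFile("stocks").map(line => (line.split(",")(1), 1L))
val shuffled = pairs.groupByKey()   // no numPartitions argument
println(shuffled.partitions.size)   // 13, taken from spark.default.parallelism
```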
Control the number of partitions using the repartition and coalesce functions. Note: use repartition to increase the number of partitions and coalesce to reduce the number of partitions, as sketched below.
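A minimal sketch of both:

```scala
val stocks = sc.textFile("stocks", 4)

// repartition performs a full shuffle; use it to increase partitions.
val more = stocks.repartition(16)
println(more.partitions.size)    // 16

// coalesce merges partitions without a shuffle; use it to decrease them.
val fewer = stocks.coalesce(2)
println(fewer.partitions.size)   // 2
```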
Reduce Shuffle Using the reduceByKey Function
Control the data shuffling using the reduceByKey function, which performs local aggregation prior to global aggregation.
First, create a group-by operation that sums the trading volume for each stock symbol using the groupByKey operation, as sketched below.
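A sketch, assuming the symbol is in column 1 and the volume in column 7 (the column layout is an assumption about the dataset):

```scala
val volumes = sc.textFile("stocks")
  .map(_.split(","))
  .map(cols => (cols(1), cols(7).toLong))

// groupByKey shuffles every (symbol, volume) pair across the network
// and only then sums on the reducer side.
val totalsByGroup = volumes.groupByKey().mapValues(_.sum)
totalsByGroup.collect()
```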
Produces shuffle write/read: 14.3 MB.
Solve the same problem using the reduceByKey function.
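A sketch, reusing volumes from the previous snippet:

```scala
// reduceByKey sums locally within each partition (map-side combine)
// before shuffling only the partial sums.
val totals = volumes.reduceByKey(_ + _)
totals.collect()
```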
Produces shuffle write/read: 12.7 KB.
Conclusion: reduceByKey drastically reduces the amount of data shuffled over the network.
Optimize Caching
Launch spark-shell with 2 GB of driver memory.
Load the stocks dataset and cache it in memory. Run a full-scan operation using count, then capture the size of the RDD in memory from the Storage tab of the Spark Web UI. See the screenshot below for where to find it.
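A sketch of the RDD caching experiment, with the serialized level included for comparison (the launch command is shown as a comment):

```scala
// Launch with extra driver memory (run from a terminal):
//   spark-shell --driver-memory 2g
import org.apache.spark.storage.StorageLevel

// Deserialized cache (Java objects on the heap).
val stocks = sc.textFile("stocks")
stocks.persist(StorageLevel.MEMORY_ONLY)
stocks.count()   // full scan materializes the cache

// Serialized cache, for comparison.
val stocksSer = sc.textFile("stocks")
stocksSer.persist(StorageLevel.MEMORY_ONLY_SER)
stocksSer.count()
// Compare the two "Size in Memory" values on the Storage tab.
```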
Create a DataFrame and cache it again under both the MEMORY_ONLY and MEMORY_ONLY_SER storage levels, then compare the sizes.
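A sketch, assuming a Spark 2.x shell where the spark session is available; reading the data with the CSV reader is also an assumption:

```scala
import org.apache.spark.storage.StorageLevel

val stocksDF = spark.read.csv("stocks")

// Deserialized storage level.
stocksDF.persist(StorageLevel.MEMORY_ONLY)
stocksDF.count()     // materialize; note the size in the Storage tab
stocksDF.unpersist() // drop the cache before switching levels

// Serialized storage level.
stocksDF.persist(StorageLevel.MEMORY_ONLY_SER)
stocksDF.count()     // compare with the previous size
```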
Summary of the findings
For RDD caching, the serialized and deserialized storage levels differ hugely in memory footprint.
The actual file size in uncompressed form is about 121 MB, which is close to the size of the serialized RDD cache.
Serialized vs. deserialized caching levels have little to no impact for DataFrame operations, since DataFrames are cached in an optimized binary format either way.