Spark Memory Management
Exercises for Spark Internals.
The goal of these exercises is to learn the following:
- How to optimize the shuffle using local aggregation
- How to control parallelism by controlling the partitions of an RDD
- How to optimize caching
Here is the test dataset. Download it, unzip it, and save it in a "stocks" folder in your HDFS home directory.
Controlling Partitions of RDD
Identify the narrow and wide operations in the following statement. Then, from the Spark Web UI, find the number of stages and the number of tasks for the resulting job.
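The original statement is not reproduced here, so the sketch below is a hypothetical stand-in of the same shape (the column layout of the stocks data is an assumption). map and filter are narrow operations; reduceByKey is a wide operation that introduces a stage boundary.

```scala
// Hypothetical statement; column indices are assumptions about the dataset.
val stocks = sc.textFile("stocks")
val totals = stocks.map(_.split(","))        // narrow
  .map(cols => (cols(1), cols(7).toLong))    // narrow
  .reduceByKey(_ + _)                        // wide: shuffle boundary
totals.count()                               // action: triggers the job
```

In the Web UI this shows up as two stages: the task count of the first stage equals the number of input partitions, and that of the second equals the number of shuffle partitions.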
Control the number of partitions while reading data from the source.
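A minimal sketch using textFile's minPartitions argument (the value 8 is arbitrary):

```scala
// Ask for at least 8 partitions when reading from the source.
val stocks = sc.textFile("stocks", 8)
println(stocks.partitions.size)   // at least 8, depending on input splits
```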
Verify that the number of partitions of the shuffled RDD is the same as that of the parent RDD when the numPartitions argument is not passed to the wide operation and the default level of parallelism is not set.
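A sketch of the check (the key column index is an assumption):

```scala
// Parent RDD with an explicit minimum partition count.
val parent = sc.textFile("stocks", 6)
val pairs  = parent.map(line => (line.split(",")(1), 1L))

// With no numPartitions argument and spark.default.parallelism unset,
// the shuffled RDD inherits the parent's partition count.
val shuffled = pairs.groupByKey()
println(parent.partitions.size)    // e.g. 6
println(shuffled.partitions.size)  // same as the parent
```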
Control the number of partitions using the numPartitions argument of the wide operation.
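For example, groupByKey accepts the partition count directly:

```scala
val pairs = sc.textFile("stocks").map(line => (line.split(",")(1), 1L))
val shuffled = pairs.groupByKey(20)  // explicit numPartitions
println(shuffled.partitions.size)    // 20
```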
Control the number of partitions using the default level of parallelism property.
Launch spark-shell with default parallelism set to 13.
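A sketch of the check; the launch command goes in a terminal, the rest in the shell:

```scala
// Launch the shell with the property set (run from a terminal):
//   spark-shell --conf spark.default.parallelism=13
val pairs = sc.textFile("stocks").map(line => (line.split(",")(1), 1L))
val shuffled = pairs.groupByKey()   // no numPartitions argument
println(shuffled.partitions.size)   // 13, taken from spark.default.parallelism
```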
Control the number of partitions using the repartition and coalesce functions. Note: use repartition to increase the number of partitions and coalesce to reduce the number of partitions, as sketched below.
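A minimal sketch of both:

```scala
val stocks = sc.textFile("stocks", 4)

// repartition performs a full shuffle; use it to increase partitions.
val more = stocks.repartition(16)
println(more.partitions.size)    // 16

// coalesce merges partitions without a shuffle; use it to decrease them.
val fewer = stocks.coalesce(2)
println(fewer.partitions.size)   // 2
```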
Reduce Shuffle Using the reduceByKey Function
Control the data shuffling using the reduceByKey function, which performs local aggregation prior to global aggregation.
First, create a group-by operation that sums the trading volume for each stock symbol using the groupByKey operation, as sketched below.
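A sketch, assuming the symbol is in column 1 and the volume in column 7 (the column layout is an assumption about the dataset):

```scala
val volumes = sc.textFile("stocks")
  .map(_.split(","))
  .map(cols => (cols(1), cols(7).toLong))

// groupByKey shuffles every (symbol, volume) pair across the network
// and only then sums on the reducer side.
val totalsByGroup = volumes.groupByKey().mapValues(_.sum)
totalsByGroup.collect()
```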
Produces shuffle write/read: 14.3 MB.
Solve the same problem using the reduceByKey function.
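A sketch, reusing volumes from the previous snippet:

```scala
// reduceByKey sums locally within each partition (map-side combine)
// before shuffling only the partial sums.
val totals = volumes.reduceByKey(_ + _)
totals.collect()
```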
Produces shuffle write/read: 12.7 KB.
Conclusion: reduceByKey drastically reduces the amount of data shuffled over the network.
Optimize Caching
Launch spark-shell with 2 GB of driver memory.
Load the stocks dataset and cache it in memory. Run a full-scan operation using count, then capture the size of the RDD in memory from the Storage tab of the Spark Web UI. See the screenshot below for where to find it.
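A sketch of the RDD caching experiment, with the serialized level included for comparison (the launch command is shown as a comment):

```scala
// Launch with extra driver memory (run from a terminal):
//   spark-shell --driver-memory 2g
import org.apache.spark.storage.StorageLevel

// Deserialized cache (Java objects on the heap).
val stocks = sc.textFile("stocks")
stocks.persist(StorageLevel.MEMORY_ONLY)
stocks.count()   // full scan materializes the cache

// Serialized cache, for comparison.
val stocksSer = sc.textFile("stocks")
stocksSer.persist(StorageLevel.MEMORY_ONLY_SER)
stocksSer.count()
// Compare the two "Size in Memory" values on the Storage tab.
```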
Create a DataFrame and cache it again under both the MEMORY_ONLY and MEMORY_ONLY_SER storage levels, then compare the sizes.
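A sketch, assuming a Spark 2.x shell where the spark session is available; reading the data with the CSV reader is also an assumption:

```scala
import org.apache.spark.storage.StorageLevel

val stocksDF = spark.read.csv("stocks")

// Deserialized storage level.
stocksDF.persist(StorageLevel.MEMORY_ONLY)
stocksDF.count()     // materialize; note the size in the Storage tab
stocksDF.unpersist() // drop the cache before switching levels

// Serialized storage level.
stocksDF.persist(StorageLevel.MEMORY_ONLY_SER)
stocksDF.count()     // compare with the previous size
```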
Summary of the findings
For RDD caching, the serialized and deserialized storage levels differ hugely in memory footprint.
The actual file size in uncompressed form is about 121 MB, which is close to the size of the serialized RDD cache.
Serialized vs. deserialized caching levels have little to no impact for DataFrame operations, since DataFrames are cached in an optimized binary format either way.