RDD Partition Behaviour
Partitions of the base RDD are based on:
number of files (the number of partitions will be at least the number of files, one for each)
size of file (if the file is small, the minimum partition count is 2; a large file is divided into 32 MB blocks if the data source (HDFS, Cassandra) does not provide any partition hint)
underlying file system (if you are using HDFS or Cassandra, these sources contain hints about partition preference, and they determine the number of partitions)
splittability (if the file is not splittable, the size-based technique above takes over the partitioning behaviour)
A quick way to check the resulting partition count is sketched below.
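As a minimal sketch, assuming a hypothetical input path, you can inspect how many partitions a base RDD gets in the shell:
scala> val lines = sc.textFile("/wd/data/sample/large_file.csv")   // hypothetical path
scala> lines.getNumPartitions                                      // at least 2; more for large or multi-file inputs
scala> sc.textFile("/wd/data/sample/large_file.csv", 8).getNumPartitions  // the minPartitions hint raises the count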
The number of partitions of a shuffle RDD depends on a few factors:
if you pass the number of partitions as an argument to a wide operation, that determines the number of partitions of the shuffle RDD
otherwise, the default level of parallelism (spark.default.parallelism), if it is set in the configuration
otherwise, the number of partitions is based on the parent RDD
The precedence is sketched below.
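A minimal sketch of the first rule, using made-up data and an explicit partition count on a wide operation:
scala> val pairs = sc.parallelize(1 to 1000).map(n => (n % 10, n))
scala> pairs.reduceByKey(_ + _).getNumPartitions      // falls back to spark.default.parallelism or the parent's partition count
scala> pairs.reduceByKey(_ + _, 8).getNumPartitions   // the explicit argument wins: 8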
Note:
1. The level of parallelism of job execution at a given stage is determined by the number of partitions of the RDD.
2. Records in each partition are processed sequentially, so if a partition is large, the task takes longer to process.
3. Try to keep the partition size around 100 MB so that the cluster management overhead is small in proportion to the processing time of the task.
4. If the partition size is too small, the proportion of cluster management overhead to task duration will be high.
To achieve a 100 MB partition size, use trial and error. Estimating the size of data at the RDD level gives the hint. Let's say you start with 1 TB of data and a filter removes 90% of it (based on your knowledge of the data); that leaves 100 GB to process, so you get a ballpark figure for the number of partitions, as the sketch below shows.
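A back-of-the-envelope version of that estimate, using the numbers assumed above:
scala> val dataAfterFilterMB = 100L * 1024                          // ~100 GB left after filtering 90% of 1 TB
scala> val targetPartitionMB = 100L                                 // target partition size
scala> val numPartitions = dataAfterFilterMB / targetPartitionMB    // ~1024 partitions as a starting point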
All RDDs offer two functions to change the number of partitions: repartition and coalesce. repartition is used to increase or decrease the number of partitions, while coalesce is used only to decrease it. Pair RDDs offer one additional method, partitionBy, to control the partitioning logic. repartition is a wide operation, so it creates a stage. A minimal example of the three follows.
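A minimal sketch of the three (the partition counts are arbitrary):
scala> val rdd = sc.parallelize(1 to 1000, 4)
scala> rdd.repartition(8).getNumPartitions              // full shuffle, up or down: 8
scala> rdd.coalesce(2).getNumPartitions                 // narrow, only shrinks: 2
scala> import org.apache.spark.HashPartitioner
scala> rdd.map(n => (n, n)).partitionBy(new HashPartitioner(4)).getNumPartitions  // pair RDDs only: 4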
Hash Partitioner
A hash partitioner distributes the data based on the hash code of the key. If the key distribution is skewed, a hash partitioner may create partitions of unequal size.
scala> import scala.util.Random
scala> val numPartitions = 2
scala> val nums = (0 to 10).map(e => Random.nextInt).map(r => (r, r.hashCode % numPartitions))
scala.collection.immutable.IndexedSeq[(Int, Int)] = Vector((2064866311,1), (1397872016,0), (-111422268,0), (1770144410,0), (533969592,0), (-1157859665,-1), (1023582323,1), (-210153845,-1), (1880335774,0), (-682078771,-1), (1313298797,1))
scala> import org.apache.spark.HashPartitioner
scala> sc.parallelize(nums).partitionBy(new HashPartitioner(2)).glom.collect
Array[Array[(Int, Int)]] = Array(Array((1397872016,0), (-111422268,0), (1770144410,0), (533969592,0), (1880335774,0)), Array((2064866311,1), (-1157859665,-1), (1023582323,1), (-210153845,-1), (-682078771,-1), (1313298797,1)))
Range Partitioner
A range partitioner allows you to create equal-sized partitions if, due to a previous join operation, partition sizes have become vastly non-uniform (you can ignore minor non-uniformity of +/- 10%). The distribution is based on the values of the key.
Let's create some skewed partitions
scala> import scala.util.Random
scala> import org.apache.spark.HashPartitioner
scala> val nums = (0 to 100000).map(e => (0, Random.nextInt)) :+ (1 -> Random.nextInt)
scala> val rdd = sc.parallelize(nums).partitionBy(new HashPartitioner(2)).map(_.swap)
scala> rdd.glom.map(_.size).collect
Size of partitions Array(100001, 1)
As you can see above, one partition contains 100,001 records and the other contains 1. Now we will apply a range partitioner to distribute the records more evenly.
scala> rdd.repartition(2).glom.map(_.size).collect // Array(50002, 50000)
scala> import org.apache.spark.RangePartitioner
scala> val rangePartitioner = new RangePartitioner(3, rdd)
scala> rdd.partitionBy(rangePartitioner).glom.map(_.size).collect
You should see Array(33417, 28746, 37839)
You can create a custom partitioner as well (see the Custom Partitioner section below).
Effect of Partitioners
Using a hash partitioner, you can reduce the network shuffle.
scala> val moviesRaw = sc.textFile("/wd/data/movie_lens/ml-20m/movies.csv")
scala> val pattern = """"([^"]*)"|(?<=,|^)([^,]*)(?=,|$)""".r
scala> val movies = moviesRaw
.filter(line => ! line.startsWith("movieId,title,genres"))
.map(_.replace("\"\"", ""))
.map(line => pattern.findAllIn(line).toArray)
Record count 27279
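The joins below use moviesById, a pair RDD keyed by movieId, which is not defined above. A plausible definition, assuming movieId is the first parsed field, would be:
scala> val moviesById = movies.keyBy(fields => fields(0))   // key by movieId (assumed to be the first field)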
Create an RDD on the ratings data.
scala> val ratingsRaw = sc.textFile("/wd/data/movie_lens/ml-20m/ratings.csv")
scala> val ratingsRdd = ratingsRaw.filter(line => !line.startsWith("userId,movieId,rating,timestamp")).map(line => line.split(','))
scala> val ratingsByMovieId = ratingsRdd.keyBy(fields => fields(1))
Record count: 20000264
Some operations add a partitioner by default. For example,
sortByKey adds a RangePartitioner,
scala> ratingsByMovieId.sortByKey().partitioner
Option[org.apache.spark.Partitioner] = Some(org.apache.spark.RangePartitioner@2f4a7f6)
and the groupBy operation adds a HashPartitioner,
scala> movies.flatMap(fields => fields(2).split('|')).groupBy(e => e).partitioner
Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)
Do an inner join between the movies and ratings data and take note of the amount of shuffle.
scala> moviesById.join(ratingsByMovieId).count
Total Shuffle: 378.2 MB
The amount of shuffle does not change if you alter the position of the bigger RDD (in terms of record count) in the join.
scala> ratingsByMovieId.join(moviesById).count
Total amount of shuffle: 378.2 MB
Now hash partition the ratings RDD, which is the bigger RDD in terms of record count.
scala> import org.apache.spark.HashPartitioner
scala> val ratingsByMovieIdHashed = ratingsByMovieId.partitionBy(new HashPartitioner(2)).cache
scala> ratingsByMovieIdHashed.count
Check the partitioner on ratingsByMovieIdHashed.
scala> ratingsByMovieIdHashed.partitioner
Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)
scala> ratingsByMovieIdHashed.join(moviesById).count
Total amount of shuffle: 964.2 KB
Note that after hash partitioning the ratings data, the shuffle size is much smaller.
Now, let's see the effect of adding a hash partitioner to the smaller dataset.
scala> val moviesByIdHashed = moviesById.partitionBy(new HashPartitioner(2)).cache
scala> moviesByIdHashed.count
scala> moviesByIdHashed.join(ratingsByMovieId).count
Total shuffle: 347.7 MB
So, the impact of adding a hash partitioner to the smaller dataset is not significant.
Let's join the two hash-partitioned RDDs.
scala> moviesByIdHashed.join(ratingsByMovieIdHashed).count
Total shuffle: 0
Functions that Benefit from Partitioning
cogroup, all types of join, groupByKey, reduceByKey, combineByKey and lookup
Pre-partitioning data ensures that at least one of the RDDs (the one with a known partitioner) is not shuffled. If both RDDs are partitioned by the same partitioner and they are cached on the same machines, no shuffle will occur.
For binary operations, the resulting RDD gets a HashPartitioner unless one of the source RDDs has a partitioner; in that case, the resulting RDD inherits that partitioner. A small example follows.
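A minimal sketch of the inheritance rule, using made-up data:
scala> import org.apache.spark.HashPartitioner
scala> val left = sc.parallelize(Seq((1, "a"), (2, "b"))).partitionBy(new HashPartitioner(4))
scala> val right = sc.parallelize(Seq((1, "x"), (2, "y")), 2)
scala> left.join(right).partitioner        // typically Some(HashPartitioner), inherited from left
scala> left.join(right).getNumPartitions   // 4, matching the pre-partitioned side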
Functions that reset the Partitioner
map, flatMap
Note: mapValues and flatMapValues retain the partitioner. So, while working with pair RDDs, use these functions rather than map and flatMap where suitable; a quick check is shown below.
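A minimal sketch showing the difference, using made-up data:
scala> import org.apache.spark.HashPartitioner
scala> val pairs = sc.parallelize(Seq((1, "a"), (2, "b"))).partitionBy(new HashPartitioner(4))
scala> pairs.map { case (k, v) => (k, v.toUpperCase) }.partitioner   // None: map may change keys, so the partitioner is dropped
scala> pairs.mapValues(_.toUpperCase).partitioner                    // Some(HashPartitioner): keys untouched, partitioner kept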
Custom Partitioner
Example: in PageRank, hash URLs by domain name, because many links are internal.
import org.apache.spark.Partitioner

class DomainPartitioner extends Partitioner {
  def numPartitions = 20
  def getPartition(key: Any): Int = {
    val code = parseDomain(key.toString).hashCode % numPartitions  // parseDomain: assumed helper that extracts the domain from a URL
    if (code < 0) code + numPartitions else code                   // keep the partition index non-negative
  }
  override def equals(other: Any): Boolean = other.isInstanceOf[DomainPartitioner]
}
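Usage is the same as for the built-in partitioners; a hedged sketch, where linksRdd is a hypothetical pair RDD keyed by URL:
scala> val partitionedLinks = linksRdd.partitionBy(new DomainPartitioner).cache
scala> partitionedLinks.partitioner   // Some(DomainPartitioner)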