RDD Partition Behaviour

The number of partitions of the base RDD depends on:

    1. number of files (the partition count will be at least the number of files, one for each)
    2. size of each file (for a small file the minimum partition count is 2; a large file is divided into 32 MB blocks if the data source (HDFS, Cassandra) does not provide a partition hint)
    3. underlying file system - sources such as HDFS or Cassandra carry hints about partition preference, and those sources will then determine the number of partitions
    4. splittability - if a file is not splittable (for example, a gzip-compressed file), it cannot be divided by size and is read as a single partition

The number of partitions in a shuffled RDD depends on a few factors:

    • if you pass the number of partitions as an argument to the wide operation, that value determines the number of partitions in the shuffled RDD
    • otherwise, the default level of parallelism (spark.default.parallelism) is used if it is set in the configuration
    • otherwise, the number of partitions is based on the parent RDD
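The order in which these factors apply can be sketched as a small Spark-free model. This is a simplification based on my understanding of Spark's default behaviour, not Spark's own code: the names ShufflePartitionCount and resolve are made up for illustration, and the model ignores the case where a parent RDD already carries a partitioner.

```scala
// Simplified model of how the partition count of a shuffled RDD is chosen.
// Not Spark code: ShufflePartitionCount and resolve are illustrative names.
object ShufflePartitionCount {
  def resolve(
      explicit: Option[Int],           // numPartitions passed to the wide operation, if any
      defaultParallelism: Option[Int], // spark.default.parallelism, if configured
      parentPartitions: Seq[Int]       // partition counts of the parent RDDs (must be non-empty)
  ): Int =
    explicit
      .orElse(defaultParallelism)
      .getOrElse(parentPartitions.max)
}
```

For example, ShufflePartitionCount.resolve(None, None, Seq(2, 16)) falls through to the largest parent and returns 16, while passing Some(8) as the first argument always wins.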

Note:

1. The level of parallelism of job execution at a given stage is determined by the number of partitions of the RDD.

2. Records in each partition are processed sequentially, so if a partition is large, its task will take longer to process.

3. Try to keep the partition size around 100 MB so that the cluster management overhead is small in proportion to the processing time of a task.

4. If the partition size is too small, the proportion of cluster management overhead to task duration will be high.

To achieve a 100 MB partition size, use trial and error. Estimate the size of the data at the RDD level; that will be the hint. Let's say you start with 1 TB of data and, using filter, remove 90% (based on your knowledge of the data). That leaves 100 GB of data to process, so at about 100 MB per partition you get a ballpark figure of about 1,000 partitions.
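The ballpark arithmetic can be written down as a tiny helper. This is only a sketch: PartitionEstimate and partitionsFor are hypothetical names, and decimal units (1 TB = 1000 GB) are assumed so that the numbers match the example in the text.

```scala
// Ballpark partition count for a target partition size (decimal units assumed).
object PartitionEstimate {
  val MB: Long = 1000L * 1000L
  val GB: Long = 1000L * MB
  val TB: Long = 1000L * GB

  // Ceiling division: a partial last partition still counts as one partition.
  def partitionsFor(dataBytes: Long, targetPartitionBytes: Long): Long =
    (dataBytes + targetPartitionBytes - 1) / targetPartitionBytes

  // 1 TB of input, 90% removed by the filter, 100 MB target partition size.
  val remaining: Long = 1 * TB / 10
  val estimate: Long = partitionsFor(remaining, 100 * MB)
}
```

Here PartitionEstimate.estimate evaluates to 1000. The result is a starting point for repartition or coalesce, to be adjusted after looking at actual task durations.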

All RDDs offer two functions to change the number of partitions: repartition and coalesce. repartition can increase or decrease the number of partitions, while coalesce is used to decrease it. Pair RDDs offer one additional method, partitionBy, to control the partitioning logic. repartition is a wide operation: it shuffles the data and creates a new stage.
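A minimal sketch of the difference between the two calls, assuming a SparkContext is available (for example the sc of spark-shell). The object name RepartitionVsCoalesce and the partition counts 8, 16 and 2 are arbitrary choices for illustration.

```scala
import org.apache.spark.SparkContext

object RepartitionVsCoalesce {
  // Returns the partition counts after repartition(16), coalesce(2) and coalesce(16),
  // each applied to an RDD that starts with 8 partitions.
  def partitionCounts(sc: SparkContext): (Int, Int, Int) = {
    val rdd = sc.parallelize(1 to 1000, 8)
    val increased = rdd.repartition(16).getNumPartitions // full shuffle, so the count can grow
    val decreased = rdd.coalesce(2).getNumPartitions     // merges existing partitions, no shuffle
    val unchanged = rdd.coalesce(16).getNumPartitions    // without a shuffle, coalesce cannot add partitions
    (increased, decreased, unchanged)
  }
}
```

In spark-shell, RepartitionVsCoalesce.partitionCounts(sc) should give (16, 2, 8): only repartition can raise the partition count, at the price of a shuffle.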

Hash Partitioner

The hash partitioner distributes the data based on the hash code of the key. If the key distribution is skewed, the hash partitioner may create partitions of unequal size.

scala> import scala.util.Random
scala> val numPartitions = 2
scala> val nums = (0 to 10).map(e => scala.util.Random.nextInt).map(r => (r, r.hashCode % numPartitions))

scala.collection.immutable.IndexedSeq[(Int, Int)] = Vector((2064866311,1), (1397872016,0), (-111422268,0), (1770144410,0), (533969592,0), (-1157859665,-1), (1023582323,1), (-210153845,-1), (1880335774,0), (-682078771,-1), (1313298797,1))

scala> import org.apache.spark.HashPartitioner 
scala> sc.parallelize(nums).partitionBy(new HashPartitioner(2)).glom.collect

Array[Array[(Int, Int)]] = Array(Array((1397872016,0), (-111422268,0), (1770144410,0), (533969592,0), (1880335774,0)), Array((2064866311,1), (-1157859665,-1), (1023582323,1), (-210153845,-1), (-682078771,-1), (1313298797,1)))
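The -1 values in the second column come from Scala's % operator, which keeps the sign of the dividend. The keys with remainder -1 still sit in the second partition in the output above, which suggests that the partitioner maps negative remainders back into the range 0 until numPartitions. A minimal Spark-free sketch of that adjustment follows; HashPartitionIndex, nonNegativeMod and partitionFor are local helpers written for illustration and modelled on the behaviour I understand HashPartitioner to have.

```scala
object HashPartitionIndex {
  // Remainder adjusted into [0, mod): a negative raw remainder is shifted up by mod.
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val rawMod = x % mod
    rawMod + (if (rawMod < 0) mod else 0)
  }

  // Partition index for a key, based on the key's hashCode.
  def partitionFor(key: Any, numPartitions: Int): Int =
    nonNegativeMod(key.hashCode, numPartitions)
}
```

With two partitions, HashPartitionIndex.partitionFor(-1157859665, 2) is 1 and HashPartitionIndex.partitionFor(1397872016, 2) is 0, which matches the placement of those keys in the glom output.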

Range Partitioner

The range partitioner lets you create partitions of roughly equal size when, for example, a previous join operation has left the partition sizes vastly non-uniform (you can ignore minor non-uniformity of +/- 10%). Records are distributed based on ranges of the key values.

Let's create some skewed partitions

scala> import scala.util.Random
scala> import org.apache.spark.HashPartitioner
scala> val nums = (0 to 100000).map(e => (0, Random.nextInt)) :+ (1 -> Random.nextInt)
scala> val rdd = sc.parallelize(nums).partitionBy(new HashPartitioner(2)).map(_.swap)
scala> rdd.glom.map(_.size).collect

Size of partitions: Array(100001, 1)

As you can see above, one partition contains 100001 records and the other contains 1. Now we will apply the range partitioner to distribute the records evenly.

scala> rdd.repartition(2).glom.map(_.size).collect //Array(50002, 50000)
scala> import org.apache.spark.RangePartitioner
scala> val rangePartitioner = new RangePartitioner(3, rdd)
scala> rdd.partitionBy(rangePartitioner).glom.map(_.size).collect 

You should see something like Array(33417, 28746, 37839). The exact sizes vary because the keys are random.
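The idea behind range partitioning can be shown without Spark: each key is compared against a sorted list of boundary keys, and the boundaries decide the partition. As far as I understand, Spark derives its boundaries by sampling the RDD, which is why the sizes come out roughly rather than exactly equal. In the sketch below the boundaries are hand-picked, and RangePartitionIndex, partitionFor and sizes are illustrative names.

```scala
object RangePartitionIndex {
  // bounds must be sorted ascending; n bounds define n + 1 partitions.
  // A key goes to the first partition whose upper bound is >= the key,
  // and keys above every bound go to the last partition.
  def partitionFor(key: Int, bounds: Array[Int]): Int = {
    val idx = bounds.indexWhere(bound => key <= bound)
    if (idx == -1) bounds.length else idx
  }

  // Count how many keys land in each partition.
  def sizes(keys: Seq[Int], bounds: Array[Int]): Vector[Int] = {
    val counts = Array.fill(bounds.length + 1)(0)
    keys.foreach(k => counts(partitionFor(k, bounds)) += 1)
    counts.toVector
  }
}
```

With bounds Array(-700000000, 700000000), the key -1157859665 falls into partition 0, the key 0 into partition 1 and the key 2064866311 into partition 2. Choosing bounds near the one-third and two-thirds points of the key distribution is what evens out the partition sizes.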

You can create a custom partitioner as well.

Effect of Partitioners

Using a hash partitioner, you can reduce the amount of network shuffle.

scala> val moviesRaw = sc.textFile("/wd/data/movie_lens/ml-20m/movies.csv")
scala> val pattern = """"([^"]*)"|(?<=,|^)([^,]*)(?=,|$)""".r
scala> val movies = moviesRaw
    .filter(line => ! line.startsWith("movieId,title,genres"))
    .map(_.replace("\"\"", ""))
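    .map(line => pattern.findAllIn(line).toArray)

Key the movies by movieId (the first column) so that they can be joined with the ratings.

scala> val moviesById = movies.keyBy(fields => fields(0))

Record count: 27279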

Create an RDD on the ratings data.

scala> val ratingsRaw = sc.textFile("/wd/data/movie_lens/ml-20m/ratings.csv")
scala> val ratingsRdd = ratingsRaw.filter(line => !line.startsWith("userId,movieId,rating,timestamp")).map(line => line.split(','))
scala> val ratingsByMovieId = ratingsRdd.keyBy(fields => fields(1))

Record count: 20000264

Some operations add a partitioner by default. For example, sortByKey adds a RangePartitioner:

scala> ratingsByMovieId.sortByKey().partitioner

Option[org.apache.spark.Partitioner] = Some(org.apache.spark.RangePartitioner@2f4a7f6)

The groupBy operation adds a HashPartitioner:

scala> movies.flatMap(fields => fields(2).split('|')).groupBy(e => e).partitioner

Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)

Do an inner join between the movies and ratings data and take note of the amount of shuffle.

scala> moviesById.join(ratingsByMovieId).count

Total Shuffle: 378.2 MB

The amount of shuffle does not change if you swap the positions of the two RDDs in the join, regardless of which one has more records.

scala> ratingsByMovieId.join(moviesById).count

Total amount of shuffle: 378.2 MB

Now hash partition the ratings RDD, which is the bigger RDD in terms of record count.

scala> import org.apache.spark.HashPartitioner
scala> val ratingsByMovieIdHashed = ratingsByMovieId.partitionBy(new HashPartitioner(2)).cache
scala> ratingsByMovieIdHashed.count

Check the partitioner on ratingsByMovieIdHashed.

scala> ratingsByMovieIdHashed.partitioner

Option[org.apache.spark.Partitioner] = Some(org.apache.spark.HashPartitioner@2)

scala> ratingsByMovieIdHashed.join(moviesById).count

Total amount of shuffle: 964.2 KB

Note that after hashing the ratings data, the shuffle size is now much less.

Now, let's see the effect of adding a hash partitioner on the smaller dataset.

scala> val moviesByIdHashed = moviesById.partitionBy(new HashPartitioner(2)).cache
scala> moviesByIdHashed.count
scala> moviesByIdHashed.join(ratingsByMovieId).count

Total shuffle: 347.7 MB

So, the impact of adding a hash partitioner on the smaller dataset is not significant.

Let's join the two hash-partitioned RDDs.

scala> moviesByIdHashed.join(ratingsByMovieIdHashed).count

Total shuffle: 0

Functions that Benefit from Partitioning

cogroup, all types of join, groupByKey, reduceByKey, combineByKey and lookup

Pre-partitioning data will cause at least one of the RDDs (the one with a known partitioner) not to be shuffled. If both RDDs are partitioned by the same partitioner and are cached on the same machines, then no shuffle will occur.
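Why co-partitioning removes the shuffle can be seen in a Spark-free model. When both datasets use the same partitioning function, every key lands at the same partition index on both sides, so each pair of partitions can be joined on its own without moving records between partitions. The names below (CoPartitionedJoin, partitionIndex, partition, join) are hypothetical, and plain collections stand in for RDDs.

```scala
object CoPartitionedJoin {
  // The same function is used for both sides: this is what "same partitioner" means in the model.
  def partitionIndex(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  def partition[V](data: Seq[(String, V)], numPartitions: Int): Map[Int, Seq[(String, V)]] =
    data.groupBy { case (key, _) => partitionIndex(key, numPartitions) }

  // Joins partition i of the left side only with partition i of the right side.
  def join[A, B](left: Seq[(String, A)], right: Seq[(String, B)], numPartitions: Int): Seq[(String, (A, B))] = {
    val l = partition(left, numPartitions)
    val r = partition(right, numPartitions)
    (0 until numPartitions).flatMap { i =>
      for {
        (lk, lv) <- l.getOrElse(i, Seq.empty)
        (rk, rv) <- r.getOrElse(i, Seq.empty)
        if lk == rk
      } yield (lk, (lv, rv))
    }
  }
}
```

The result is the same as a join over the whole data, even though no partition ever looks at a partition with a different index. That is the property the zero-shuffle join of the two hash-partitioned RDDs relies on.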

For binary operations, the resulting RDD will have a hash partitioner unless one of the source RDDs has a partitioner. If a partitioner is present in one of the source RDDs, the resulting RDD will inherit that partitioner.

Functions that reset the Partitioner

map

Note: mapValues and flatMapValues retain the partitioner. So, while working with pair RDDs, use these functions rather than map and flatMap where suitable.
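The difference is easy to check on a small pair RDD. The sketch below assumes a SparkContext is available (for example the sc of spark-shell); PartitionerRetention and check are illustrative names, and the sample data is made up.

```scala
import org.apache.spark.{HashPartitioner, SparkContext}

object PartitionerRetention {
  // Returns (partitioner present after mapValues, partitioner present after map).
  def check(sc: SparkContext): (Boolean, Boolean) = {
    val pairs = sc.parallelize(Seq(1 -> "a", 2 -> "b", 3 -> "c"))
      .partitionBy(new HashPartitioner(2))
    // mapValues cannot touch the keys, so the partitioner is still valid.
    val afterMapValues = pairs.mapValues(_.toUpperCase)
    // map could return different keys, so Spark drops the partitioner.
    val afterMap = pairs.map { case (k, v) => (k, v.toUpperCase) }
    (afterMapValues.partitioner.isDefined, afterMap.partitioner.isDefined)
  }
}
```

PartitionerRetention.check(sc) should give (true, false), even though both transformations produce the same records here.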

Custom Partitioner

Example: in PageRank, hash URLs by domain name, because many links are internal

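import org.apache.spark.Partitioner
class DomainPartitioner extends Partitioner {
    override def numPartitions: Int = 20
    // parseDomain is not defined in these notes; this version is an assumption and expects absolute URLs
    private def parseDomain(url: String): String = new java.net.URI(url).getHost
    // floorMod keeps the partition index non-negative even when hashCode is negative
    override def getPartition(key: Any): Int = Math.floorMod(parseDomain(key.toString).hashCode, numPartitions)
    override def equals(other: Any): Boolean = other.isInstanceOf[DomainPartitioner]
    override def hashCode: Int = numPartitions
}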