Bucket By

Abstract: How bucketing can significantly improve the performance of join operations.

Create two tables with 10 million records each.

import org.apache.spark.sql.functions.pow
spark.range(10000000).withColumn("f1", pow("id", 2)).write.saveAsTable("t1")
spark.range(10000000).withColumn("f2", pow("id", 3)).write.saveAsTable("t2")

Create two more tables with the same data, bucketed by the id column into 10 buckets.

spark.table("t1").write.bucketBy(10, "id").saveAsTable("t1_buck")
spark.table("t2").write.bucketBy(10, "id").saveAsTable("t2_buck")
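
Before joining, it can be worth confirming that the bucketing information was actually recorded in the table metadata. The following is a minimal sketch, assuming a spark-shell or Zeppelin session where `spark` is already defined and the t1_buck table above has been created. The exact row labels printed by `describe formatted` differ slightly between Spark versions, so the filter only looks for the word "Bucket".

```scala
// Assumes `spark` is the session provided by spark-shell / Zeppelin
// and that t1_buck was created as shown above.
import org.apache.spark.sql.functions.col

// describe formatted returns rows with the columns col_name, data_type, comment
val bucketInfo = spark.sql("describe formatted t1_buck")
  .filter(col("col_name").contains("Bucket"))

// Prints the bucket count and bucket column rows, untruncated
bucketInfo.show(false)
```

The same check can be repeated for t2_buck; both tables need the same bucket column and bucket count for the join below to benefit.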

First, join the two unbucketed tables and look at the physical plan.

sql("select * from t1 join t2 on t1.id = t2.id").explain

== Physical Plan ==
*SortMergeJoin [id#3408807L], [id#3408809L], Inner
:- *Sort [id#3408807L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#3408807L, 200)
:     +- *Project [id#3408807L, f1#3408808]
:        +- *Filter isnotnull(id#3408807L)
:           +- *FileScan parquet default.t1[id#3408807L,f1#3408808] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/abulbasar/Downloads/zeppelin-0.7.3-bin-netinst/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,f1:double>
+- *Sort [id#3408809L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id#3408809L, 200)
      +- *Project [id#3408809L, f2#3408810]
         +- *Filter isnotnull(id#3408809L)
            +- *...

Observations:

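- the join does not run in a single stage: each Exchange is a shuffle boundary, so both tables are shuffled before the join stage starts

- two Exchange hashpartitioning operators are present, one per side, each followed by a Sort that feeds the SortMergeJoin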
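
To make the role of Exchange hashpartitioning(id, 200) concrete, here is a small dependency-free sketch of the idea: rows from both sides are routed to a partition number derived from a hash of the join key, so matching keys meet in the same partition and each partition can be joined on its own. The helper `partitionFor` and the sample key ranges are hypothetical, and plain `hashCode` is only a stand-in; Spark uses its own Murmur3-based hash.

```scala
// Stand-in for hash partitioning. Spark itself uses a Murmur3-based hash;
// hashCode is used here only to keep the sketch free of dependencies.
def partitionFor(id: Long, numPartitions: Int): Int = {
  val h = id.hashCode % numPartitions
  if (h < 0) h + numPartitions else h
}

val numPartitions = 200
val leftIds  = 0L until 1000L     // hypothetical keys of the left table
val rightIds = 500L until 1500L   // hypothetical keys of the right table

// "Shuffle" both sides: group the keys by their target partition
val leftParts  = leftIds.groupBy(partitionFor(_, numPartitions))
val rightParts = rightIds.groupBy(partitionFor(_, numPartitions))

// Join partition by partition, never looking across partitions
val joinedPerPartition = leftParts.keys.toList.flatMap { p =>
  val rightKeys = rightParts.getOrElse(p, Seq.empty[Long]).toSet
  leftParts(p).filter(rightKeys.contains)
}.sorted

// Reference result: join over the full key sets
val rightKeySet = rightIds.toSet
val joinedGlobally = leftIds.filter(rightKeySet.contains).toList

// The partition-wise join finds exactly the same matches
println(joinedPerPartition == joinedGlobally)
```

Bucketing does this routing once, at write time, which is why the shuffle can be dropped when both tables are bucketed the same way on the join key.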

Now run the same join on the bucketed tables.

sql("select * from t1_buck join t2_buck on t1_buck.id = t2_buck.id").explain

== Physical Plan ==
*SortMergeJoin [id#3408855L], [id#3408857L], Inner
:- *Sort [id#3408855L ASC NULLS FIRST], false, 0
:  +- *Project [id#3408855L, f1#3408856]
:     +- *Filter isnotnull(id#3408855L)
:        +- *FileScan parquet default.t1_buck[id#3408855L,f1#3408856] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/abulbasar/Downloads/zeppelin-0.7.3-bin-netinst/spark-warehouse/t1_b..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,f1:double>
+- *Sort [id#3408857L ASC NULLS FIRST], false, 0
   +- *Project [id#3408857L, f2#3408858]
      +- *Filter isnotnull(id#3408857L)
         +- *FileScan parquet default.t2_buck[id#3408857L,f2#3408858] Batched: true, Format: Parquet, Location: InMemoryFi...

Observations:

- the join runs in a single stage

- both Exchange hashpartitioning operators are gone; each Sort now reads straight from the file scan
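
The difference between the two plans can also be checked programmatically instead of by eye. This is a rough sketch, assuming the same `spark` session and the four tables created above; `countExchanges` is a hypothetical helper that simply searches the plan text, which keeps it independent of the internal operator class names that change between Spark versions.

```scala
// Assumes `spark` is the session provided by spark-shell / Zeppelin
// and that t1, t2, t1_buck and t2_buck exist as created above.

// Hypothetical helper: count shuffle operators by scanning the plan text
def countExchanges(query: String): Int = {
  val plan = spark.sql(query).queryExecution.executedPlan.toString
  "Exchange hashpartitioning".r.findAllIn(plan).length
}

val plainExchanges =
  countExchanges("select * from t1 join t2 on t1.id = t2.id")
val bucketedExchanges =
  countExchanges("select * from t1_buck join t2_buck on t1_buck.id = t2_buck.id")

println(s"unbucketed: $plainExchanges, bucketed: $bucketedExchanges")
```

For the plans shown above, this should report two exchanges for the unbucketed join and none for the bucketed one.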

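Net benefit: the join query ran much faster on the bucketed tables, because both sides are already distributed by id into the same number of buckets and Spark can skip the shuffle.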
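
One simple way to compare the two joins is to wrap each query in `spark.time`, which prints the elapsed time of the enclosed action. This is a sketch only, assuming the same `spark` session and tables as above; `count` is used here merely as a convenient action that forces the join to run, and timings will vary with hardware, caching, and warm-up, so each query is best run more than once.

```scala
// Assumes `spark` is the session provided by spark-shell / Zeppelin
// and that t1, t2, t1_buck and t2_buck exist as created above.

// Join on the unbucketed tables: includes a shuffle of both sides
val plainCount = spark.time {
  spark.sql("select * from t1 join t2 on t1.id = t2.id").count
}

// Join on the bucketed tables: no shuffle expected
val bucketedCount = spark.time {
  spark.sql("select * from t1_buck join t2_buck on t1_buck.id = t2_buck.id").count
}

// Both joins must return the same number of rows
println(plainCount == bucketedCount)
```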