Bucket By
Abstract: How bucketing can significantly improve join operations.
Create two tables with 10 million records each.
import org.apache.spark.sql.functions.pow
spark.range(10000000).withColumn("f1", pow("id", 2)).write.saveAsTable("t1")
spark.range(10000000).withColumn("f2", pow("id", 3)).write.saveAsTable("t2")
Create two more tables with the same data, bucketed by the id column. Both tables use the same number of buckets (10), which is what allows the join on id to avoid a shuffle.
spark.table("t1").write.bucketBy(10, "id").saveAsTable("t1_buck")
spark.table("t2").write.bucketBy(10, "id").saveAsTable("t2_buck")
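To build intuition for why this helps, here is a minimal sketch of how rows are routed to bucket files: Spark hashes the bucket column and takes a non-negative modulo by the bucket count, so equal keys always land in the same bucket in every table bucketed the same way. Spark actually uses its own Murmur3-based hash of the column value; `java.lang.Long.hashCode` below is a stand-in for illustration only, and `bucketOf` is a hypothetical helper, not a Spark API.

```scala
object BucketSketch {
  // Sketch of bucket assignment: bucket = nonNegativeMod(hash(key), numBuckets).
  // Spark uses a Murmur3-based hash of the column value; Long.hashCode is
  // only a stand-in here to illustrate the routing scheme.
  def bucketOf(id: Long, numBuckets: Int): Int = {
    val h = java.lang.Long.hashCode(id)
    // Non-negative modulo, so negative hash values still map to a valid bucket
    ((h % numBuckets) + numBuckets) % numBuckets
  }

  def main(args: Array[String]): Unit = {
    // The same key always maps to the same bucket, in any table bucketed
    // with the same bucket count — this is what makes a shuffle-free join possible.
    assert(bucketOf(42L, 10) == bucketOf(42L, 10))
    assert(bucketOf(-3L, 10) >= 0 && bucketOf(-3L, 10) < 10)
    println("same-key rows co-locate in the same bucket")
  }
}
```

Because both t1_buck and t2_buck use 10 buckets, bucket i of one table can only ever match bucket i of the other, so each pair of bucket files can be joined independently.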
sql("select * from t1 join t2 on t1.id = t2.id").explain
== Physical Plan ==
*SortMergeJoin [id#3408807L], [id#3408809L], Inner
:- *Sort [id#3408807L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(id#3408807L, 200)
: +- *Project [id#3408807L, f1#3408808]
: +- *Filter isnotnull(id#3408807L)
: +- *FileScan parquet default.t1[id#3408807L,f1#3408808] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/abulbasar/Downloads/zeppelin-0.7.3-bin-netinst/spark-warehouse/t1], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,f1:double>
+- *Sort [id#3408809L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(id#3408809L, 200)
+- *Project [id#3408809L, f2#3408810]
+- *Filter isnotnull(id#3408809L)
+- *...
Observations:
- two Exchange hashpartitioning operators are present, one per table, feeding the SortMergeJoin
- each Exchange is a full shuffle of 10 million rows across 200 partitions (the default spark.sql.shuffle.partitions), so the join spans multiple stages
sql("select * from t1_buck join t2_buck on t1_buck.id = t2_buck.id").explain
== Physical Plan ==
*SortMergeJoin [id#3408855L], [id#3408857L], Inner
:- *Sort [id#3408855L ASC NULLS FIRST], false, 0
: +- *Project [id#3408855L, f1#3408856]
: +- *Filter isnotnull(id#3408855L)
: +- *FileScan parquet default.t1_buck[id#3408855L,f1#3408856] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Users/abulbasar/Downloads/zeppelin-0.7.3-bin-netinst/spark-warehouse/t1_b..., PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint,f1:double>
+- *Sort [id#3408857L ASC NULLS FIRST], false, 0
+- *Project [id#3408857L, f2#3408858]
+- *Filter isnotnull(id#3408857L)
+- *FileScan parquet default.t2_buck[id#3408857L,f2#3408858] Batched: true, Format: Parquet, Location: InMemoryFi...
Observations:
- the two Exchange hashpartitioning operators are gone: each table is already partitioned on disk by its bucket files
- with no shuffle, the join completes in a single stage
Net benefit: the join query ran much faster on the bucketed tables, because the most expensive step, the shuffle, was eliminated.
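A quick way to confirm the difference without eyeballing the whole plan is to count Exchange operators in the plan text. The helper below is a hypothetical sketch; in practice the plan string would come from something like `spark.sql(...).queryExecution.executedPlan.toString` (an assumption about how you capture it), but any plan text works.

```scala
object PlanCheck {
  // Count shuffle exchanges in a physical-plan string; 0 means a shuffle-free join.
  def countShuffles(planText: String): Int =
    planText.split("\n").count(_.contains("Exchange hashpartitioning"))

  def main(args: Array[String]): Unit = {
    // Abbreviated stand-ins for the two plans shown above
    val unbucketed =
      "*SortMergeJoin\n+- Exchange hashpartitioning(id, 200)\n+- Exchange hashpartitioning(id, 200)"
    val bucketed =
      "*SortMergeJoin\n:- *Sort\n+- *Sort"
    assert(countShuffles(unbucketed) == 2) // plain tables: two shuffles
    assert(countShuffles(bucketed) == 0)   // bucketed tables: none
    println("bucketed plan is shuffle-free")
  }
}
```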