Compress Output Files in Spark

Source File in HDFS:

$ hadoop fs -ls -h /user/cloudera/sfpd

-rw-r--r-- 1 cloudera cloudera 365.0 M 2016-11-15 08:57 /user/cloudera/sfpd/Map__Crime_Incidents_-_from_1_Jan_2003.csv

Check the number of blocks and the block size from the Namenode UI: http://localhost:50070/explorer.html#/user/cloudera/sfpd

With the default block size of 128 MB, it shows 3 blocks with the following sizes (in bytes):

Block 0: 134217728

Block 1: 134217728

Block 2: 114289931
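
The same block report can also be pulled from the command line with hdfs fsck (the exact report layout varies a little across Hadoop versions):

$ hdfs fsck /user/cloudera/sfpd -files -blocks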

Compress RDD Output

Create an RDD in Spark

scala> val rdd = sc.textFile("/user/cloudera/sfpd")

scala> rdd.partitions.size

res0: Int = 3
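
The partition count follows the HDFS block layout. As a side note, textFile also takes an optional minPartitions argument if the input should be split more finely than the blocks; a small sketch (the exact count Spark returns depends on split sizing):

scala> sc.textFile("/user/cloudera/sfpd", 6).partitions.size // at least 6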

scala> rdd.coalesce(1).saveAsTextFile("/user/cloudera/sfpd.1", classOf[])

Save the RDD without any compression and verify the number of blocks and block size

scala> rdd.coalesce(1).saveAsTextFile("/user/cloudera/sfpd.1")

It should show 3 blocks. Total size: 365 MB, which matches the original file size.

Block 0: 134217728

Block 1: 134217728

Block 2: 114289931
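
The total can also be confirmed from the command line; hadoop fs -du -s sums everything under the directory:

$ hadoop fs -du -s -h /user/cloudera/sfpd.1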

Save the RDD using GZip compression

scala> rdd.coalesce(1).saveAsTextFile("/user/cloudera/sfpd.2", classOf[])

Verify the number of blocks and the size of each block. Total size: 86.33 MB. Since the compressed output is smaller than the 128 MB block size, it fits in a single block.

Block 0: 90523565
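
Listing the output directory should show a single gzip part file (typically part-00000.gz, alongside a _SUCCESS marker):

$ hadoop fs -ls -h /user/cloudera/sfpd.2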

Save the RDD using BZip2 compression

scala> rdd.coalesce(1).saveAsTextFile("/user/cloudera/sfpd.3", classOf[])

Verify the number of blocks and the size of each block. Total size: 53.76 MB, again a single block.

Block 0: 56375965
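
One practical difference between the two codecs: bzip2 is splittable while gzip is not, so Spark can read the bzip2 output in parallel, whereas a gzip file always maps to a single partition. A quick read-back check (the bzip2 count is what I'd expect with textFile's default minimum of 2 partitions, not a guarantee):

scala> sc.textFile("/user/cloudera/sfpd.3").partitions.size // bzip2 is splittable, typically 2 here

scala> sc.textFile("/user/cloudera/sfpd.2").partitions.size // gzip is not splittable, always 1 per file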

Find the default block size of HDFS

scala> sc.hadoopConfiguration.get("dfs.blocksize") // Expected Value: 134217728

Compress Dataset Output

Let's use the Dataset API to compress the output.

scala> val ds ="/user/cloudera/sfpd")

Save the dataset using gzip compression

scala> ds.repartition(1).write.format("csv").option("compression", "gzip").save("/user/cloudera/sfpd.ds.1")

Save the dataset using bzip2 compression

scala> ds.repartition(1).write.format("csv").option("compression", "").save("/user/cloudera/sfpd.ds.2")

The compression option accepts the codec short names known to Spark's CSV writer: none, bzip2, gzip, lz4, snappy, and deflate.
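
For RDD output, the codec can also be set once through the Hadoop job configuration instead of being passed to every saveAsTextFile call. A sketch using the standard MapReduce output-format properties (the sfpd.4 path is just an example):

scala> sc.hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress", "true")

scala> sc.hadoopConfiguration.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.GzipCodec")

scala> rdd.coalesce(1).saveAsTextFile("/user/cloudera/sfpd.4")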