Compress Output Files in Spark
Source File in HDFS:
$ hadoop fs -ls -h /user/cloudera/sfpd
-rw-r--r-- 1 cloudera cloudera 365.0 M 2016-11-15 08:57 /user/cloudera/sfpd/Map__Crime_Incidents_-_from_1_Jan_2003.csv
Check the number of blocks and the block size from the NameNode UI: http://localhost:50070/explorer.html#/user/cloudera/sfpd
With the default block size of 128 MB, it shows 3 blocks: the first two are full 134217728-byte blocks and the third holds the remainder (~109 MB).
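The same block information can also be pulled from the command line instead of the UI; a quick check using hdfs fsck (output trimmed here):
$ hdfs fsck /user/cloudera/sfpd/Map__Crime_Incidents_-_from_1_Jan_2003.csv -files -blocks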
Compress RDD Output
Create an RDD in the Spark shell from the source directory
scala> val rdd = sc.textFile("/user/cloudera/sfpd")
scala> rdd.partitions.size
res0: Int = 3
Save the RDD without any compression and verify the number of blocks and block size
scala> rdd.coalesce(1).saveAsTextFile("/user/cloudera/sfpd.1")
It should show 3 blocks. Total size: 365 MB, which matches the original file size.
Block 0: 134217728
Block 1: 134217728
Block 2: 114289931
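As a cross-check, the total size of the uncompressed output directory can also be confirmed from the shell (the byte count may differ slightly from the listing above):
$ hadoop fs -du -s -h /user/cloudera/sfpd.1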
Save the RDD using GZip compression
scala> rdd.coalesce(1).saveAsTextFile("/user/cloudera/sfpd.2", classOf[org.apache.hadoop.io.compress.GzipCodec])
Verify the number of blocks and the size of each block. Since the compressed file is smaller than one HDFS block, there is a single block. Total size: 86.33 MB
Block 0: 90523565
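To confirm the gzipped output is still readable, sc.textFile can load it directly (Hadoop picks the codec from the .gz extension); the record count should match the uncompressed data:
scala> sc.textFile("/user/cloudera/sfpd.2").count()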
Save the RDD using BZip2 compression
scala> rdd.coalesce(1).saveAsTextFile("/user/cloudera/sfpd.3", classOf[org.apache.hadoop.io.compress.BZip2Codec])
Verify the number of blocks and the size of each block. Again a single block. Total size: 53.76 MB
Block 0: 56375965
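One practical difference between the two codecs: bzip2 is splittable while gzip is not, so reading the bzip2 output back may yield more than one partition even though it was written as a single file. A quick check:
scala> sc.textFile("/user/cloudera/sfpd.3").partitions.size
scala> sc.textFile("/user/cloudera/sfpd.2").partitions.size   // gzip is not splittable, so expect one partition per file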
Find the default block size of HDFS
scala> sc.hadoopConfiguration.get("dfs.blocksize") // Expected Value: 134217728
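Assuming the property is stored as a plain byte count in the client configuration (some setups use suffixed values like 128m instead), it can be converted to megabytes for comparison with the block sizes above:
scala> sc.hadoopConfiguration.get("dfs.blocksize").toLong / (1024 * 1024)   // ≈ 128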
Compress Dataset Output
Let's use the Dataset API to compress the output.
scala> val ds = spark.read.text("/user/cloudera/sfpd")
Save the dataset using gzip compression
scala> ds.repartition(1).write.format("csv").option("compression", "gzip").save("/user/cloudera/sfpd.ds.1")
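A listing of the output directory should show the codec reflected in the file extension (expect something like part-...csv.gz plus a _SUCCESS marker):
$ hadoop fs -ls -h /user/cloudera/sfpd.ds.1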
Save the dataset using bzip2 compression
scala> ds.repartition(1).write.format("csv").option("compression", "org.apache.hadoop.io.compress.BZip2Codec").save("/user/cloudera/sfpd.ds.2")
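The csv writer also understands short codec names, so the fully qualified class is not required; an equivalent sketch writing to a hypothetical third output path:
scala> ds.repartition(1).write.format("csv").option("compression", "bzip2").save("/user/cloudera/sfpd.ds.3")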