Read from nested directories

Download the source code of Spark

$ mkdir -p ~/workspaces
$ cd ~/workspaces
$ git clone https://github.com/apache/spark.git
Cloning into 'spark'...
remote: Counting objects: 563612, done.
remote: Compressing objects: 100% (211/211), done.
remote: Total 563612 (delta 214), reused 138 (delta 123), pack-reused 563271
Receiving objects: 100% (563612/563612), 273.71 MiB | 2.58 MiB/s, done.
Resolving deltas: 100% (214129/214129), done.


Launch the Spark shell

$ /opt/mapr/spark/spark-2.2.1/bin/spark-shell


scala> sc.textFile("file:////home/mapr/workspaces/spark").count
java.io.IOException: Not a file: file:///home/mapr/workspaces/spark/mllib
  at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:324)
  at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
  at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
  ... 48 elided


Hadoop's FileInputFormat, which textFile uses under the hood, does not recurse into subdirectories by default, so pointing it at a directory tree fails with "Not a file". Check whether recursive reads are enabled:

scala> sc.hadoopConfiguration.get("mapreduce.input.fileinputformat.input.dir.recursive")
res0: String = null

A null result means the property is unset, so the non-recursive default applies. Enable it:


scala> sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive", "true")
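Alternatively, the property can be set when launching the shell. A minimal sketch, relying on Spark's spark.hadoop.* prefix, which copies such options into the Hadoop configuration:

$ /opt/mapr/spark/spark-2.2.1/bin/spark-shell \
    --conf spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive=true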


Count the lines of code (LOC) across all files

scala> sc.textFile("file:////home/mapr/workspaces/spark").count
res5: Long = 1500033
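The same recursive read supports richer aggregations. A sketch (not from the session above) that tallies lines per file extension over the same tree; wholeTextFiles yields (path, content) pairs, so both the name and the contents are available:

// Sketch: lines per file extension; assumes the recursive setting above is still in effect.
val locByExt = sc.wholeTextFiles("file:///home/mapr/workspaces/spark", 100)
  .map { case (path, content) =>
    val dot = path.lastIndexOf('.')
    val ext = if (dot >= 0) path.substring(dot + 1) else "(none)"
    (ext, content.count(_ == '\n').toLong)  // newline count as a rough LOC measure
  }
  .reduceByKey(_ + _)
locByExt.sortBy(_._2, ascending = false).take(10).foreach(println)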


Count the Java files

scala> val javaFiles = sc.wholeTextFiles("file:////home/mapr/workspaces/spark", 100).map(_._1).filter(_.endsWith(".java"))
javaFiles: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[24] at filter at <console>:24

Since wholeTextFiles returns (path, content) pairs, map(_._1) keeps just the paths before filtering on the .java suffix.


scala> javaFiles.count
18/08/27 18:06:29 WARN TaskSetManager: Stage 5 contains a task of very large size (360 KB). The maximum recommended task size is 100 KB.
res10: Long = 911
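When only file names are needed, reading every file through wholeTextFiles is unnecessary, and a driver-side listing avoids both the read and the task-size warning. A sketch using the Hadoop FileSystem API (illustrative, not from the session above):

import java.net.URI
import org.apache.hadoop.fs.{FileSystem, Path}

// Recursive listing on the driver; file contents are never read.
val fs = FileSystem.get(new URI("file:///"), sc.hadoopConfiguration)
val files = fs.listFiles(new Path("/home/mapr/workspaces/spark"), true)  // true = recursive
var javaCount = 0L
while (files.hasNext) {
  if (files.next().getPath.getName.endsWith(".java")) javaCount += 1
}
println(javaCount)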