Read from nested directories

Download the source code of Spark

$ mkdir -p ~/workspaces
$ cd ~/workspaces
$ git clone https://github.com/apache/spark.git
Cloning into 'spark'...
remote: Counting objects: 563612, done.
remote: Compressing objects: 100% (211/211), done.
remote: Total 563612 (delta 214), reused 138 (delta 123), pack-reused 563271
Receiving objects: 100% (563612/563612), 273.71 MiB | 2.58 MiB/s, done.
Resolving deltas: 100% (214129/214129), done.


Launch Spark

$ /opt/mapr/spark/spark-2.2.1/bin/spark-shell


Try to count the lines across the repository:

scala> sc.textFile("file:///home/mapr/workspaces/spark").count
java.io.IOException: Not a file: file:///home/mapr/workspaces/spark/mllib
  at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:324)
  at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
  at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
  at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
  ... 48 elided


The path contains subdirectories, and by default Hadoop's FileInputFormat does not recurse into them, hence the "Not a file" error. Check whether recursive input is enabled:

scala> sc.hadoopConfiguration.get("mapreduce.input.fileinputformat.input.dir.recursive")
res0: String = null


Enable recursive directory reads:

scala> sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive", "true")
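To avoid setting this in every session, the same property can be supplied when the context is created. A minimal sketch for a standalone application, assuming Spark 2.x behaviour where any property prefixed with spark.hadoop. is copied into the Hadoop configuration (the app name is hypothetical):

```scala
import org.apache.spark.sql.SparkSession

// Sketch: enable recursive input reading up front.
// Properties prefixed with "spark.hadoop." are copied into the Hadoop
// configuration, equivalent to the sc.hadoopConfiguration.set call above.
val spark = SparkSession.builder()
  .appName("recursive-read")   // hypothetical app name
  .master("local[*]")
  .config("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive", "true")
  .getOrCreate()
```

The same property can also be passed on the spark-shell command line via --conf.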


Count the lines of code (LOC) across all files

scala> sc.textFile("file:///home/mapr/workspaces/spark").count
res5: Long = 1500033
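As a quick cross-check outside Spark, the tree can be walked with plain Scala, counting newline bytes per file. This is a sketch: countNewlines is a hypothetical helper, and its total will differ slightly from Spark's record count for files lacking a trailing newline.

```scala
import java.nio.file.{Files, Path, Paths}
import scala.collection.JavaConverters._

// Recursively count newline bytes in every regular file under root.
// Reading raw bytes avoids charset errors on binary files (e.g. .git objects).
def countNewlines(root: Path): Long =
  Files.walk(root).iterator().asScala
    .filter(Files.isRegularFile(_))
    .map(p => Files.readAllBytes(p).count(_ == '\n'.toByte).toLong)
    .sum

// e.g. countNewlines(Paths.get("/home/mapr/workspaces/spark"))
```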


Count the Java files

scala> val javaFiles = sc.wholeTextFiles("file:///home/mapr/workspaces/spark", 100).map(_._1).filter(_.endsWith(".java"))
javaFiles: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[24] at filter at <console>:24

scala> javaFiles.count
18/08/27 18:06:29 WARN TaskSetManager: Stage 5 contains a task of very large size (360 KB). The maximum recommended task size is 100 KB.
res10: Long = 911
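The warning appears because wholeTextFiles materialises each file's full contents even though only the paths are used here. The count can be verified without Spark by walking the tree directly; this is a sketch using a hypothetical countFilesWithSuffix helper, assuming the clone lives at /home/mapr/workspaces/spark:

```scala
import java.nio.file.{Files, Path, Paths}
import scala.collection.JavaConverters._

// Recursively count regular files whose name ends with the given suffix.
// Only file names are touched, so no file contents are ever read.
def countFilesWithSuffix(root: Path, suffix: String): Long =
  Files.walk(root).iterator().asScala
    .count(p => Files.isRegularFile(p) && p.toString.endsWith(suffix))
    .toLong

// e.g. countFilesWithSuffix(Paths.get("/home/mapr/workspaces/spark"), ".java")
```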