How to read multiple text files into a single RDD?

You can specify whole directories, use wildcards, and even give a comma-separated list of directories and wildcards. E.g.:

sc.textFile("/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file")

As Nick Chammas points out, this is an exposure of Hadoop's FileInputFormat, so it also works with Hadoop (and Scalding).
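For example, a minimal sketch with hypothetical paths (since textFile is backed by FileInputFormat, options such as minPartitions apply as usual):

val lines = sc.textFile("/my/dir1,/my/paths/part-00[0-5]*", minPartitions = 8)
println(lines.count())  // one action over all matched files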


Use union as follows:

val sc = new SparkContext(...)
val r1 = sc.textFile("xxx1")
val r2 = sc.textFile("xxx2")
// ...read as many files as needed
val rdds = Seq(r1, r2)  // add the remaining RDDs here
val bigRdd = sc.union(rdds)

Then bigRdd is an RDD containing the lines of all the files.
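If the paths are only known at runtime, you can build the same union from a list of paths; a minimal sketch (the paths are hypothetical):

val paths = Seq("/data/a.txt", "/data/b.txt", "/data/c.txt")  // hypothetical paths
val bigRdd = sc.union(paths.map(sc.textFile(_)))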


You can use a single textFile call to read multiple files. Scala:

sc.textFile(files.mkString(","))
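For example, with a hypothetical list of paths:

val files = Seq("/data/2015.txt", "/data/2016.txt")  // hypothetical paths
val rdd = sc.textFile(files.mkString(","))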

You can use this approach.

First, get a Buffer/List of S3 paths:

import scala.collection.JavaConverters._
import java.util.ArrayList
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.amazonaws.services.s3.model.ListObjectsRequest

def listFiles(s3_bucket: String, base_prefix: String) = {
  val files = new ArrayList[String]

  // S3 client and list-objects request
  val s3Client = new AmazonS3Client()
  var objectListing: ObjectListing = null
  val listObjectsRequest = new ListObjectsRequest()

  // Your S3 bucket
  listObjectsRequest.setBucketName(s3_bucket)

  // Your folder path or prefix
  listObjectsRequest.setPrefix(base_prefix)

  // Prepend s3:// to each key and collect the paths, paging through the listing
  do {
    objectListing = s3Client.listObjects(listObjectsRequest)
    for (objectSummary <- objectListing.getObjectSummaries().asScala) {
      files.add("s3://" + s3_bucket + "/" + objectSummary.getKey())
    }
    listObjectsRequest.setMarker(objectListing.getNextMarker())
  } while (objectListing.isTruncated())

  // Remove the first entry, which is the base "directory" key itself
  files.remove(0)

  // Return as a Scala collection
  files.asScala
}
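For example, calling it with a hypothetical bucket name and prefix:

val files = listFiles("my-bucket", "data/input/")  // hypothetical bucket and prefix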

Now pass this list to the following piece of code (note: sc is the SparkContext, since textFile is a SparkContext method):

import org.apache.spark.rdd.RDD

var combined: RDD[String] = null
for (file <- files) {
  val fileRdd = sc.textFile(file)
  if (combined != null) {
    combined = combined.union(fileRdd)
  } else {
    combined = fileRdd
  }
}

Now you have a final unified RDD, i.e. combined.

Optionally, you can also repartition it into a single big RDD:

val bigRdd = combined.repartition(1)

Repartitioning always works :D
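Note that repartition(1) does a full shuffle to pull everything into one partition. If you only want to reduce the partition count, coalesce achieves the same result here without the shuffle; a minimal sketch:

val singlePartition = combined.coalesce(1)  // narrows to one partition without a full shuffle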


In PySpark, I have found an additional useful way to parse files. Perhaps there is an equivalent in Scala (a rough sketch is given at the end of this answer), but I am not comfortable enough coming up with a working translation. It is, in effect, a textFile call with the addition of labels (in the below example, the key = filename and the value = one line from the file).

"Labeled" textFile

input:

import glob
from pyspark import SparkContext
from pyspark.sql import SQLContext

SparkContext.stop(sc)  # stop any existing context first
sc = SparkContext("local", "example")  # if running locally
sqlContext = SQLContext(sc)

Spark_Full = sc.emptyRDD()  # start empty so += (RDD union) has something to build on
for filename in glob.glob(Data_File + "/*"):  # Data_File is your directory of text files
    # the default argument binds filename at definition time,
    # avoiding Python's late-binding closure pitfall inside the loop
    Spark_Full += sc.textFile(filename).keyBy(lambda x, f=filename: f)

output: an array where each entry is a tuple with the filename as key and one line of the file as value. (Technically, using this method you can also use a different key besides the actual file path, perhaps a hash of it to save memory.) I.e.:

[('/home/folder_with_text_files/file1.txt', 'file1_contents_line1'),
 ('/home/folder_with_text_files/file1.txt', 'file1_contents_line2'),
 ('/home/folder_with_text_files/file1.txt', 'file1_contents_line3'),
 ('/home/folder_with_text_files/file2.txt', 'file2_contents_line1'),
  ...]

You can also recombine it either as a list of lines per file:

Spark_Full.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()

[('/home/folder_with_text_files/file1.txt', ['file1_contents_line1', 'file1_contents_line2','file1_contents_line3']),
 ('/home/folder_with_text_files/file2.txt', ['file2_contents_line1'])]

Or recombine entire files back into single strings (in this example the result is the same as what you get from wholeTextFiles, but with the string "file:" stripped from the file paths):

Spark_Full.groupByKey().map(lambda x: (x[0], ' '.join(list(x[1])))).collect()
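For what it's worth, a rough Scala equivalent of this labeled textFile idea might look like the following. This is only a sketch: files is assumed to be a list of paths.

// files is a hypothetical Seq[String] of file paths
val labeled = sc.union(files.map(f => sc.textFile(f).map(line => (f, line))))
// labeled: RDD[(String, String)] of (filename, line) pairs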