How to open/stream .zip files through Spark?

I have zip files that I would like to open 'through' Spark. I can open .gzip file no problem because of Hadoops native Codec support, but am unable to do so with .zip files.

Is there an easy way to read a zip file in your Spark code? I've also searched for zip codec implementations to add to the CompressionCodecFactory, but am unsuccessful so far.


There was no solution with python code and I recently had to read zips in pyspark. And, while searching how to do that I came across this question. So, hopefully this'll help others.

import zipfile
import io

def zip_extract(x):
    in_memory_data = io.BytesIO(x[1])
    file_obj = zipfile.ZipFile(in_memory_data, "r")
    files = [i for i in file_obj.namelist()]
    return dict(zip(files, [file_obj.open(file).read() for file in files]))


zips = sc.binaryFiles("hdfs:/Testing/*.zip")
files_data = zips.map(zip_extract).collect()

In the above code I returned a dictionary with filename in the zip as a key and the text data in each file as the value. you can change it however you want to suit your purposes.


@user3591785 pointed me in the correct direction, so I marked his answer as correct.

For a bit more detail, I was able to search for ZipFileInputFormat Hadoop, and came across this link: http://cotdp.com/2012/07/hadoop-processing-zip-files-in-mapreduce/

Taking the ZipFileInputFormat and its helper ZipfileRecordReader class, I was able to get Spark to perfectly open and read the zip file.

    rdd1  = sc.newAPIHadoopFile("/Users/myname/data/compressed/target_file.ZIP", ZipFileInputFormat.class, Text.class, Text.class, new Job().getConfiguration());

The result was a map with one element. The file name as key, and the content as the value, so I needed to transform this into a JavaPairRdd. I'm sure you could probably replace Text with BytesWritable if you want, and replace the ArrayList with something else, but my goal was to first get something running.

JavaPairRDD<String, String> rdd2 = rdd1.flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, String, String>() {

    @Override
    public Iterable<Tuple2<String, String>> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
        List<Tuple2<String,String>> newList = new ArrayList<Tuple2<String, String>>();

        InputStream is = new ByteArrayInputStream(textTextTuple2._2.getBytes());
        BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));

        String line;

        while ((line = br.readLine()) != null) {

        Tuple2 newTuple = new Tuple2(line.split("\\t")[0],line);
            newList.add(newTuple);
        }
        return newList;
    }
});