Pyspark - Code to calculate file hash/checksum not working

I have the below PySpark code to calculate the SHA-1 hash of each file in a folder. I'm using spark.sparkContext.binaryFiles to get an RDD of pairs where the key is the file name and the value is a file-like object, on which I'm calculating the hash in a map function, rdd.mapValues(map_hash_file). However, I'm getting the error below at the second-to-last line, and I don't understand it - how can this be fixed please? Thanks

Error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 66.0 failed 4 times, most recent failure: Lost task 0.3 in stage 66.0

Code:

#Function to calculate hash-value/checksum of a file
def map_hash_file(row):
    file_name = row[0]
    file_contents = row[1]
    sha1_hash = hashlib.sha1()
    sha1_hash.update(file_contents.encode('utf-8'))
    return file_name, sha1_hash.hexdigest()

rdd = spark.sparkContext.binaryFiles('/mnt/workspace/Test_Folder', minPartitions=None)

#As a check, print the list of files collected in the RDD
dataColl=rdd.collect()
for row in dataColl:
    print(row[0])

#Apply the function to calculate the hash of each file and store the results
hash_values = rdd.mapValues(map_hash_file)

#Store each file name and its hash value in a dataframe to later export as a CSV
df = spark.createDataFrame(data=hash_values)
display(df)

Solution 1:

You will get your expected result if you do the following:

  • Change file_contents.encode('utf-8') to file_contents - file_contents is already of type bytes.
  • Change rdd.mapValues(map_hash_file) to rdd.map(map_hash_file), since map_hash_file expects the whole (file_name, file_contents) tuple, not just the value.

Also consider:

  • Adding an import hashlib
  • Not collecting the contents of all files to the driver - you risk exhausting the driver's memory. A lighter check is shown in the sketch below.
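
For example, to confirm which files were picked up without pulling their contents to the driver, you could collect only the keys - a minimal sketch, reusing the rdd already defined in your code:

#Collect only the file names (the keys), not the file contents
for file_name in rdd.keys().collect():
    print(file_name)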

With the above changes, your code should look something like this:

import hashlib
#Function to calculate hash-value/checksum of a file
def map_hash_file(row):
    file_name = row[0]
    file_contents = row[1]
    sha1_hash = hashlib.sha1()
    sha1_hash.update(file_contents)
    return file_name, sha1_hash.hexdigest()

rdd = spark.sparkContext.binaryFiles('/mnt/workspace/Test_Folder', minPartitions=None)

#Apply the function to calculate the hash of each file and store the results
hash_values = rdd.map(map_hash_file)

#Store each file name and its hash value in a dataframe to later export as a CSV
df = spark.createDataFrame(data=hash_values)
display(df)
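
If you also want named columns and the CSV export you mentioned, one possible follow-up is the sketch below - the column names and the output path /mnt/workspace/file_hashes are just placeholders:

#Give the columns explicit names and write the result out as CSV
df = spark.createDataFrame(data=hash_values, schema=['file_name', 'sha1_hash'])
df.write.mode('overwrite').csv('/mnt/workspace/file_hashes', header=True)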