PySpark - Code to calculate file hash/checksum not working
I have the PySpark code below to calculate the SHA1 hash of each file in a folder. I'm using spark.sparkContext.binaryFiles to get an RDD of pairs where the key is the file name and the value is a file-like object, and I'm calculating the hash in a map function with rdd.mapValues(map_hash_file). However, I'm getting the error below at the second-last line, which I don't understand. How can this be fixed, please? Thanks.
Error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 66.0 failed 4 times, most recent failure: Lost task 0.3 in stage 66.0
Code:
# Function to calculate the hash value/checksum of a file
def map_hash_file(row):
    file_name = row[0]
    file_contents = row[1]
    sha1_hash = hashlib.sha1()
    sha1_hash.update(file_contents.encode('utf-8'))
    return file_name, sha1_hash.hexdigest()

rdd = spark.sparkContext.binaryFiles('/mnt/workspace/Test_Folder', minPartitions=None)

# As a check, print the list of files collected in the RDD
dataColl = rdd.collect()
for row in dataColl:
    print(row[0])

# Apply the function to calculate the hash of each file and store the results
hash_values = rdd.mapValues(map_hash_file)

# Store each file name and its hash value in a dataframe to later export as a CSV
df = spark.createDataFrame(data=hash_values)
display(df)
Solution 1:
You will get your expected result if you do the following:
- Change file_contents.encode('utf-8') to file_contents. file_contents is already of type bytes, which has no encode method.
- Change rdd.mapValues(map_hash_file) to rdd.map(map_hash_file). The function map_hash_file expects a (file name, contents) tuple, whereas mapValues passes it only the value.
Also consider:
- Adding an import hashlib statement.
- Not collecting the content of all files to the driver, since you risk consuming all of the driver's memory.
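If it helps to see why the two required changes matter, here is a minimal sketch that needs no cluster. It uses a plain tuple as a stand-in for one element of the binaryFiles RDD; the file name a.txt, its contents, and the helper name hash_pair are made up for illustration.

```python
import hashlib

def hash_pair(pair):
    """Hypothetical corrected function: takes a (file_name, file_contents) tuple."""
    file_name, file_contents = pair
    return file_name, hashlib.sha1(file_contents).hexdigest()

# Stand-in for one element of the binaryFiles RDD (name and contents are made up).
pair = ("/mnt/workspace/Test_Folder/a.txt", b"hello")

# rdd.map(f) calls f(pair): the function sees both the name and the bytes.
name, digest = hash_pair(pair)

# rdd.mapValues(f) calls f(value) and keeps the key itself, so the function
# would receive only the bytes. Indexing bytes yields integers, not a name.
value = pair[1]
first_item = value[0]

# bytes objects have no .encode() method; only str does.
has_encode = hasattr(value, "encode")

print(name, digest)
print(type(first_item).__name__, has_encode)
```

In other words, with mapValues the original function would treat the first two bytes of the file as the name and the contents, and the .encode call fails either way.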
With the above changes, your code should look something like this:
import hashlib

# Function to calculate the hash value/checksum of a file
def map_hash_file(row):
    file_name = row[0]
    file_contents = row[1]
    sha1_hash = hashlib.sha1()
    sha1_hash.update(file_contents)
    return file_name, sha1_hash.hexdigest()

rdd = spark.sparkContext.binaryFiles('/mnt/workspace/Test_Folder', minPartitions=None)

# Apply the function to calculate the hash of each file and store the results
hash_values = rdd.map(map_hash_file)

# Store each file name and its hash value in a dataframe to later export as a CSV
df = spark.createDataFrame(data=hash_values)
display(df)
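To sanity-check the Spark output on a small sample, you could compute the same digests with plain Python and compare them. This is only a sketch: it assumes the files are also reachable through the local filesystem, and the helper names sha1_of_file and sha1_of_folder are hypothetical, not part of the code above.

```python
import hashlib
from pathlib import Path

def sha1_of_file(path, chunk_size=1024 * 1024):
    """Return the SHA-1 hex digest of one file, reading it in chunks."""
    digest = hashlib.sha1()
    with open(path, "rb") as handle:
        # Read fixed-size chunks until read() returns empty bytes.
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()

def sha1_of_folder(folder):
    """Map each regular file directly inside `folder` to its SHA-1 digest."""
    return {
        entry.name: sha1_of_file(entry)
        for entry in sorted(Path(folder).iterdir())
        if entry.is_file()
    }
```

Calling sha1_of_folder on a local copy of the folder returns a dictionary keyed by file name, which can be compared against the hash column of the DataFrame. Reading in chunks keeps memory use flat even for large files.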