Scala - get file size of individual JSON files in a directory

I have a schema for JSON data defined as:

import org.apache.spark.sql.types._

val gpsSchema: StructType =
  StructType(Array(
    StructField("Name", StringType, true),
    StructField("GPS", ArrayType(
      StructType(Array(
        StructField("TimeStamp", DoubleType, true),
        StructField("Longitude", DoubleType, true),
        StructField("Latitude", DoubleType, true)
      )), true), true)))

Sample JSON data:

{"Name":"John","GPS":[{"TimeStamp": 1605449171.259277, "Longitude": -76.463684, "Latitude": 40.787052}, 
{"TimeStamp": 1605449175.743052, "Longitude": -76.464046, "Latitude": 40.787038}, 
{"TimeStamp": 1605449180.932659, "Longitude": -76.464465, "Latitude": 40.787022}, 
{"TimeStamp": 1605449187.288478, "Longitude": -76.464977, "Latitude": 40.787054}]}

I have 50 such JSON files in my input directory ("dbfs:/mnt/input_dir").

val my_dataframe = spark.read.schema(gpsSchema).json("dbfs:/mnt/input_dir")
my_dataframe.count() // returns 50

How can I get the file size for each JSON file in my_dataframe using Scala?


Solution 1:

You can define a UDF as follows:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.functions.udf

// UDF that resolves a path against the Hadoop FileSystem and returns the file length in bytes
val fileSize = udf { loc: String =>
  val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
  val path = new Path(loc)
  fs.getFileStatus(path).getLen
}

and call it on your data as follows:

import org.apache.spark.sql.functions._
import spark.implicits._

val cols = Seq($"*", input_file_name().as("file_path"), fileSize(input_file_name()).as("file_size"))
val df = spark.read.format("json").load("data/path").select(cols: _*)

This will give you all the rows of the JSON files, along with each row's file_path and file_size.
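
Since the question asks for the size of each file, you could then reduce this to one row per file, for instance like this (assuming the input_file_name() column is aliased to file_path as above):

// One row per input file with its size in bytes
df.select("file_path", "file_size").distinct().show(false)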

Please note that with this approach we create a new FileSystem object for each row, which is highly inefficient, so on large volumes of data you will see performance implications.
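
If the directory is small (here, 50 files), one way around that is to list the directory once on the driver, build a small (file_path, file_size) DataFrame, and join it back in on input_file_name(). A minimal sketch, assuming input_file_name() and FileStatus.getPath.toString render the paths identically (on DBFS you may need to normalize the scheme/prefix); my_dataframe is the DataFrame from the question, while sizesDf and withSizes are names introduced here:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.functions.input_file_name
import spark.implicits._

// List the input directory once on the driver and capture (path, size in bytes) pairs
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val sizes = fs.listStatus(new Path("dbfs:/mnt/input_dir"))
  .map(status => (status.getPath.toString, status.getLen))

val sizesDf = sizes.toSeq.toDF("file_path", "file_size")

// Attach each row's source file and join the pre-computed sizes
val withSizes = my_dataframe
  .withColumn("file_path", input_file_name())
  .join(sizesDf, Seq("file_path"), "left")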

Edit: In light of the performance implication, you can also retrieve the file size on the driver (outside of a UDF) with the following method:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

// Returns the size in bytes of the file at fileLocation
def calculateFileSize(fileLocation: String, spark: SparkSession): Long = {
  val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
  val path = new Path(fileLocation)
  fs.getFileStatus(path).getLen
}
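
For example, you could collect the distinct file paths from the DataFrame and call this helper on the driver for each of them. A sketch, where my_dataframe is the DataFrame from the question and fileSizes is a name introduced here:

import org.apache.spark.sql.functions.input_file_name

// Collect each distinct source file path once, then look up its size on the driver
val fileSizes: Map[String, Long] = my_dataframe
  .select(input_file_name().as("file_path"))
  .distinct()
  .collect()
  .map { row =>
    val path = row.getString(0)
    path -> calculateFileSize(path, spark)
  }
  .toMap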