How to use spark.DataFrameReader from Foundry Transforms
You'll want to follow the methodology described over here. I strongly recommend using unit-test-based methods to iterate on the code that recovers the file contents.
Your compute function code will look like:
from transforms.api import transform, Output, Input
from transforms.verbs.dataframes import union_many
def read_files(spark_session, paths):
    """Load every CSV path into its own DataFrame and combine them.

    Args:
        spark_session: Session whose ``read`` attribute supplies the reader
            used for each file.
        paths: Iterable of file locations; each one is passed to ``.csv()``
            individually, in iteration order.

    Returns:
        Whatever ``union_many`` returns when given all per-file DataFrames
        as positional arguments.
    """
    # A fresh reader is requested for every path, with the "header" option
    # set to "true" on each of them.
    frames = [
        spark_session.read.option("header", "true").csv(path)
        for path in paths
    ]
    return union_many(*frames)
@transform(
    the_output=Output("ri.foundry.main.dataset.my-awesome-output"),
    the_input=Input("ri.foundry.main.dataset.my-awesome-input"),
)
def my_compute_function(the_input, the_output, ctx):
    """Parse the gzipped CSV files of the input dataset and write them out.

    Args:
        the_input: Transform input; its ``filesystem()`` is used to list the
            raw files and to obtain the Hadoop base path.
        the_output: Transform output that receives the combined DataFrame via
            ``write_dataframe``.
        ctx: Transform context providing ``spark_session``.
    """
    session = ctx.spark_session
    input_filesystem = the_input.filesystem()
    hadoop_path = input_filesystem.hadoop_path
    # ls() yields file-status entries from a plain Python generator, which has
    # no .map() method; build the list explicitly instead. Each entry exposes
    # its dataset-relative location as ``.path`` (per the Foundry transforms
    # FileSystem API), so join it to the Hadoop root with a "/" separator.
    files = [
        hadoop_path + "/" + file_status.path
        for file_status in input_filesystem.ls('**/*.csv.gz')
    ]
    output_df = read_files(session, files)
    the_output.write_dataframe(output_df)