How to loop through each row of dataFrame in pyspark

Solution 1:

To "loop" and take advantage of Spark's parallel computation framework, you could define a custom function and use map.

def customFunction(row):

   return (row.name, row.age, row.city)

sample2 = sample.rdd.map(customFunction)

or

sample2 = sample.rdd.map(lambda x: (x.name, x.age, x.city))

The custom function would then be applied to every row of the dataframe. Note that sample2 will be a RDD, not a dataframe.

Map may be needed if you are going to perform more complex computations. If you just need to add a simple derived column, you can use the withColumn, with returns a dataframe.

sample3 = sample.withColumn('age2', sample.age + 2)

Solution 2:

You simply cannot. DataFrames, same as other distributed data structures, are not iterable and can be accessed using only dedicated higher order function and / or SQL methods.

You can of course collect

for row in df.rdd.collect():
    do_something(row)

or convert toLocalIterator

for row in df.rdd.toLocalIterator():
    do_something(row)

and iterate locally as shown above, but it beats all purpose of using Spark.

Solution 3:

Using list comprehensions in python, you can collect an entire column of values into a list using just two lines:

df = sqlContext.sql("show tables in default")
tableList = [x["tableName"] for x in df.rdd.collect()]

In the above example, we return a list of tables in database 'default', but the same can be adapted by replacing the query used in sql().

Or more abbreviated:

tableList = [x["tableName"] for x in sqlContext.sql("show tables in default").rdd.collect()]

And for your example of three columns, we can create a list of dictionaries, and then iterate through them in a for loop.

sql_text = "select name, age, city from user"
tupleList = [{name:x["name"], age:x["age"], city:x["city"]} 
             for x in sqlContext.sql(sql_text).rdd.collect()]
for row in tupleList:
    print("{} is a {} year old from {}".format(
        row["name"],
        row["age"],
        row["city"]))

Solution 4:

Give A Try Like this

    result = spark.createDataFrame([('SpeciesId','int'), ('SpeciesName','string')],["col_name", "data_type"]); 
    for f in result.collect(): 
        print (f.col_name)