Apache Spark Data Generator Function on Databricks Not Working

I am trying to execute the Data Generator function provided by Microsoft to test streaming data to Event Hubs.

Unfortunately, I keep getting the error

Processing failure: No such file or directory

when I try to execute the function:

%scala
DummyDataGenerator.start(15)

Can someone take a look at the code and help me decipher why I'm getting the error:

class DummyDataGenerator:
  streamDirectory = "/FileStore/tables/flight"
None # suppress output

I'm not sure how the above cell relates to the DummyDataGenerator object below.

%scala

import scala.util.Random
import java.io._
import java.time._

// Notebook #2 has to set this to 8, we are setting
// it to 200 to "restore" the default behavior.
spark.conf.set("spark.sql.shuffle.partitions", 200)

// Make the username available to all other languages.
// "WARNING: use of the "current" username is unpredictable
// when multiple users are collaborating and should be replaced
// with the notebook ID instead.
val username = com.databricks.logging.AttributionContext.current.tags(com.databricks.logging.BaseTagDefinitions.TAG_USER);
spark.conf.set("com.databricks.training.username", username)

object DummyDataGenerator extends Runnable {
  var runner : Thread = null;
  val className = getClass().getName()
  val streamDirectory = s"dbfs:/tmp/$username/new-flights"
  val airlines = Array( ("American", 0.17), ("Delta", 0.12), ("Frontier", 0.14), ("Hawaiian", 0.13), ("JetBlue", 0.15), ("United", 0.11), ("Southwest", 0.18) )
  val reasons = Array("Air Carrier", "Extreme Weather", "National Aviation System", "Security", "Late Aircraft")

  val rand = new Random(System.currentTimeMillis())
  var maxDuration = 3 * 60 * 1000 // default to three minutes

  def clean() {
    System.out.println("Removing old files for dummy data generator.")
    dbutils.fs.rm(streamDirectory, true)
    if (dbutils.fs.mkdirs(streamDirectory) == false) {
      throw new RuntimeException("Unable to create temp directory.")
    }
  }

  def run() {
    val date = LocalDate.now()
    val start = System.currentTimeMillis()

    while (System.currentTimeMillis() - start < maxDuration) {
      try {
        val dir = s"/dbfs/tmp/$username/new-flights"
        val tempFile = File.createTempFile("flights-", "", new File(dir)).getAbsolutePath()+".csv"
        val writer = new PrintWriter(tempFile)

        for (airline <- airlines) {
          val flightNumber = rand.nextInt(1000)+1000
          val deptTime = rand.nextInt(10)+10
          val departureTime = LocalDateTime.now().plusHours(-deptTime)
          val (name, odds) = airline
          val reason = Random.shuffle(reasons.toList).head
          val test = rand.nextDouble()

          val delay = if (test < odds)
            rand.nextInt(60)+(30*odds)
            else rand.nextInt(10)-5

          println(s"- Flight #$flightNumber by $name at $departureTime delayed $delay minutes due to $reason")
          writer.println(s""" "$flightNumber","$departureTime","$delay","$reason","$name" """.trim)
        }
        writer.close()

        // wait a couple of seconds
        //Thread.sleep(rand.nextInt(5000))

      } catch {
        case e: Exception => {
          printf("* Processing failure: %s%n", e.getMessage())
          return;
        }
      }
    }
    println("No more flights!")
  }

  def start(minutes:Int = 5) {
    maxDuration = minutes * 60 * 1000

    if (runner != null) {
      println("Stopping dummy data generator.")
      runner.interrupt();
      runner.join();
    }
    println(s"Running dummy data generator for $minutes minutes.")
    runner = new Thread(this);
    runner.run();
  }

  def stop() {
    start(0)
  }
}

DummyDataGenerator.clean()

displayHTML("Imported streaming logic...") // suppress output

Solution 1:

You should be able to use the Databricks Labs Data Generator on the Databricks Community Edition. I'm providing the instructions below:

Running the Databricks Labs Data Generator on the Community Edition

The Databricks Labs Data Generator is a PySpark library, so the code to generate the data needs to be Python. However, you can create a view over the generated data and consume it from Scala if that's your preferred language.

  1. You can install the framework on the Databricks Community Edition by running the following cell in a notebook:

%pip install git+https://github.com/databrickslabs/dbldatagen

Once it's installed, you can use the library to define a data generation spec and then call build on it to generate a Spark DataFrame.

The following example shows generation of batch data similar to the data set you are trying to generate. It should be placed in a separate notebook cell.

Note: here we generate 10 million rows to illustrate the ability to create larger data sets; the library can be used to generate datasets much larger than that.

%python

import dbldatagen as dg

num_rows = 10 * 1000000  # number of rows to generate
num_partitions = 8  # number of Spark dataframe partitions

delay_reasons = ["Air Carrier", "Extreme Weather", "National Aviation System", "Security", "Late Aircraft"]

# will have implied column `id` for ordinal of row
flightdata_defn = (dg.DataGenerator(spark, name="flight_delay_data", rows=num_rows, partitions=num_partitions)
                 .withColumn("flightNumber", "int", minValue=1000, uniqueValues=10000, random=True)
                 .withColumn("airline", "string", minValue=1, maxValue=500,  prefix="airline", random=True, distribution="normal")
                 .withColumn("original_departure", "timestamp", begin="2020-01-01 01:00:00", end="2020-12-31 23:59:00", interval="1 minute", random=True)
                 .withColumn("delay_minutes", "int", minValue=20, maxValue=600, distribution=dg.distributions.Gamma(1.0, 2.0))
                 .withColumn("delayed_departure",  "timestamp", expr="cast(original_departure as bigint) +  (delay_minutes * 60) ", baseColumn=["original_departure", "delay_minutes"])
                 .withColumn("reason", "string", values=delay_reasons, random=True)
                )

df_flight_data = flightdata_defn.build()

display(df_flight_data)

You can find information on how to generate streaming data in the online documentation at https://databrickslabs.github.io/dbldatagen/public_docs/using_streaming_data.html
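As a minimal sketch, the same spec can be built as a streaming DataFrame via the withStreaming option of build described in that documentation (the rowsPerSecond rate used here is illustrative and is passed through to Spark's rate source):

%python

# Build the same spec as a streaming DataFrame instead of a batch one.
# rowsPerSecond controls the generation rate; 100 is an arbitrary example.
df_flight_stream = flightdata_defn.build(withStreaming=True,
                                         options={'rowsPerSecond': 100})

display(df_flight_stream)  # renders a continuously updating stream in Databricks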

You can create a named temporary view over the data so that you can access it from SQL or Scala using one of two methods:

1: use createOrReplaceTempView

df_flight_data.createOrReplaceTempView("delays")

2: use options for build. In this case, the name passed to the DataGenerator initializer ("flight_delay_data" in the example above) will be used as the name of the view

i.e.

df_flight_data = flightdata_defn.build(withTempView=True)
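Either way, you can then consume the generated data from Scala. A minimal sketch, assuming the view was registered as delays via the first method:

%scala

// Query the temp view created over the Python-generated DataFrame.
val delayed = spark.sql("SELECT airline, delay_minutes, reason FROM delays WHERE delay_minutes > 120")
delayed.show(10)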

Solution 2:

This code will not work on the community edition because of this line:

val dir = s"/dbfs/tmp/$username/new-flights"

as there is no DBFS FUSE mount on Databricks Community Edition (it's supported only on the full Databricks platform). You could potentially make it work by:

  1. Changing that directory to a local directory, such as /tmp
  2. Adding code (after writer.close()) to list the flights-* files in that local directory and using dbutils.fs.mv to move them into streamDirectory, as shown in the sketch below
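A minimal sketch of that workaround, assuming it replaces the body of the try block in run() (java.io._ is already imported there; the local path is illustrative):

%scala

// Write to the driver's local disk instead of the missing /dbfs mount.
val localDir = new File("/tmp/new-flights")
localDir.mkdirs()

val tempFile = File.createTempFile("flights-", ".csv", localDir)
val writer = new PrintWriter(tempFile)
// ... write the CSV rows exactly as before ...
writer.close()

// Move finished files into DBFS so the streaming reader can pick them up.
// The "file:" prefix tells dbutils.fs that the source is the local filesystem.
for (f <- localDir.listFiles if f.getName.startsWith("flights-")) {
  dbutils.fs.mv("file:" + f.getAbsolutePath, s"$streamDirectory/${f.getName}")
}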