How to setup and structure a spark application in scala

Discussed in this article will be:

  • SBT setup
  • Implementing a Spark trait that reads commandline arguments properly and creates a spark session
  • Testing spark jobs and functions
  • CI/CD

Spark Scala Application - Tim van Cann - GoDataDriven

Why?

More often than not I notice companies and employees struggling to find a good spark application structure. Code is filled with side-effects, such as mixing I/O with logic, making the spark logic impossible to unit test. They are not using a proper structure that separates concerns destroying all modularity, increasing technical debt. Even though those principles are hard in general, using a computation framework such as spark does not make it easier since each job usually does input -> transform -> output.

This post proposes a structure and some best practices that try to address these issues.

Getting started

To get started and have a look at the project structure, clone the repository at github

SBT setup

The first few lines in build.sbt are the most important lines to bootstrap your application. They contain the project name and the spark dependencies.

These lines define the name, version and organization of your project and are needed to upload a succesfull build to a binary store, more on that later. We use scalaVersion 2.11.11 since that is the version spark is compiled against and as of writing the latest available spark version is 2.2.0. The dependencies make sure that spark is available on the classpath for compilation, however the scope is Provided as we assume that wherever we deploy our application a spark cluster is already running. Scopt is a very useful library to help reading arguments from commandline, we'll pull in that package on the default scope Compile. Last but very important scalatest is pulled in to help write proper unit test and integration tests. Since this library is only needed for testing the scope is limited to test and it. The latter must be enabled before the it command is recognized and is done so by the 2 lines:

Defaults.itSettings
lazy val root = project.in(file(".")).configs(IntegrationTest)

This enables the command

$ sbt it:test

to run all integration tests under folder src/it/

The lines

Test / testOptions += Tests.Argument("-oD")
IntegrationTest / testOptions += Tests.Argument("-oD")

are helpful utility lines that enabled time measurement for each test such that sbt can print those in the testreport.

Assembly

Since we're deploying the application on some cluster all dependencies that are not available on the classpath of the cluster must be packed into the jar. For that the sbt-assembly plugin is used. The project/assembly.sbt file consists of a single line:

addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.4")

which enables the bash command.

$ sbt assembly

This command will run all your tests and packages all Compile scope libraries into the jar, creating a FAT jar. For more information, refer to the github page for sbt-assembly

Spark jobs

The main spark trait is src/main/scala/thw/vancann/SparkJob.scala. It essentially does 2 things:

  • Read in and parse any optional and required command line arguments into a case class
  • Start a SparkSession, initialize a Storage object and call the run function.

The only thing actual jobs need to do is implement the functions appName and run.

Scopt

src/main/scala/thw/vancann/UsageConfig.scala is the file containing the UsageConfig case class and the parser that parses the command line arguments. For more information about scopt, please refer to their github page.

Storage

The src/main/scala/thw/vancann/storage/Storage.scala should define all I/O of your application. There should be no hidden reads and writes, everything should go through one place such that consolidation can be achieved. Note that for this structure we will always assume the use of Datasets typed as proper case classes, as one should.

Storage is defined as a trait, this makes it trivial to hook in a different storage implementation for separate use cases or when switching cloud providers, making the code slightly more modular.

Example Job

An actual example sparkjob is provided in src/main/scala/thw/vancann/WordCount.scala. Note that this job seperates I/O from actual logic.

The run function reads in all sources needed for this job, this function should have NO LOGIC besides I/O. The magic happens in the transform function, inspired by spark Transformers. Taking one or multiple Datasets plus auxiliary arguments and should transform the data as needed. Returning the resulting Dataset. Anything equal and lower than the transform function SHOULD NOT do any I/O or have side effects in general. This makes it trivial to test all functions without having to rely on external data sources or stubbing databases. Yes, this requires discipline but it'll pay off!

Each spark job should only write to one destination and generally do only one thing, i.e. transform and/or combine some data, pull from an API, create a model (using SparkML or H2o, apply a model or do some ingestion. It is fine - and usually necessarily - to read from multiple sources.

It is better to have more jobs doing smaller things than a big job doing lots of things! Smaller jobs are easier to test, easier to modify and certainly easier to debug.

Testing

For testing one usually needs to have a spark session available for each test. To reuse the same session, and to not start a new one for each test class, a SharedSparkSession in src/test/scala/thw/vancann/SharedSparkSession.scala is provided. By importing this singleton into each test the same session is reused throughout tests and suites.

An example for such a test is provided in src/test/scala/thw/vancann/WordCountTest.scala using FlatSpec from scalatest as teststyle and test library respectively.

Any I/O tests generally go in the src/it folder to separate the unit tests from the integration tests. An integration test that reads a file is provided in src/it/scala/thw/vancann/WordCountTest.scala. Note again the usefulness of the Storage trait as we can easily implement a LocalStorage to read from the resources folder, removing the need for mocking / stubbing.

Continuous Integration (CI) and Continuous Delivery (CD)

To make sure each push to a branch does not break any code or styleguides Continuous Ingegration (CI) is a good way to catch each culprit and adhere to agreements made by your team.

Provided is a way to do CI in gitlab, which allows you to run CI and CD for free as long as you configure your own runner which is quite easy and very well documented here.

The file .gitlab-ci.yml describes how to run three stages in a pristine docker container.

(1) Tests and codecoverage

Run both the unit tests and integration tests and measure the code coverage using scoverage which is added as plugin in project/plugins.sbt.

coverageExcludedPackages := "<empty>;.*storage.*"
coverageMinimum := 70
coverageFailOnMinimum := true

in build.sbt define some settings for scoverage. These particular settings set a coverage level of minimum 70 and let the build fail if coveragereport finds out that the coverage falls below this number.

(2) Scalastyle

Check for predefined style violations defined in scalastyle-config.xml. For more information have a look at scalastyle. The lines

scalastyleFailOnWarning := false
scalastyleFailOnError := true

in build.sbt define if and when scalastyle should fail the build.

(3) Generate scaladocs and publish the project

For this last step a few additional parameters need to be uncommented in build.sbt. Have a look at the sbt documentation to see what these lines do.

Note that starting each docker container for each stage is quite slow as all sbt dependencies need to be pulled in. A slightly more advanced and significantly faster solution is to prepack a docker image that contains any dependencies defined in build.sbt in the docker image .ivy2 repository folder.

Final notes

There you have it. A project to bootstrap your spark application! Feel free to tweak, add and remove as you please, but do take note of the best practices proposed:

  • Separate concerns
  • No side-effects
  • A spark job should generally have only one output, restricting the job to do only one thing.
  • Think modular!

We are hiring

Stay up to date on the latest insights and best-practices by registering for the GoDataDriven newsletter.
Follow us for more of this