Xebia Background Header Wave

Recently I was delivering a Spark course. One of the exercises asked the students to split a Spark DataFrame in two, non-overlapping, parts.

One of the students came up with a creative way to do so.

split

He started by adding a monotonically increasing ID column to the DataFrame. Spark has a built-in function for this, monotonically_increasing_id — you can find how to use it in the docs.

His idea was pretty simple: once creating a new column with this increasing ID, he would select a subset of the initial DataFrame and then do an anti-join with the initial one to find the complement1.

However this wasn’t working. His initial DataFrame had 10000 lines. The first sub-selection had 3000 lines. The result of the anti-join, however, didn’t have 7000 items.

Was his code wrong? At the essence the code was very simple. He would first read a DataFrame

df = (
    spark
    .read.csv('chicken.csv', header=True)
    .repartition(40)  # simulate "big" for illustrative purposes
    .withColumn('increasing_id', sf.monotonically_increasing_id())
)

followed by a subselection

df_small = df.filter(sf.col('weight') < 100)

and finally the anti-join

df_large_on_increasing_id = df.join(
    df_small, how='leftanti', on='increasing_id'
)

(Disclaimer: this code is simplified; df_large_on_increasing_id can obviously be obtained by selecting everything with weight greater or equal to 100. Thanks for noticing!)

If we now select everything from df_large_on_increasing_id where the weight is less than 100, we should get an empty DataFrame. Do we?

df_large_on_increasing_id.filter(sf.col('weight') < 100).show(n=6)
increasing_ididweighttimechickdiet
229687.08.0272
518242.00.0171
1140376.06.0363
858993459237749.02.0343
858993459634262.04.0313
858993460115281.018.0131

Clearly not! But what is happening then? When creating df_small, I should only remain with records with a weight smaller than 100. For example

df_small.show(n=3)
increasing_ididweighttimechickdiet
858993459237749.02.0343
858993459634262.04.0313
858993460115281.018.0131

When doing an anti-join with the initial DataFrame, these records should be definitely removed: the increasing_ids 8589934592, 8589934596, and 8589934601 are present in df_small, so they should not be present in df_large_on_increasing_id = df - df_small!

The issue is subtle, but if you are familiar with how Spark really works, you should have already noticed it!

Spark is lazy by default: this means that, when we call monotonically_increasing_id, Spark is actually not doing anything besides tracking that, when we will actually need the increasing_id column, it needs to use the monotonically_increasing_id function to compute it.

But when do we need that column? Well, it turns out we need it pretty late in the process: only when calling show it will start doing the computation!

So what, you might say! When the computation will be triggered, why would the result be different?

Well, the hint lies in the documentation of the monotonically_increasing_id function (emphasis mine):

The function is non-deterministic because its result depends on partition IDs.

Interesting: the IDs depend on the partition ID! This means that the function uses the partition IDs of df_small when doing the computation for df_small, while it uses the partition IDs of df when doing the computation for df!

How can we verify this is true? (Or True, if you’re feeling pythonic)

We can simply cache and then materialize the column before using it!

To do so we first cache the DataFrame — cache tells Spark: once this DataFrame has been computed, hold it in memory for as long as you can — and then do a countcount tells Spark: now I want to materialize the results (and then tell me how many records they contain!)

df = (
    spark
    .read.csv('chicken.csv', header=True)
    .repartition(40)  # simulate "big"
    .withColumn('increasing_id', sf.monotonically_increasing_id())
)
# once an action will be triggered, count in this case,
# Spark will hold df as it is now in memory
df.cache()
# this is the action that materializes df
df.count()

If we now re-execute the code we had before, we get a very different result!

df_small = df.filter(sf.col('weight') < 100)
df_large_on_increasing_id = df.join(
    df_small, how='leftanti', on='increasing_id'
)
df_large_on_increasing_id.filter(sf.col('weight') < 100).show()
increasing_ididweighttimechickdiet

The DataFrame is empty, as we expected!!

This was a good fix, but does it work well in production?

The answer lies in what I wrote before: Spark will keep the DataFrame in memory only for as long as it can! If memory is, for whatever reason, tight, it will evict the DataFrame, recomputing it, and hence giving you back the wrong results!

If you have Andrew Snare as a colleague, you probably heard it before: using cache to ensure correctness in Spark is a dangerous anti-pattern2.

The students were quick to ask if there is any way around this doom. The answer is simple: write the DataFrame back to disk after you’ve added the column. As wasteful as it might seem3 it’s the only way to ensure correctness without changing the logic (i.e. doing the anti-join with the monotonically increasing ID).4

Interested in Data Science with Spark Training?

The course I wrote about at the start of the post is one of our finest. Read more on its dedicated page.


If you want more rambling throughout the month, follow me on Twitter: I’m gglanzani there!


  1. The details are a bit more intricate. An anti-join is, basically, a subtraction. If you write df1.anti-join(df2) you are effectively doing df1 - df2, i.e. give me all the records in df1 that do not appear in df2

  2. It can still be useful for performance, but don’t count on it for correctness. 

  3. Narrator voice: it is. 

  4. This is the right place to remember everyone that there are other ways to split a DataFrame, and that the exercise didn’t require using a monotonically increasing ID to be done. However it was one of the highlights of the day for the students: gather all the theoretical knowledge they absorbed until then to fix what was seemingly a bug in Spark but was actually a bug in their code. 

Questions?

Get in touch with us to learn more about the subject and related solutions

Explore related posts