Spark surprises for the uninitiated
Recently I was delivering a Spark course. One of the exercises asked the students to split a Spark DataFrame into two non-overlapping parts.
One of the students came up with a creative way to do so.
He started by adding a monotonically increasing ID column to the DataFrame. Spark has a built-in function for this, `monotonically_increasing_id`; you can find how to use it in the docs.
His idea was pretty simple: after creating a new column with this increasing ID, he would select a subset of the initial DataFrame and then do an anti-join with the initial one to find the complement[^1].
However, this wasn't working. His initial DataFrame had 10000 lines. The first sub-selection had 3000 lines. The result of the anti-join, however, didn't have 7000 lines.
Was his code wrong? In essence, the code was very simple. He would first read a DataFrame

```python
import pyspark.sql.functions as sf

df = (
    spark
    .read.csv('chicken.csv', header=True)
    .repartition(40)  # simulate "big" for illustrative purposes
    .withColumn('increasing_id', sf.monotonically_increasing_id())
)
```
followed by a sub-selection

```python
df_small = df.filter(sf.col('weight') < 100)
```
and finally the anti-join

```python
df_large_on_increasing_id = df.join(
    df_small, how='leftanti', on='increasing_id'
)
```
(Disclaimer: this code is simplified; `df_large_on_increasing_id` can obviously be obtained by selecting everything with a weight greater than or equal to 100. Thanks for noticing!)
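For completeness, that simpler route is a one-liner (a sketch using the same names as above, and assuming `weight` is never null):

```python
# The complement of df_small, obtained directly with the negated predicate.
df_large = df.filter(sf.col('weight') >= 100)
```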
If we now select everything from `df_large_on_increasing_id` where the weight is less than 100, we should get an empty DataFrame. Do we?

```python
df_large_on_increasing_id.filter(sf.col('weight') < 100).show(n=6)
```
| increasing_id | id | weight | time | chick | diet |
|---|---|---|---|---|---|
| 2 | 296 | 87.0 | 8.0 | 27 | 2 |
| 5 | 182 | 42.0 | 0.0 | 17 | 1 |
| 11 | 403 | 76.0 | 6.0 | 36 | 3 |
| 8589934592 | 377 | 49.0 | 2.0 | 34 | 3 |
| 8589934596 | 342 | 62.0 | 4.0 | 31 | 3 |
| 8589934601 | 152 | 81.0 | 18.0 | 13 | 1 |
Clearly not! But what is happening then? When creating `df_small`, I should only be left with records with a weight smaller than 100. For example
```python
df_small.show(n=3)
```
| increasing_id | id | weight | time | chick | diet |
|---|---|---|---|---|---|
| 8589934592 | 377 | 49.0 | 2.0 | 34 | 3 |
| 8589934596 | 342 | 62.0 | 4.0 | 31 | 3 |
| 8589934601 | 152 | 81.0 | 18.0 | 13 | 1 |
When doing an anti-join with the initial DataFrame, these records should definitely be removed: the `increasing_id`s 8589934592, 8589934596, and 8589934601 are present in `df_small`, so they should not be present in `df_large_on_increasing_id = df - df_small`!
The issue is subtle, but if you are familiar with how Spark really works, you should have already noticed it!
Spark is lazy by default: this means that, when we call `monotonically_increasing_id`, Spark doesn't actually do anything besides tracking that, when we eventually need the `increasing_id` column, it has to use the `monotonically_increasing_id` function to compute it.
But when do we need that column? Well, it turns out we need it pretty late in the process: only when calling `show` does the computation actually start!
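To make the laziness concrete, here is a minimal sketch reusing the names from the code above:

```python
# Transformations such as filter() and withColumn() are lazy: they only
# extend the query plan and return immediately.
df_small = df.filter(sf.col('weight') < 100)

# explain() prints the plan Spark would run; still no computation happens.
df_small.explain()

# Only an action (show, count, collect, ...) triggers a job, and only then
# are the increasing_id values actually generated.
df_small.show(n=3)
```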
So what, you might say! When the computation is eventually triggered, why would the result be different?
Well, the hint lies in the documentation of the `monotonically_increasing_id` function (emphasis mine):

> The function is non-deterministic because its result *depends on partition IDs*.
Interesting: the IDs depend on the partition ID! This means that the function uses the partition IDs of `df_small` when doing the computation for `df_small`, while it uses the partition IDs of `df` when doing the computation for `df`!
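To see how much the partitioning matters, recall that, per the Spark docs, the generated ID packs the partition ID into the upper 31 bits and the record number within the partition into the lower 33 bits. A quick sketch decoding the IDs we saw above:

```python
# Decode a monotonically increasing ID into (partition_id, record_number),
# assuming the documented bit layout: partition ID in the upper 31 bits,
# per-partition record number in the lower 33 bits.
def decode_increasing_id(increasing_id):
    partition_id = increasing_id >> 33
    record_number = increasing_id & ((1 << 33) - 1)
    return partition_id, record_number

print(decode_increasing_id(2))           # (0, 2): partition 0, third record
print(decode_increasing_id(8589934592))  # (1, 0): partition 1, first record
print(decode_increasing_id(8589934601))  # (1, 9): partition 1, tenth record
```

The same record can therefore get a different ID every time the column is recomputed, simply because it lands in a different partition, or at a different position within one.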
How can we verify this is true? (Or `True`, if you're feeling pythonic.)
We can simply cache and then materialize the column before using it! To do so we first `cache` the DataFrame (`cache` tells Spark: once this DataFrame has been computed, hold it in memory for as long as you can) and then do a `count` (`count` tells Spark: now I want to materialize the results, and then tell me how many records they contain!).
```python
df = (
    spark
    .read.csv('chicken.csv', header=True)
    .repartition(40)  # simulate "big"
    .withColumn('increasing_id', sf.monotonically_increasing_id())
)

# once an action is triggered (count, in this case),
# Spark will hold df as it is now in memory
df.cache()

# this is the action that materializes df
df.count()
```
If we now re-execute the code we had before, we get a very different result!
```python
df_small = df.filter(sf.col('weight') < 100)

df_large_on_increasing_id = df.join(
    df_small, how='leftanti', on='increasing_id'
)

df_large_on_increasing_id.filter(sf.col('weight') < 100).show()
```
| increasing_id | id | weight | time | chick | diet |
|---|---|---|---|---|---|
The DataFrame is empty, as we expected!!
This was a good fix, but does it work well in production?
The answer lies in what I wrote before: Spark will keep the DataFrame in memory only for as long as it can! If memory is, for whatever reason, tight, it will evict the DataFrame and recompute it the next time it's needed, hence giving you back the wrong results!
If you have Andrew Snare as a colleague, you've probably heard it before: using `cache` to ensure correctness in Spark is a dangerous anti-pattern[^2].
The students were quick to ask if there is any way around this doom. The answer is simple: write the DataFrame back to disk after you've added the column. As wasteful as it might seem[^3], it's the only way to ensure correctness without changing the logic (i.e. doing the anti-join with the monotonically increasing ID)[^4].
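A minimal sketch of that fix, assuming Parquet as the on-disk format and a made-up path (any persistent format and location will do):

```python
# Write df *with* its increasing_id column to disk: the IDs are now frozen
# and can no longer change between computations.
df.write.mode('overwrite').parquet('/tmp/chicken_with_id.parquet')  # hypothetical path

# Read the materialized IDs back and redo the anti-join against them.
df_materialized = spark.read.parquet('/tmp/chicken_with_id.parquet')
df_small = df_materialized.filter(sf.col('weight') < 100)
df_large_on_increasing_id = df_materialized.join(
    df_small, how='leftanti', on='increasing_id'
)
```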
Interested in Data Science with Spark Training?
The course I wrote about at the start of the post is one of our finest. You can sign up for the next one, scheduled for March 20-22, 2019.
If you can't make it, there's always more scheduled and you can get in touch for a private, in-company session!
If you want more rambling throughout the month, follow me on Twitter: I'm gglanzani there!
[^1]: The details are a bit more intricate. An anti-join is, basically, a subtraction. If you write `df1.join(df2, how='leftanti', on=...)` you are effectively doing `df1 - df2`, i.e. give me all the records in `df1` that do not appear in `df2`.

[^2]: It can still be useful for performance, but don't count on it for correctness.

[^3]: Narrator voice: it is.

[^4]: This is the right place to remind everyone that there are other ways to split a DataFrame (one such alternative is sketched below), and that the exercise didn't require using a monotonically increasing ID. However, it was one of the highlights of the day for the students: gathering all the theoretical knowledge they had absorbed until then to fix what was seemingly a bug in Spark but was actually a bug in their code.
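For instance, a hedged sketch of one alternative, `randomSplit`, with arbitrary fractions and seed:

```python
# Split df into two non-overlapping parts, roughly 30% / 70%.
# Caveat: without caching or writing to disk first, randomSplit on a
# non-deterministic input can show surprises of its own, so the lessons
# about laziness above apply here too.
df_a, df_b = df.randomSplit([0.3, 0.7], seed=42)
```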