Chapter 6 Non-translated functions with spark_apply
In this chapter, we will look into the spark_apply() interface and how it communicates with the Spark instance. As an example, we will rewrite the function from the previous chapter so that we can use the non-translated casefold() function with Spark.
# Define a custom function using `casefold`
fun_r_custom <- function(tbl, colName) {
tbl[[colName]] <- casefold(tbl[[colName]], upper = FALSE)
tbl
}
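Before sending it to Spark, we can sanity-check the function locally on a plain R data frame; the toy input below is only an illustration and not part of the chapter's data:
# Quick local check: `casefold(x, upper = FALSE)` lower-cases the values
fun_r_custom(data.frame(origin = c("EWR", "JFK")), "origin")
##   origin
## 1    ewr
## 2    jfk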
Now, we can use the spark_apply() interface to execute our custom function on a Spark DataFrame, providing the name of the column on which we want to apply the function as the context argument.
# Execute on Spark via `spark_apply`:
head(tbl_weather) %>%
spark_apply(fun_r_custom, context = {colName <- "origin"})
## # Source: spark<?> [?? x 16]
## id origin year month day hour temp dewp humid wind_dir wind_speed
## <int> <chr> <int> <int> <int> <int> <dbl> <dbl> <dbl> <dbl> <dbl>
## 1 1 ewr 2013 1 1 1 39.0 26.1 59.4 270 10.4
## 2 2 ewr 2013 1 1 2 39.0 27.0 61.6 250 8.06
## 3 3 ewr 2013 1 1 3 39.0 28.0 64.4 240 11.5
## 4 4 ewr 2013 1 1 4 39.9 28.0 62.2 250 12.7
## 5 5 ewr 2013 1 1 5 39.0 28.0 64.4 260 12.7
## 6 6 ewr 2013 1 1 6 37.9 28.0 67.2 240 11.5
## # … with 5 more variables: wind_gust <dbl>, precip <dbl>, pressure <dbl>,
## # visib <dbl>, time_hour <dttm>
6.1 What is so important about this distinction?
We have now shown that we can also send code that was not translated by dbplyr to Spark and have it executed without issues using spark_apply(). So what is the catch, and where does the importance of the word interface come in?
Let us quickly examine the performance of three operations (the first two functions were defined in the previous chapter; sketches follow the list for reference):
- using a Hive built-in function directly,
- using a function translated by dbplyr, and
- using spark_apply().
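The sketches below show how those two comparison functions could look; these are assumptions for reference, and the exact definitions in the previous chapter may differ slightly.
# Assumed sketches of the comparison functions from the previous chapter
library(dplyr)

# `tolower()` has a dbplyr translation to SQL's LOWER()
fun_implemented <- function(tbl, col) {
  col <- rlang::ensym(col)
  tbl %>% mutate(!!col := tolower(!!col))
}

# `lower()` is not an R function, so dbplyr passes it through to Hive as-is
fun_hive_builtin <- function(tbl, col) {
  col <- rlang::ensym(col)
  tbl %>% mutate(!!col := lower(!!col))
}
With these in place, we can benchmark the three approaches: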
microbenchmark::microbenchmark(
times = 10,
hive_builtin = fun_hive_builtin(tbl_weather, origin) %>% collect(),
translated_dplyr = fun_implemented(tbl_weather, origin) %>% collect(),
spark_apply = spark_apply(tbl_weather, fun_r_custom, context = {colName <- "origin"}) %>% collect()
)
We can see that
- the operations executed via the SQL translation mechanism of dbplyr (via both the Hive built-in function and an R function with a SQL translation available) were executed in around 0.5 seconds, while
- the operation via spark_apply() took orders of magnitude longer, more than 6 minutes.
Note that the absolute values here will vary based on the setup and the infrastructure; the important message is in the relative differences, not in the absolute timings.
6.2 What happens when we use custom functions with spark_apply()
We can now see that the operation with spark_apply() is extremely slow compared to the other two. The key to understanding the difference is to examine how the custom transformations of data using R functions are performed within spark_apply().
In simplified terms, this happens in a few steps (a local sketch follows the list):
- the data is moved in row format from Spark into the R process through a socket connection; this is inefficient, as multiple data types need to be deserialized for each row,
- the data gets converted to columnar format, since this is how R data frames are implemented,
- the R functions are applied to compute the results,
- the results are converted back to row format, serialized row by row and sent back to Spark over the socket connection.
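To make the middle steps concrete, the following purely local sketch mimics the per-partition work; the toy partition and the comments about the socket transfer are only an approximation of what sparklyr's internals do:
# A partition arrives row by row over the socket and is assembled into a
# regular, columnar R data frame; here we use a toy stand-in partition
partition <- data.frame(
  origin = c("EWR", "JFK", "LGA"),
  temp   = c(39.0, 39.9, 37.9)
)

# The custom R function is applied to the columnar data frame ...
result <- fun_r_custom(partition, "origin")

# ... and the result is then serialized row by row and sent back to Spark
result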
6.3 What happens when we use translated or Hive built-in functions
When using functions that can be translated to Spark SQL, the process is very different:
- The call is translated to Spark SQL using the dbplyr backend
- The constructed query is sent to Spark for execution using DBI
- The SQL is executed within Spark only when collect() or compute() is called
- The results are sent to the R session only when collect() is called
This means that the transfer of data happens only once, and only when collect() is called, which saves a vast amount of overhead.
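We can see the translation at work without executing anything. Rendering the query for the lazy result (using fun_implemented() as sketched above) shows the SQL that dbplyr builds:
# Inspect the SQL that dbplyr generates for the lazy query, without executing it
fun_implemented(tbl_weather, origin) %>%
  dplyr::show_query()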
6.4 Which R functionality is currently translated and built-in to Hive
An important question to answer with regard to performance is therefore how much functionality is available using the fast dbplyr backend. As seen above, these features can be categorized into two groups:
- R functions translatable to Spark SQL via dbplyr. The full list of such functions is available on RStudio’s sparklyr website.
- Hive built-in functions that are passed through as they are and evaluated by Spark (an example follows this list). The full list is available on the Hive Operators and User-Defined Functions page.
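As an example of the second group, a function with no R counterpart, such as Hive's levenshtein(), is left untouched by dbplyr and evaluated by Spark. The snippet below is a hypothetical illustration; the column name origin_dist is made up:
# `levenshtein()` is not an R function; dbplyr passes the call through
# unchanged and Spark evaluates the Hive built-in
tbl_weather %>%
  dplyr::mutate(origin_dist = levenshtein(origin, "JFK")) %>%
  dplyr::show_query()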
6.5 Making serialization faster with Apache Arrow
6.5.1 What is Apache Arrow and how it improves performance
Our benchmarks have shown that using spark_apply() does not scale well and that the performance penalty caused by serialization, deserialization, and transfer is too high.
To partially mitigate this, we can take advantage of Apache Arrow, a cross-language development platform for in-memory data that specifies a standardized, language-independent columnar memory format for flat and hierarchical data.
With Arrow support enabled in sparklyr, Spark performs the row-format to column-format conversion itself, in parallel. The data is then transferred through the socket without any custom serialization, and all the R process needs to do is copy the data from the socket into its heap, transform it, and copy it back to the socket connection.
This makes the process significantly faster:
microbenchmark::microbenchmark(
times = 10,
setup = library(arrow),
hive_builtin = fun_hive_builtin(tbl_weather, origin) %>% collect(),
translated_dplyr = fun_implemented(tbl_weather, origin) %>% collect(),
spark_apply_arrow = spark_apply(tbl_weather, fun_r_custom, context = {colName <- "origin"}) %>% collect()
)
We can see that the timing of spark_apply() decreased from more than 6 minutes to around 4.5 seconds, which is a very significant performance boost. Compared to the other methods, however, we still see an order-of-magnitude difference.
6.6 Conclusion, take-home messages
Adding Arrow to the mix significantly improved the performance of our example code, but it is still quite slow compared to the native approach. Based on the above, we could conclude that:
- Performance benefits are present mainly when all the computation is performed within Spark and R serves merely as a “messaging agent”, sending commands to Spark to be executed
- If serialization and transfer of larger objects are involved, performance is strongly impacted.
The take-home messages from this exercise are that
- we should strive to only use R code that can be executed within the Spark instance, and
- if we need some data retrieved, it is advisable that this data was heavily aggregated within Spark first, so that only a small amount is transferred to the R session (a small example follows below).
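For instance, an aggregation of the following kind keeps the heavy lifting in Spark and collects only a handful of summary rows; this is a minimal sketch using the tbl_weather table from above:
# Aggregate within Spark first, then collect only the small summary
tbl_weather %>%
  dplyr::group_by(origin) %>%
  dplyr::summarise(mean_temp = mean(temp, na.rm = TRUE)) %>%
  dplyr::collect()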
6.7 But we still need arbitrary functions to run fast
In the next chapters, we will investigate a few options that allow us to retain the performance of Spark while still being able to write arbitrary R functions by using methods already implemented and available in the Spark API.