Chapter 5 Communication between Spark and sparklyr

In this chapter, we will examine how the sparklyr interface communicates with the Spark instance and what this means for the performance of arbitrarily defined R functions. We will also look at how Apache Arrow can improve the performance of object serialization.

5.1 Sparklyr as a Spark interface provider

The sparklyr package is an R-based interface to Apache Spark. The word interface is important in this context, as the way we use this interface can significantly affect the performance benefits we get from using Spark.

To understand this better, we will examine three simple functions that differ in implementation but are intended to produce the same results, and see how each behaves with regard to Spark. Our goal is completely trivial: convert the origin column, which contains the airport of origin of flights, from uppercase to lowercase. We will keep using the datasets from the nycflights13 package for our examples.

5.2 An R function translated to Spark SQL

Using the following fun_implemented() function will yield the expected results for both the local data frame weather and the remote Spark object referenced by tbl_weather.
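The definition of fun_implemented() is not shown in this chunk; a minimal sketch consistent with the generated SQL shown later in this section might look as follows (the column-passing style via `{{ }}` is an assumption, the exact definition in the original text may differ):

```r
library(dplyr)

# Lowercase a column using base R's tolower(), which dbplyr can
# translate to Spark SQL's LOWER()
fun_implemented <- function(data, col) {
  data %>% mutate({{ col }} := tolower({{ col }}))
}
```

The same function can then be called on both the local data frame and the remote Spark object, e.g. `fun_implemented(weather, origin)` and `fun_implemented(tbl_weather, origin)`.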

First, let us run fun_implemented() on a local data frame in our R session. Note that the output of the command starts with A tibble: 26,115 x 16, meaning this is an object in our local R session.

## # A tibble: 26,115 x 16
##       id origin  year month   day  hour  temp  dewp humid wind_dir wind_speed
##    <int> <chr>  <int> <int> <int> <int> <dbl> <dbl> <dbl>    <dbl>      <dbl>
##  1     1 ewr     2013     1     1     1  39.0  26.1  59.4      270      10.4 
##  2     2 ewr     2013     1     1     2  39.0  27.0  61.6      250       8.06
##  3     3 ewr     2013     1     1     3  39.0  28.0  64.4      240      11.5 
##  4     4 ewr     2013     1     1     4  39.9  28.0  62.2      250      12.7 
##  5     5 ewr     2013     1     1     5  39.0  28.0  64.4      260      12.7 
##  6     6 ewr     2013     1     1     6  37.9  28.0  67.2      240      11.5 
##  7     7 ewr     2013     1     1     7  39.0  28.0  64.4      240      15.0 
##  8     8 ewr     2013     1     1     8  39.9  28.0  62.2      250      10.4 
##  9     9 ewr     2013     1     1     9  39.9  28.0  62.2      260      15.0 
## 10    10 ewr     2013     1     1    10  41    28.0  59.6      260      13.8 
## # … with 26,105 more rows, and 5 more variables: wind_gust <dbl>, precip <dbl>,
## #   pressure <dbl>, visib <dbl>, time_hour <dttm>

Next, we use it against a remote Spark DataFrame. Notice that here the output is a remote object with Source: spark&lt;?&gt; [?? x 16] and, once again, Spark only executed the minimal work needed to show this printout, so we do not yet know how many rows the resulting DataFrame contains in total.

## # Source: spark<?> [?? x 16]
##       id origin  year month   day  hour  temp  dewp humid wind_dir wind_speed
##    <int> <chr>  <int> <int> <int> <int> <dbl> <dbl> <dbl>    <dbl>      <dbl>
##  1     1 ewr     2013     1     1     1  39.0  26.1  59.4      270      10.4 
##  2     2 ewr     2013     1     1     2  39.0  27.0  61.6      250       8.06
##  3     3 ewr     2013     1     1     3  39.0  28.0  64.4      240      11.5 
##  4     4 ewr     2013     1     1     4  39.9  28.0  62.2      250      12.7 
##  5     5 ewr     2013     1     1     5  39.0  28.0  64.4      260      12.7 
##  6     6 ewr     2013     1     1     6  37.9  28.0  67.2      240      11.5 
##  7     7 ewr     2013     1     1     7  39.0  28.0  64.4      240      15.0 
##  8     8 ewr     2013     1     1     8  39.9  28.0  62.2      250      10.4 
##  9     9 ewr     2013     1     1     9  39.9  28.0  62.2      260      15.0 
## 10    10 ewr     2013     1     1    10  41    28.0  59.6      260      13.8 
## # … with more rows, and 5 more variables: wind_gust <dbl>, precip <dbl>,
## #   pressure <dbl>, visib <dbl>, time_hour <dttm>

5.2.1 How does Spark know the R function tolower()?

Actually, it does not. Our function call worked within Spark because the dbplyr package translated the R function tolower() to LOWER, a function available in Spark SQL. The resulting query was then sent to Spark to be executed.

This is the main mode of operation of the sparklyr interface: translating our R code to Spark SQL and using Spark’s SQL API to execute it. We can see the actual translated SQL by running sql_render() on the above function call.
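A sketch of how that might be invoked, assuming fun_implemented() is defined as above and tbl_weather references the weather data in Spark:

```r
# Render the Spark SQL that would be sent for execution,
# without actually executing it
fun_implemented(tbl_weather, origin) %>%
  dbplyr::sql_render()
```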

## <SQL> SELECT `id`, LOWER(`origin`) AS `origin`, `year`, `month`, `day`, `hour`, `temp`, `dewp`, `humid`, `wind_dir`, `wind_speed`, `wind_gust`, `precip`, `pressure`, `visib`, `time_hour`
## FROM `weather`

5.3 An R function not translated to Spark SQL

Using the following fun_r_only() function will only yield the expected results for the local data frame weather. For the remote Spark object referenced by tbl_weather, we will get an error.
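The definition of fun_r_only() is not shown in this chunk; a minimal sketch consistent with the parse error shown below might look as follows (again, the exact definition in the original text may differ):

```r
library(dplyr)

# Lowercase a column using base R's casefold(), for which dbplyr
# provides no Spark SQL translation
fun_r_only <- function(data, col) {
  data %>% mutate({{ col }} := casefold({{ col }}, upper = FALSE))
}
```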

The function executes successfully on a local R data frame as R knows the function casefold():

## # A tibble: 26,115 x 16
##       id origin  year month   day  hour  temp  dewp humid wind_dir wind_speed
##    <int> <chr>  <int> <int> <int> <int> <dbl> <dbl> <dbl>    <dbl>      <dbl>
##  1     1 ewr     2013     1     1     1  39.0  26.1  59.4      270      10.4 
##  2     2 ewr     2013     1     1     2  39.0  27.0  61.6      250       8.06
##  3     3 ewr     2013     1     1     3  39.0  28.0  64.4      240      11.5 
##  4     4 ewr     2013     1     1     4  39.9  28.0  62.2      250      12.7 
##  5     5 ewr     2013     1     1     5  39.0  28.0  64.4      260      12.7 
##  6     6 ewr     2013     1     1     6  37.9  28.0  67.2      240      11.5 
##  7     7 ewr     2013     1     1     7  39.0  28.0  64.4      240      15.0 
##  8     8 ewr     2013     1     1     8  39.9  28.0  62.2      250      10.4 
##  9     9 ewr     2013     1     1     9  39.9  28.0  62.2      260      15.0 
## 10    10 ewr     2013     1     1    10  41    28.0  59.6      260      13.8 
## # … with 26,105 more rows, and 5 more variables: wind_gust <dbl>, precip <dbl>,
## #   pressure <dbl>, visib <dbl>, time_hour <dttm>

Trying to execute fun_r_only() against a Spark DataFrame, however, results in an error:

## Error: org.apache.spark.sql.catalyst.parser.ParseException: 
## mismatched input 'AS' expecting ')'(line 1, pos 38)
## 
## == SQL ==
## SELECT `id`, casefold(`origin`, FALSE AS `upper`) AS `origin`, `year`, `month`, `day`, `hour`, `temp`, `dewp`, `humid`, `wind_dir`, `wind_speed`, `wind_gust`, `precip`, `pressure`, `visib`, `time_hour`
...

This is because dbplyr simply provides no translation for the casefold() function. The generated Spark SQL is therefore not valid and throws an error once the Spark SQL parser tries to parse it.

5.4 A Hive built-in function not existing in R

On the other hand, using the following fun_hive_builtin() function will only yield the expected results for the remote Spark object referenced by tbl_weather. For the local data frame weather, we will get an error.
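The definition of fun_hive_builtin() is not shown in this chunk; a minimal sketch consistent with the generated SQL shown later in this section might be (the exact definition in the original text may differ):

```r
library(dplyr)

# Lowercase a column using lower(), which does not exist in R
# but is a Hive built-in function, so dbplyr passes it through as-is
fun_hive_builtin <- function(data, col) {
  data %>% mutate({{ col }} := lower({{ col }}))
}
```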

The function fails to execute on a local R data frame as R does not know the function lower():

## Error in lower(~origin): could not find function "lower"

However, against a Spark DataFrame the code works as desired:

## # Source: spark<?> [?? x 16]
##       id origin  year month   day  hour  temp  dewp humid wind_dir wind_speed
##    <int> <chr>  <int> <int> <int> <int> <dbl> <dbl> <dbl>    <dbl>      <dbl>
##  1     1 ewr     2013     1     1     1  39.0  26.1  59.4      270      10.4 
##  2     2 ewr     2013     1     1     2  39.0  27.0  61.6      250       8.06
##  3     3 ewr     2013     1     1     3  39.0  28.0  64.4      240      11.5 
##  4     4 ewr     2013     1     1     4  39.9  28.0  62.2      250      12.7 
##  5     5 ewr     2013     1     1     5  39.0  28.0  64.4      260      12.7 
##  6     6 ewr     2013     1     1     6  37.9  28.0  67.2      240      11.5 
##  7     7 ewr     2013     1     1     7  39.0  28.0  64.4      240      15.0 
##  8     8 ewr     2013     1     1     8  39.9  28.0  62.2      250      10.4 
##  9     9 ewr     2013     1     1     9  39.9  28.0  62.2      260      15.0 
## 10    10 ewr     2013     1     1    10  41    28.0  59.6      260      13.8 
## # … with more rows, and 5 more variables: wind_gust <dbl>, precip <dbl>,
## #   pressure <dbl>, visib <dbl>, time_hour <dttm>

This is because, as seen above, the function lower() does not exist in R itself, so there can be no dbplyr translation for it either. In this case, dbplyr keeps the function call as-is when generating the SQL.

The query still executes without problems because lower is, in fact, a function built into Hive, so the generated SQL is valid:

## <SQL> SELECT `id`, lower(`origin`) AS `origin`, `year`, `month`, `day`, `hour`, `temp`, `dewp`, `humid`, `wind_dir`, `wind_speed`, `wind_gust`, `precip`, `pressure`, `visib`, `time_hour`
## FROM `weather`

5.5 Using non-translated functions with sparklyr

It can easily happen that a function we want to use is neither translated nor a Hive built-in function. In that case, sparklyr provides another interface that allows us to use it: the spark_apply() function. We will look into this interface in more detail in the next chapter.
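As a brief preview, a hedged sketch of how spark_apply() could run the untranslatable casefold() from the previous section (the closure is shipped to the workers, so each worker node needs an R runtime available):

```r
# Apply an arbitrary R function to each partition of the
# distributed DataFrame; the function receives a plain R data frame
tbl_weather %>%
  sparklyr::spark_apply(function(df) {
    df$origin <- casefold(df$origin, upper = FALSE)
    df
  })
```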

There is also a lower-level API provided by sparklyr that allows us to invoke Scala methods without using SQL translation. We discuss this API in detail in the chapters Using the lower-level invoke API to manipulate Spark’s Java objects from R and Exploring the invoke API from R with Java reflection and examining invokes with logs.
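As a small taste of that API, a hedged sketch using sparklyr's spark_dataframe() and invoke() (the method name "count" is just an illustrative choice):

```r
# Retrieve the reference to the underlying Java/Scala DataFrame
# object and call one of its methods directly, bypassing SQL translation
tbl_weather %>%
  sparklyr::spark_dataframe() %>%
  sparklyr::invoke("count")
```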