Chapter 9 Using the lower-level invoke API to manipulate Spark’s Java objects from R

There will be no foolish wand-waving or silly incantations in this class.

  • Severus Snape

In the previous chapters, we have shown how to write functions both as combinations of dplyr verbs and as SQL query generators that can be executed by Spark, how to execute them with DBI, and how to build lazy SQL statements that only get executed when needed.

In this chapter, we will look at how to write R functions that interface with Spark via a lower-level invocation API that lets us use all the functionality that is exposed by the Scala Spark APIs. We will also show how such R calls relate to Scala code.

9.1 The invoke() API of sparklyr

So far when interfacing with Spark from R, we have used the sparklyr package in three ways:

  • Writing combinations of dplyr verbs that would be translated to Spark SQL via the dbplyr package and the SQL executed by Spark when requested
  • Generating Spark SQL code directly and sending it for execution in multiple ways
  • Combinations of the above two methods

What these methods have in common is that they translate operations written in R to Spark SQL and that SQL code is then sent for execution by our Spark instance.

There is, however, another approach that we can use with sparklyr, one that will be more familiar to users or developers who have worked with packages such as rJava or rscala before. Even though it is possibly less convenient than the APIs provided by those two packages, sparklyr provides an invocation API that exposes three functions:

  1. invoke(jobj, method, ...) to execute a method on a Java object reference
  2. invoke_static(sc, class, method, ...) to execute a static method associated with a Java class
  3. invoke_new(sc, class, ...) to invoke a constructor associated with a Java class

Let us have a look at how we can use those functions in practice to efficiently work with Spark from R.

9.2 Getting started with the invoke API

We can start with a few very simple examples of invoke() usage, for instance getting the number of rows of the tbl_flights:
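A call along the following lines could produce this output; this is a sketch, assuming an active sparklyr connection with the flights data available as tbl_flights:

```r
# Convert the remote tbl to a Java object reference, then
# invoke the count() method of the underlying Spark Dataset
tbl_flights %>%
  spark_dataframe() %>%
  invoke("count")
```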

## [1] 336776

We see one extra operation before invoking the count: spark_dataframe(). This is because the invoke() interface works with Java object references rather than tbl objects in remote sources such as tbl_flights. We therefore need to convert tbl_flights to a Java object reference, which is what the spark_dataframe() function does.

Now, for something more exciting, let us compute a summary of the variables in tbl_flights using the describe method:
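A sketch of such a call, again assuming tbl_flights is available; note that describe() takes a String* argument, so invoking it with an empty list() summarizes all columns:

```r
tbl_flights %>%
  spark_dataframe() %>%
  invoke("describe", list()) %>%  # empty list() -> describe all columns
  sdf_register()
```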

## # Source: spark<?> [?? x 20]
##   summary id    year  month day   dep_time sched_dep_time dep_delay arr_time
##   <chr>   <chr> <chr> <chr> <chr> <chr>    <chr>          <chr>     <chr>   
## 1 count   3367… 3367… 3367… 3367… 328521   336776         328521    328063  
## 2 mean    1683… 2013… 6.54… 15.7… 1349.10… 1344.25484001… 12.63907… 1502.05…
## 3 stddev  9721… 0.0   3.41… 8.76… 488.281… 467.335755734… 40.21006… 533.264…
## 4 min     1     2013  1     1     1        106            -43.0     1       
## 5 max     3367… 2013  12    31    2400     2359           1301.0    2400    
## # … with 11 more variables: sched_arr_time <chr>, arr_delay <chr>,
## #   carrier <chr>, flight <chr>, tailnum <chr>, origin <chr>, dest <chr>,
## #   air_time <chr>, distance <chr>, hour <chr>, minute <chr>

We also see one extra operation after invoking the describe method: sdf_register(). This is because the invoke() interface also returns Java object references, while we may prefer a more user-friendly tbl object instead. This is where sdf_register() comes in: it registers a Spark DataFrame and returns a tbl_spark object back to us.

And indeed, we can see that the wrapper sdf_describe() provided by the sparklyr package itself works in a very similar fashion:
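We can inspect it by printing the function's body:

```r
# Print the body of sparklyr's sdf_describe to see the invoke chain inside
body(sparklyr::sdf_describe)
```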

## {
##     in_df <- cols %in% colnames(x)
##     if (any(!in_df)) {
##         msg <- paste0("The following columns are not in the data frame: ", 
##             paste0(cols[which(!in_df)], collapse = ", "))
##         stop(msg)
##     }
##     cols <- cast_character_list(cols)
##     x %>% spark_dataframe() %>% invoke("describe", cols) %>% 
##         sdf_register()
## }

If we so wish, for DataFrame-related object references we can also call collect() to retrieve the results directly, without using sdf_register() first, for instance to retrieve the full content of the origin column:
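One way to achieve this could be the following sketch; the select method also has a String, String* signature, hence the trailing empty list():

```r
tbl_flights %>%
  spark_dataframe() %>%
  invoke("select", "origin", list()) %>%  # select(col: String, cols: String*)
  collect()
```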

## # A tibble: 336,776 x 1
##    origin
##    <chr> 
##  1 EWR   
##  2 LGA   
##  3 JFK   
##  4 JFK   
##  5 LGA   
##  6 EWR   
##  7 EWR   
##  8 LGA   
##  9 JFK   
## 10 LGA   
## # … with 336,766 more rows

It can also be helpful to investigate the schema of our flights DataFrame:
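A sketch of such a call, invoking the schema method on the Dataset reference:

```r
# Returns a reference to the org.apache.spark.sql.types.StructType object
tbl_flights %>%
  spark_dataframe() %>%
  invoke("schema")
```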

## <jobj[733]>
##   org.apache.spark.sql.types.StructType
##   StructType(StructField(id,IntegerType,true), StructField(year,IntegerType,true), StructField(month,IntegerType,true), StructField(day,IntegerType,true), StructField(dep_time,IntegerType,true), StructField(sched_dep_time,IntegerType,true), StructField(dep_delay,DoubleType,true), StructField(arr_time,IntegerType,true), StructField(sched_arr_time,IntegerType,true), StructField(arr_delay,DoubleType,true), StructField(carrier,StringType,true), StructField(flight,IntegerType,true), StructField(tailnum,StringType,true), StructField(origin,StringType,true), StructField(dest,StringType,true), StructField(air_time,DoubleType,true), StructField(distance,DoubleType,true), StructField(hour,DoubleType,true), StructField(minute,DoubleType,true), StructField(time_hour,TimestampType,true))

We can also use the invoke interface on objects other than DataFrames, such as the SparkContext. Let us, for instance, retrieve the uiWebUrl of our context:
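A sketch of the call, assuming our connection object is sc; since uiWebUrl returns a Scala Option, we also invoke toString on the returned reference to get a character value back:

```r
sc %>%
  spark_context() %>%       # get a reference to the SparkContext
  invoke("uiWebUrl") %>%    # returns a reference to an Option[String]
  invoke("toString")
```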

## [1] "Some(http://localhost:4040)"

9.3 Grouping and aggregation with invoke chains

Imagine we would like to do simple aggregations of a Spark DataFrame, such as an average of a column grouped by another column. For reference, we can do this very simply using the dplyr approach. Let’s compute the average departure delay by origin of the flight:
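With dplyr, this could look as follows (a sketch, assuming tbl_flights; note that leaving the aggregation unnamed yields the `avg(dep_delay)` column name seen below):

```r
tbl_flights %>%
  group_by(origin) %>%
  summarise(avg(dep_delay))
```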

## # Source: spark<?> [?? x 2]
##   origin `avg(dep_delay)`
##   <chr>             <dbl>
## 1 EWR                15.1
## 2 JFK                12.1
## 3 LGA                10.3

Now we will show how to do the same aggregation via the lower-level API. Using the Spark shell, we would simply write in Scala:
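A sketch of the Scala code, assuming the data is available as a Dataset named flights:

```scala
import org.apache.spark.sql.functions.avg

// Group by origin and compute the average departure delay
flights.groupBy("origin").agg(avg("dep_delay"))
```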

Translating that into the lower-level invoke() API provided by sparklyr can look similar to the following code:
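A sketch of such a translation, assuming the connection object sc and tbl_flights:

```r
tbl_flights %>%
  spark_dataframe() %>%
  invoke("groupBy", "origin", list()) %>%
  invoke(
    "agg",
    # Build a Column reference for the aggregation expression
    invoke_static(sc, "org.apache.spark.sql.functions", "expr", "avg(dep_delay)"),
    list()
  ) %>%
  sdf_register()
```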

## # Source: spark<?> [?? x 2]
##   origin `avg(dep_delay)`
##   <chr>             <dbl>
## 1 EWR                15.1
## 2 JFK                12.1
## 3 LGA                10.3

9.3.1 What is all that extra code?

Now, compared to the two very simple operations in the Scala version, we have some gotchas to examine:

  • one of the invoke() calls is quite long. Instead of just avg("dep_delay") as in the Scala example, we use invoke_static(sc, "org.apache.spark.sql.functions", "expr", "avg(dep_delay)"). This is because the avg("dep_delay") expression is syntactic sugar provided by Scala; when calling from R, we need to provide the object reference hidden behind that sugar.

  • the empty list() at the end of the "groupBy" and "agg" invokes. This is needed as a workaround: some Scala methods take String, String* as arguments, and sparklyr currently does not support variable-length arguments. We can pass list() to represent an empty String[] in Scala as the needed second argument.

9.4 Wrapping the invocations into R functions

Seeing the above example, we can quickly write a useful wrapper to ease the pain a little. First, we can create a small function that will generate the aggregation expression we can use with invoke("agg", ...).
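A minimal sketch of such a helper; the function name agg_expr_invoke is chosen purely for illustration:

```r
# Build a Column reference for an expression like "avg(dep_delay)"
agg_expr_invoke <- function(sc, aggOperation, colName) {
  invoke_static(
    sc, "org.apache.spark.sql.functions", "expr",
    paste0(aggOperation, "(", colName, ")")
  )
}
```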

Next, we can wrap the entire process into a more generic aggregation function, using the fact that a remote tibble stores the details on sc within its tbl[["src"]][["con"]] element:
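A sketch of such a wrapper; the name grpagg_invoke and its argument names are illustrative:

```r
grpagg_invoke <- function(tbl, colName, groupBy, aggOperation) {
  # A remote tibble carries its connection in tbl[["src"]][["con"]]
  sc <- tbl[["src"]][["con"]]
  aggExpr <- invoke_static(
    sc, "org.apache.spark.sql.functions", "expr",
    paste0(aggOperation, "(", colName, ")")
  )
  tbl %>%
    spark_dataframe() %>%
    invoke("groupBy", groupBy, list()) %>%
    invoke("agg", aggExpr, list()) %>%
    sdf_register()
}
```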

And finally use our wrapper to get the same results in a more user-friendly way:
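For instance, with a wrapper named grpagg_invoke (an illustrative name), computing the average arrival delay by origin could look like:

```r
tbl_flights %>%
  grpagg_invoke("arr_delay", groupBy = "origin", aggOperation = "avg")
```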

## # Source: spark<?> [?? x 2]
##   origin `avg(arr_delay)`
##   <chr>             <dbl>
## 1 EWR                9.11
## 2 JFK                5.55
## 3 LGA                5.78

9.5 Reconstructing variable normalization

Now we will attempt to reconstruct the variable normalization that we have shown in the previous parts with dplyr verbs and SQL generation. We will normalize the values of a column by first subtracting the mean value and then dividing by the standard deviation:
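One possible reconstruction, sketched under the assumption that the weather data is available as tbl_weather and that we normalize its temp column; the names normalize_invoke, avgTemp, and stdTemp are chosen for illustration:

```r
normalize_invoke <- function(tbl, colName) {
  sc  <- tbl[["src"]][["con"]]
  sdf <- tbl %>% spark_dataframe()
  fns <- "org.apache.spark.sql.functions"

  # Compute the mean and standard deviation of the column;
  # invoke("first") collects each value into the R session
  avgTemp <- sdf %>%
    invoke("agg", invoke_static(sc, fns, "expr", paste0("avg(", colName, ")")), list()) %>%
    invoke("first") %>%
    unlist()
  stdTemp <- sdf %>%
    invoke("agg", invoke_static(sc, fns, "expr", paste0("stddev_samp(", colName, ")")), list()) %>%
    invoke("first") %>%
    unlist()

  # Send the collected values back to Spark: (col - mean) / sd
  normCol <- sdf %>%
    invoke("col", colName) %>%
    invoke("minus", as.numeric(avgTemp)) %>%
    invoke("divide", as.numeric(stdTemp))

  sdf %>%
    invoke("withColumn", colName, normCol) %>%
    sdf_register()
}

tbl_weather %>% normalize_invoke("temp")
```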

## # Source: spark<?> [?? x 16]
##       id origin  year month   day  hour   temp  dewp humid wind_dir wind_speed
##    <int> <chr>  <int> <int> <int> <int>  <dbl> <dbl> <dbl>    <dbl>      <dbl>
##  1     1 EWR     2013     1     1     1 -0.913  26.1  59.4      270      10.4 
##  2     2 EWR     2013     1     1     2 -0.913  27.0  61.6      250       8.06
##  3     3 EWR     2013     1     1     3 -0.913  28.0  64.4      240      11.5 
##  4     4 EWR     2013     1     1     4 -0.862  28.0  62.2      250      12.7 
##  5     5 EWR     2013     1     1     5 -0.913  28.0  64.4      260      12.7 
##  6     6 EWR     2013     1     1     6 -0.974  28.0  67.2      240      11.5 
##  7     7 EWR     2013     1     1     7 -0.913  28.0  64.4      240      15.0 
##  8     8 EWR     2013     1     1     8 -0.862  28.0  62.2      250      10.4 
##  9     9 EWR     2013     1     1     9 -0.862  28.0  62.2      260      15.0 
## 10    10 EWR     2013     1     1    10 -0.802  28.0  59.6      260      13.8 
## # … with more rows, and 5 more variables: wind_gust <dbl>, precip <dbl>,
## #   pressure <dbl>, visib <dbl>, time_hour <dttm>

The above implementation is just an example and far from optimal, but it has a few interesting points:

  • Using invoke("first") will actually compute and collect the value into the R session
  • Those collected values are then sent back during the invoke("minus", as.numeric(avgTemp)) and invoke("divide", as.numeric(stdTemp))

This means there is unnecessary overhead in sending those values from the Spark instance to R and back, which carries a slight performance penalty.

9.6 Where invoke can be better than dplyr translation or SQL

As we have seen in the above examples, working with the invoke() API can prove more difficult than using the intuitive syntax of dplyr or SQL queries. In some use cases, the trade-off may still be worth it. In our practice, these are some examples of such situations:

  • When Scala’s Spark API is more flexible, powerful or suitable for a particular task and the translation is not as good
  • When performance is crucial and we can produce more optimal solutions using the invocations
  • When we know the Scala API well and do not want to invest time in learning the dplyr syntax, finding it easier to translate the Scala calls into a series of invoke() calls
  • When we need to interact and manipulate other Java objects apart from the standard Spark DataFrames

9.7 Conclusion

In this chapter, we have looked at how to use the lower-level invoke interface provided by sparklyr to manipulate Spark objects and other Java object references. In the following chapter, we will dig a bit deeper and look into using Java's reflection API to make the invoke interface more accessible from R, obtain detailed invocation logs, and more.