Chapter 9 Using the lower-level invoke API to manipulate Spark’s Java objects from R
There will be no foolish wand-waving or silly incantations in this class.
- Severus Snape
In the previous chapters, we have shown how to write functions both as combinations of dplyr verbs and as SQL query generators that can be executed by Spark, how to execute them with DBI, and how to create lazy SQL statements that only get executed when needed.
In this chapter, we will look at how to write R functions that interface with Spark via a lower-level invocation API that lets us use all the functionality that is exposed by the Scala Spark APIs. We will also show how such R calls relate to Scala code.
9.1 The invoke() API of sparklyr
So far when interfacing with Spark from R, we have used the sparklyr package in three ways:
- Writing combinations of dplyr verbs that would be translated to Spark SQL via the dbplyr package and the SQL executed by Spark when requested
- Generating Spark SQL code directly and sending it for execution in multiple ways
- Combinations of the above two methods
What these methods have in common is that they translate operations written in R to Spark SQL and that SQL code is then sent for execution by our Spark instance.
There is however another approach that we can use with sparklyr, which will be more familiar to users or developers who have worked with packages like rJava or rscala before. Even though it is possibly less convenient than the APIs provided by the two aforementioned packages, sparklyr provides an invocation API that exposes three functions:
- invoke(jobj, method, ...) to execute a method on a Java object reference
- invoke_static(sc, class, method, ...) to execute a static method associated with a Java class
- invoke_new(sc, class, ...) to invoke a constructor associated with a Java class
Let us have a look at how we can use those functions in practice to efficiently work with Spark from R.
9.2 Getting started with the invoke API
We can start with a few very simple examples of invoke() usage, for instance getting the number of rows of tbl_flights:
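A minimal call might look as follows, assuming a Spark connection with the flights data available as tbl_flights:

```r
# Convert the tbl to a Java object reference, then invoke the count method
tbl_flights %>%
  spark_dataframe() %>%
  invoke("count")
```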
## [1] 336776
We see one extra operation before invoking the count: spark_dataframe(). This is because the invoke() interface works with Java object references and not tbl objects in remote sources such as tbl_flights. We, therefore, need to convert tbl_flights to a Java object reference, for which we use the spark_dataframe() function.
Now, for something more exciting, let us compute a summary of the variables in tbl_flights using the describe method:
tbl_flights_summary <- tbl_flights %>%
spark_dataframe() %>%
invoke("describe", as.list(colnames(tbl_flights))) %>%
sdf_register()
tbl_flights_summary
## # Source: spark<?> [?? x 20]
## summary id year month day dep_time sched_dep_time dep_delay arr_time
## <chr> <chr> <chr> <chr> <chr> <chr> <chr> <chr> <chr>
## 1 count 3367… 3367… 3367… 3367… 328521 336776 328521 328063
## 2 mean 1683… 2013… 6.54… 15.7… 1349.10… 1344.25484001… 12.63907… 1502.05…
## 3 stddev 9721… 0.0 3.41… 8.76… 488.281… 467.335755734… 40.21006… 533.264…
## 4 min 1 2013 1 1 1 106 -43.0 1
## 5 max 3367… 2013 12 31 2400 2359 1301.0 2400
## # … with 11 more variables: sched_arr_time <chr>, arr_delay <chr>,
## # carrier <chr>, flight <chr>, tailnum <chr>, origin <chr>, dest <chr>,
## # air_time <chr>, distance <chr>, hour <chr>, minute <chr>
We also see one extra operation after invoking the describe method: sdf_register(). This is because the invoke() interface also returns Java object references, and we may prefer a more user-friendly tbl object instead. This is where sdf_register() comes in: it registers a Spark DataFrame and returns a tbl_spark object back to us.
And indeed, we can see that the wrapper sdf_describe() provided by the sparklyr package itself works in a very similar fashion:
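We can inspect its implementation by printing the function object; note that the printed output below shows only the function body, the signature line is omitted:

```r
# Print the source of sparklyr's wrapper to see how it uses invoke()
sparklyr::sdf_describe
```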
## {
## in_df <- cols %in% colnames(x)
## if (any(!in_df)) {
## msg <- paste0("The following columns are not in the data frame: ",
## paste0(cols[which(!in_df)], collapse = ", "))
## stop(msg)
## }
## cols <- cast_character_list(cols)
## x %>% spark_dataframe() %>% invoke("describe", cols) %>%
## sdf_register()
## }
If we so wish, for DataFrame-related object references, we can also call collect() to retrieve the results directly, without using sdf_register() first, for instance retrieving the full content of the origin column:
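A sketch of such a retrieval, using the DataFrame select method (with the same empty list() workaround for variadic arguments that we examine later in this chapter):

```r
# Select the origin column on the Java object reference and collect it to R
tbl_flights %>%
  spark_dataframe() %>%
  invoke("select", "origin", list()) %>%
  collect()
```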
## # A tibble: 336,776 x 1
## origin
## <chr>
## 1 EWR
## 2 LGA
## 3 JFK
## 4 JFK
## 5 LGA
## 6 EWR
## 7 EWR
## 8 LGA
## 9 JFK
## 10 LGA
## # … with 336,766 more rows
It can also be helpful to investigate the schema of our flights DataFrame:
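This can be done by invoking the schema method on the DataFrame reference, which returns a Java object reference to a StructType:

```r
# Retrieve the schema as a Java object reference (a StructType)
tbl_flights %>%
  spark_dataframe() %>%
  invoke("schema")
```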
## <jobj[733]>
## org.apache.spark.sql.types.StructType
## StructType(StructField(id,IntegerType,true), StructField(year,IntegerType,true), StructField(month,IntegerType,true), StructField(day,IntegerType,true), StructField(dep_time,IntegerType,true), StructField(sched_dep_time,IntegerType,true), StructField(dep_delay,DoubleType,true), StructField(arr_time,IntegerType,true), StructField(sched_arr_time,IntegerType,true), StructField(arr_delay,DoubleType,true), StructField(carrier,StringType,true), StructField(flight,IntegerType,true), StructField(tailnum,StringType,true), StructField(origin,StringType,true), StructField(dest,StringType,true), StructField(air_time,DoubleType,true), StructField(distance,DoubleType,true), StructField(hour,DoubleType,true), StructField(minute,DoubleType,true), StructField(time_hour,TimestampType,true))
We can also use the invoke interface on other objects, for instance the SparkContext. Let us retrieve the uiWebUrl of our context:
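Since uiWebUrl returns a Scala Option, one way to sketch this is to chain a toString invocation to render it as a character value:

```r
# Get the SparkContext reference and invoke uiWebUrl on it;
# toString renders the Scala Option as a string such as "Some(...)"
sc %>%
  spark_context() %>%
  invoke("uiWebUrl") %>%
  invoke("toString")
```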
## [1] "Some(http://localhost:4040)"
9.3 Grouping and aggregation with invoke chains
Imagine we would like to do simple aggregations of a Spark DataFrame, such as an average of a column grouped by another column. For reference, we can do this very simply using the dplyr approach. Let’s compute the average departure delay by origin of the flight:
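Such a dplyr pipeline could look as follows; leaving the aggregate unnamed yields the avg(dep_delay) column name seen in the output:

```r
library(dplyr)

# Average departure delay by origin, translated to Spark SQL by dbplyr
tbl_flights %>%
  group_by(origin) %>%
  summarise(avg(dep_delay))
```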
## # Source: spark<?> [?? x 2]
## origin `avg(dep_delay)`
## <chr> <dbl>
## 1 EWR 15.1
## 2 JFK 12.1
## 3 LGA 10.3
Now we will show how to do the same aggregation via the lower level API. Using the Spark shell we would simply write in Scala:
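Assuming a flights DataFrame is available in the Scala session, the aggregation is just two operations:

```scala
import org.apache.spark.sql.functions.avg

flights.groupBy("origin").agg(avg("dep_delay"))
```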
Translating that into the lower-level invoke() API provided by sparklyr can look similar to the following code.
tbl_flights %>%
spark_dataframe() %>%
invoke("groupBy", "origin", list()) %>%
invoke("agg", invoke_static(sc, "org.apache.spark.sql.functions", "expr", "avg(dep_delay)"), list()) %>%
sdf_register()
## # Source: spark<?> [?? x 2]
## origin `avg(dep_delay)`
## <chr> <dbl>
## 1 EWR 15.1
## 2 JFK 12.1
## 3 LGA 10.3
9.3.1 What is all that extra code?
Now, compared to the very simple two operations in the Scala version, we have some gotchas to examine:
- One of the invoke() calls is quite long. Instead of just avg("dep_delay") as in the Scala example, we use invoke_static(sc, "org.apache.spark.sql.functions", "expr", "avg(dep_delay)"). This is because the avg("dep_delay") expression is syntactic sugar provided by Scala; when calling from R, we need to provide the object reference hidden behind that sugar.
- There is an empty list() at the end of the "groupBy" and "agg" invokes. This is needed as a workaround: some Scala methods take (String, String*) as arguments, and sparklyr currently does not support variadic parameters. We can pass list() to represent an empty String[] in Scala as the needed second argument.
9.4 Wrapping the invocations into R functions
Seeing the above example, we can quickly write a useful wrapper to ease the pain a little. First, we can create a small function that will generate the aggregation expression we can use with invoke("agg", ...).
agg_expr <- function(tbl, exprs) {
sparklyr::invoke_static(
tbl[["src"]][["con"]],
"org.apache.spark.sql.functions",
"expr",
exprs
)
}
Next, we can wrap around the entire process to make a more generic aggregation function, using the fact that a remote tibble has the details on sc within its tbl[["src"]][["con"]] element:
grpagg_invoke <- function(tbl, colName, groupColName, aggOperation) {
avgColumn <- tbl %>% agg_expr(paste0(aggOperation, "(", colName, ")"))
tbl %>%
spark_dataframe() %>%
invoke("groupBy", groupColName, list()) %>%
invoke("agg", avgColumn, list()) %>%
sdf_register()
}
And finally use our wrapper to get the same results in a more user-friendly way:
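For instance, computing the average arrival delay by origin with our wrapper could look like this:

```r
# Average arrival delay grouped by origin, via our invoke-based wrapper
tbl_flights %>%
  grpagg_invoke("arr_delay", "origin", "avg")
```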
## # Source: spark<?> [?? x 2]
## origin `avg(arr_delay)`
## <chr> <dbl>
## 1 EWR 9.11
## 2 JFK 5.55
## 3 LGA 5.78
9.5 Reconstructing variable normalization
Now we will attempt to construct the variable normalization that we have shown in the previous parts with dplyr verbs and SQL generation - we will normalize the values of a column by first subtracting the mean value and then dividing the values by the standard deviation:
normalize_invoke <- function(tbl, colName) {
sdf <- tbl %>% spark_dataframe()
stdCol <- agg_expr(tbl, paste0("stddev_samp(", colName, ")"))
avgCol <- agg_expr(tbl, paste0("avg(", colName, ")"))
avgTemp <- sdf %>%
invoke("agg", avgCol, list()) %>%
invoke("first")
stdTemp <- sdf %>%
invoke("agg", stdCol, list()) %>%
invoke("first")
newCol <- sdf %>%
invoke("col", colName) %>%
invoke("minus", as.numeric(avgTemp)) %>%
invoke("divide", as.numeric(stdTemp))
sdf %>%
invoke("withColumn", colName, newCol) %>%
sdf_register()
}
tbl_weather %>% normalize_invoke("temp")
## # Source: spark<?> [?? x 16]
## id origin year month day hour temp dewp humid wind_dir wind_speed
## <int> <chr> <int> <int> <int> <int> <dbl> <dbl> <dbl> <dbl> <dbl>
## 1 1 EWR 2013 1 1 1 -0.913 26.1 59.4 270 10.4
## 2 2 EWR 2013 1 1 2 -0.913 27.0 61.6 250 8.06
## 3 3 EWR 2013 1 1 3 -0.913 28.0 64.4 240 11.5
## 4 4 EWR 2013 1 1 4 -0.862 28.0 62.2 250 12.7
## 5 5 EWR 2013 1 1 5 -0.913 28.0 64.4 260 12.7
## 6 6 EWR 2013 1 1 6 -0.974 28.0 67.2 240 11.5
## 7 7 EWR 2013 1 1 7 -0.913 28.0 64.4 240 15.0
## 8 8 EWR 2013 1 1 8 -0.862 28.0 62.2 250 10.4
## 9 9 EWR 2013 1 1 9 -0.862 28.0 62.2 260 15.0
## 10 10 EWR 2013 1 1 10 -0.802 28.0 59.6 260 13.8
## # … with more rows, and 5 more variables: wind_gust <dbl>, precip <dbl>,
## # pressure <dbl>, visib <dbl>, time_hour <dttm>
The above implementation is just an example and far from optimal, but it also has a few interesting points about it:
- Using invoke("first") will actually compute and collect the value into the R session
- Those collected values are then sent back to Spark during the invoke("minus", as.numeric(avgTemp)) and invoke("divide", as.numeric(stdTemp)) calls

This means that there is unnecessary overhead when sending those values from the Spark instance into R and back, which incurs a slight performance penalty.
9.6 Where invoke can be better than dplyr translation or SQL
As we have seen in the above examples, working with the invoke()
API can prove more difficult than using the intuitive syntax of dplyr or SQL queries. In some use cases, the trade-off may still be worth it. In our practice, these are some examples of such situations:
- When Scala's Spark API is more flexible, powerful, or suitable for a particular task and the translation is not as good
- When performance is crucial and we can produce more optimal solutions using the invocations
- When we know the Scala API well and do not want to invest time in learning the dplyr syntax, as it is easier to translate the Scala calls into a series of invoke() calls
- When we need to interact with and manipulate Java objects other than the standard Spark DataFrames
9.7 Conclusion
In this chapter, we have looked at how to use the lower-level invoke interface provided by sparklyr to manipulate Spark objects and other Java object references. In the following chapter, we will dig a bit deeper and look into using Java's reflection API to make the invoke interface more accessible from R, to obtain detailed invocation logs, and more.