Chapter 10 Exploring the invoke API from R with Java reflection and examining invokes with logs
Then darkness took me, and I strayed out of thought and time, and I wandered far on roads that I will not tell
- Gandalf the White
In the previous chapters, we have shown how to write functions both as combinations of dplyr verbs and as SQL query generators that can be executed by Spark, and how to use the lower-level API to invoke methods on Java object references from R.
In this chapter, we will look in more detail at sparklyr's invoke() API, investigate the methods available for different classes of objects using the Java reflection API, and look under the hood of the sparklyr interface mechanism with invoke logging.
10.1 Examining available methods from R
If you have not done so yet, we recommend reading the previous chapter of this book before this one to get a quick overview of the invoke() API.
10.2 Using the Java reflection API to list the available methods
The invoke() interface is powerful, but also somewhat hidden from view, as we do not immediately know which methods are available for which object classes. We can work around that using the getMethods method, which (in short) returns an array of Method objects reflecting the public member methods of a class.
For instance, let us retrieve a list of methods for the org.apache.spark.SparkContext class.
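A minimal sketch of such an invoke chain is shown below, assuming an active connection sc and that sparklyr and the pipe operator are attached; the exact chain used to produce the output that follows may differ.
library(sparklyr)
library(dplyr)
# Get the Class object of the SparkContext and list its public methods,
# keeping only the first few Method references for display
sc %>%
  spark_context() %>%
  invoke("getClass") %>%
  invoke("getMethods") %>%
  head(6)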
## [[1]]
## <jobj[828]>
## java.lang.reflect.Method
## public java.util.concurrent.atomic.AtomicBoolean org.apache.spark.SparkContext.stopped()
##
## [[2]]
## <jobj[829]>
## java.lang.reflect.Method
## public org.apache.spark.util.CallSite org.apache.spark.SparkContext.org$apache$spark$SparkContext$$creationSite()
##
## [[3]]
## <jobj[830]>
## java.lang.reflect.Method
## public org.apache.spark.SparkConf org.apache.spark.SparkContext.org$apache$spark$SparkContext$$_conf()
##
## [[4]]
## <jobj[831]>
## java.lang.reflect.Method
## public org.apache.spark.SparkEnv org.apache.spark.SparkContext.org$apache$spark$SparkContext$$_env()
##
## [[5]]
## <jobj[832]>
## java.lang.reflect.Method
## public scala.Option org.apache.spark.SparkContext.org$apache$spark$SparkContext$$_progressBar()
##
## [[6]]
## <jobj[833]>
## java.lang.reflect.Method
## public scala.Option org.apache.spark.SparkContext.org$apache$spark$SparkContext$$_ui()
We can see that the invoke() chain has returned a list of Java object references, each of class java.lang.reflect.Method. This is a good start, but the output is not very user-friendly from an R user's perspective. Let us write a small wrapper that returns some of each method's details in a more readable fashion, for instance the return type and an overview of the parameters.
getMethodDetails <- function(mthd) {
  # Return type of the method, converted to a character string
  returnType <- mthd %>%
    invoke("getReturnType") %>%
    invoke("toString")
  # String representations of the method's parameters
  params <- mthd %>%
    invoke("getParameters")
  params <- vapply(params, invoke, "toString", FUN.VALUE = character(1))
  c(returnType = returnType, params = paste(params, collapse = ", "))
}
Finally, to get a nice overview, we can write another helper function that returns a named list of methods for an object's class, including their return types and an overview of their parameters.
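A possible sketch of such a helper is shown below; the name getAvailableMethods matches how it is used later in this chapter, but this particular implementation is an assumption, not necessarily the book's exact code.
getAvailableMethods <- function(jobj) {
  # List the public methods of the object's class
  methods <- jobj %>%
    invoke("getClass") %>%
    invoke("getMethods")
  # Summarize each method and name the list elements by method name
  details <- lapply(methods, getMethodDetails)
  names(details) <- vapply(
    methods,
    function(m) invoke(m, "getName"),
    FUN.VALUE = character(1)
  )
  details
}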
10.3 Investigating Dataset and SparkContext class methods
Using the function defined above, we can explore the methods available to a DataFrame reference, starting with a few of their names.
dfMethods <- tbl_flights %>% spark_dataframe() %>%
  getAvailableMethods()
# Show some method names:
dfMethodNames <- sort(unique(names(dfMethods)))
head(dfMethodNames, 20)
## [1] "agg" "alias"
## [3] "apply" "as"
## [5] "cache" "checkpoint"
## [7] "coalesce" "col"
## [9] "collect" "collectAsArrowToPython"
## [11] "collectAsList" "collectToPython"
## [13] "colRegex" "columns"
## [15] "count" "createGlobalTempView"
## [17] "createOrReplaceGlobalTempView" "createOrReplaceTempView"
## [19] "createTempView" "crossJoin"
If we would like to see more details, we can now investigate further, for instance by showing the different parameter interfaces of the agg method:
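One way to extract these overloads from our dfMethods list is sketched below; the exact code used to produce the output is an assumption.
# Show the parameter overview for every overload of `agg`
dfMethods[names(dfMethods) == "agg"] %>%
  vapply(function(x) x[["params"]], FUN.VALUE = character(1))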
## agg
## "java.util.Map<java.lang.String, java.lang.String> arg0"
## agg
## "org.apache.spark.sql.Column arg0, org.apache.spark.sql.Column... arg1"
## agg
## "org.apache.spark.sql.Column arg0, scala.collection.Seq<org.apache.spark.sql.Column> arg1"
## agg
## "scala.collection.immutable.Map<java.lang.String, java.lang.String> arg0"
## agg
## "scala.Tuple2<java.lang.String, java.lang.String> arg0, scala.collection.Seq<scala.Tuple2<java.lang.String, java.lang.String>> arg1"
Similarly, we can look at the SparkContext class and show some of the available methods that can be invoked.
scMethods <- sc %>% spark_context() %>%
  getAvailableMethods()
scMethodNames <- sort(unique(names(scMethods)))
head(scMethodNames, 60)
## [1] "$lessinit$greater$default$3" "$lessinit$greater$default$4"
## [3] "$lessinit$greater$default$5" "accumulable"
## [5] "accumulableCollection" "accumulator"
## [7] "addedFiles" "addedJars"
## [9] "addFile" "addJar"
## [11] "addSparkListener" "applicationAttemptId"
## [13] "applicationId" "appName"
## [15] "assertNotStopped" "binaryFiles"
## [17] "binaryFiles$default$2" "binaryRecords"
## [19] "binaryRecords$default$3" "broadcast"
## [21] "cancelAllJobs" "cancelJob"
## [23] "cancelJobGroup" "cancelStage"
## [25] "checkpointDir" "checkpointDir_$eq"
## [27] "checkpointFile" "clean"
## [29] "clean$default$2" "cleaner"
## [31] "clearCallSite" "clearJobGroup"
## [33] "collectionAccumulator" "conf"
## [35] "createSparkEnv" "dagScheduler"
## [37] "dagScheduler_$eq" "defaultMinPartitions"
## [39] "defaultParallelism" "deployMode"
## [41] "doubleAccumulator" "emptyRDD"
## [43] "env" "equals"
## [45] "eventLogCodec" "eventLogDir"
## [47] "eventLogger" "executorAllocationManager"
## [49] "executorEnvs" "executorMemory"
## [51] "files" "getAllPools"
## [53] "getCallSite" "getCheckpointDir"
## [55] "getClass" "getConf"
## [57] "getExecutorIds" "getExecutorMemoryStatus"
## [59] "getExecutorThreadDump" "getLocalProperties"
10.3.1 Using helpers to explore the methods
We can also use the helper functions to investigate further. For instance, we see that there is a getConf method available to us. Looking at the object reference itself, however, does not provide useful information, so we can list the methods for that class and look for "get" methods that would show us the configuration.
spark_conf <- spark_context(sc) %>% invoke("conf")
spark_conf_methods <- getAvailableMethods(spark_conf)
spark_conf_get_methods <- spark_conf_methods %>%
  names() %>%
  grep(pattern = "get", ., value = TRUE) %>%
  sort()
spark_conf_get_methods
## [1] "get" "get" "get"
## [4] "getAll" "getAllWithPrefix" "getAppId"
## [7] "getAvroSchema" "getBoolean" "getClass"
## [10] "getDeprecatedConfig" "getDouble" "getenv"
## [13] "getExecutorEnv" "getInt" "getLong"
## [16] "getOption" "getSizeAsBytes" "getSizeAsBytes"
## [19] "getSizeAsBytes" "getSizeAsGb" "getSizeAsGb"
## [22] "getSizeAsKb" "getSizeAsKb" "getSizeAsMb"
## [25] "getSizeAsMb" "getTimeAsMs" "getTimeAsMs"
## [28] "getTimeAsSeconds" "getTimeAsSeconds" "getWithSubstitution"
We see that there is a getAll method that could prove useful, returning an array of tuples and taking no arguments as input.
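We can confirm this by printing the details that our helper collected for getAll, for instance as sketched here:
# Inspect the return type and parameters of `getAll`
spark_conf_methods[["getAll"]]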
## returnType params
## "class [Lscala.Tuple2;" ""
# Invoke the `getAll` method and look at part of the result
spark_confs <- spark_conf %>% invoke("getAll")
spark_confs <- vapply(spark_confs, invoke, "toString", FUN.VALUE = character(1))
sort(spark_confs)[c(2, 3, 12)]
## [1] "(spark.app.name,sparklyr)" "(spark.driver.host,localhost)"
## [3] "(spark.sql.shuffle.partitions,2)"
Looking at the Scala documentation for the getAll method, we actually see that some information is missing from our output: the type parameters of the tuples, the full return type being scala.Tuple2<java.lang.String,java.lang.String>[]. We could therefore improve our helper to provide more detail on the return type.
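One possible refinement is sketched below: using getGenericReturnType from java.lang.reflect.Method instead of getReturnType preserves generic type parameters. The helper name here is ours, not the book's.
getMethodDetailsGeneric <- function(mthd) {
  # The generic return type keeps type parameters such as
  # scala.Tuple2<java.lang.String, java.lang.String>
  returnType <- mthd %>%
    invoke("getGenericReturnType") %>%
    invoke("toString")
  params <- mthd %>% invoke("getParameters")
  params <- vapply(params, invoke, "toString", FUN.VALUE = character(1))
  c(returnType = returnType, params = paste(params, collapse = ", "))
}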
10.3.2 Unexported helpers provided by sparklyr
The sparklyr package itself provides facilities similar in nature to those above, even though they are not exported. Let us look at some of them.
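As an assumption, the output below corresponds to internal helpers along the lines of the following calls; since they are unexported, their names and behavior may differ between sparklyr versions.
# Unexported helpers (assumed names, subject to change between versions)
sparklyr:::jobj_class(spark_conf)
sparklyr:::jobj_inspect(spark_conf)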
## [1] "SparkConf" "Object"
## [1] "org.apache.spark.SparkConf"
## [1] "<jobj[1645]>"
## [2] " org.apache.spark.SparkConf"
## [3] " org.apache.spark.SparkConf@7ec389e7"
## [4] "Fields:"
## [5] "<jobj[2490]>"
## [6] " java.lang.reflect.Field"
## [7] " private final java.util.concurrent.ConcurrentHashMap org.apache.spark.SparkConf.org$apache$spark$SparkConf$$settings"
## [8] "<jobj[2491]>"
## [9] " java.lang.reflect.Field"
## [10] " private transient org.apache.spark.internal.config.ConfigReader org.apache.spark.SparkConf.org$apache$spark$SparkConf$$reader"
10.4 How sparklyr communicates with Spark: invoke logging
Now that we have an overview of the invoke() interface, we can take a look under the hood of sparklyr and see how it actually communicates with the Spark instance. In fact, the communication is a set of invocations that can differ significantly depending on which of the approaches we choose for our purposes.
To obtain this information, we use the sparklyr.log.invoke property. We can choose one of the following three values based on our preferences:
- TRUE will use message() to communicate short info on what is being invoked
- "cat" will use cat() to communicate short info on what is being invoked
- "callstack" will use message() to communicate short info on what is being invoked and the callstack
We will use "cat" below to keep the output short and easily manageable. First, we will close the previous connection and create a new one with a configuration that sets sparklyr.log.invoke to "cat", and then copy in the flights dataset:
sparklyr::spark_disconnect(sc)
config <- sparklyr::spark_config()
config$sparklyr.log.invoke <- "cat"
suppressMessages({
  sc <- sparklyr::spark_connect(master = "local", config = config)
  tbl_flights <- dplyr::copy_to(sc, nycflights13::flights, "flights")
})
## Invoking sparklyr.Shell getBackend
## Invoking getSparkContext
## Invoking org.apache.spark.SparkConf
## Invoking setAppName
## Invoking setMaster
## Invoking setSparkHome
## Invoking set
## Invoking set
## Invoking org.apache.spark.sql.SparkSession builder
## Invoking config
## Invoking config
## Invoking getOrCreate
## Invoking sparkContext
## Invoking setSparkContext
## Invoking org.apache.spark.api.java.JavaSparkContext fromSparkContext
## Invoking version
## Invoking uiWebUrl
## Invoking isEmpty
## Invoking uiWebUrl
## Invoking get
## Invoking sql
## Invoking schema
## Invoking fields
## Invoking dataType
## Invoking toString
## Invoking name
## Invoking dataType
## Invoking toString
## Invoking name
## Invoking dataType
## Invoking toString
## Invoking name
## Invoking sparklyr.Utils collectColumn
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructType
## Invoking sparklyr.Utils createDataFrameFromCsv
## Invoking createDataFrame
## Invoking createOrReplaceTempView
## Invoking sql
## Invoking sql
## Invoking isStreaming
## Invoking sparklyr.Utils collect
## Invoking columns
## Invoking schema
## Invoking fields
## Invoking dataType
## Invoking toString
## Invoking name
## Invoking sql
## Invoking schema
## Invoking fieldNames
10.4.1 Using dplyr verbs translated with dbplyr
Now that the setup is complete, we use the dplyr verb approach to retrieve the count of rows and look at the invocations that this entails.
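A sketch of such a retrieval is shown below; the exact dplyr verbs used to produce the output may differ.
# Count the rows of the flights table using dplyr verbs
tbl_flights %>% dplyr::count()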
## Invoking sql
## Invoking sql
## Invoking columns
## Invoking isStreaming
## Invoking sql
## Invoking isStreaming
## Invoking sql
## Invoking sparklyr.Utils collect
## Invoking columns
## Invoking schema
## Invoking fields
## Invoking dataType
## Invoking toString
## Invoking name
## Invoking sql
## Invoking columns
## # Source: spark<?> [?? x 1]
## n
## <dbl>
## 1 336776
We see multiple invocations of the sql method and also of the columns method. This makes sense, since the dplyr verb approach works by translating the commands into Spark SQL via dbplyr and then sending those translated commands to Spark via that interface.
10.4.2 Using DBI to send queries
Similarly, we can investigate the invocations that happen when we try to retrieve the same results via the DBI interface:
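For instance, a query along these lines (a sketch, not necessarily the book's exact statement) produces the output below:
# Retrieve the row count via the DBI interface
DBI::dbGetQuery(sc, "SELECT count(*) AS n FROM flights")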
## Invoking sql
## Invoking isStreaming
## Invoking sparklyr.Utils collect
## Invoking columns
## Invoking schema
## Invoking fields
## Invoking dataType
## Invoking toString
## Invoking name
## n
## 1 336776
We see slightly fewer invocations compared to the dplyr approach above, but the output is also less processed.
10.4.3 Using the invoke interface
Finally, let us look at the invocations that get executed when using the invoke() interface directly.
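One way to get the same count through invoke() is sketched below; the book's exact pipe may differ.
# Count the rows by invoking `count` directly on the Spark DataFrame
tbl_flights %>%
  spark_dataframe() %>%
  invoke("count")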
## Invoking sql
## Invoking count
## [1] 336776
We see that the number of invocations is much lower; the initial invocations come from the first part of the pipe, while the invoke("count") part translated to exactly one invocation of the count method.
We see therefore that the invoke() interface is indeed a lower-level interface that invokes methods as we request them, with little to no overhead related to translations and other effects.
10.4.4 Redirecting the invoke logs
When running R applications that use Spark as a calculation engine, it is useful to get detailed invoke logs for debugging and diagnostic purposes. When implementing such mechanisms, we need to take into consideration how R handles the invoke logs produced by sparklyr. In simple terms, the invoke logs produced when using:
- TRUE and "callstack" are created using message(), which means they get sent to the stderr() connection by default
- "cat" are created using cat(), so they get sent to the stdout() connection by default
This info can prove useful when redirecting the log information from standard output and standard error to different logging targets.
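As a minimal sketch, assuming sparklyr.log.invoke is set to "cat" for the standard-output logs, both streams can be redirected to files with sink(); the file names here are hypothetical.
# Redirect standard output (cat-based logs) and standard error
# (message-based logs) to separate files
out_con <- file("invoke-stdout.log", open = "wt")
msg_con <- file("invoke-stderr.log", open = "wt")
sink(out_con, type = "output")
sink(msg_con, type = "message")
# Any invocations executed here have their logs written to the files
invoke(spark_context(sc), "version")
# Restore the default connections and close the files
sink(type = "message")
sink(type = "output")
close(msg_con)
close(out_con)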
10.5 Conclusion
In this chapter, we have looked at using the Java reflection API with sparklyr's invoke() interface to get useful insight into the methods available for different object types, in the context of Spark but also beyond it. Using invoke logging, we have also shown how the different sparklyr interfacing methods communicate with Spark under the hood.