Chapter 10 Exploring the invoke API from R with Java reflection and examining invokes with logs

Then darkness took me, and I strayed out of thought and time, and I wandered far on roads that I will not tell

  • Gandalf the White

In the previous chapters, we have shown how to write functions as both combinations of dplyr verbs, SQL query generators that can be executed by Spark and how to use the lower-level API to invoke methods on Java object references from R.

In this chapter, we will look into more details around sparklyr’s invoke() API, investigate available methods for different classes of objects using the Java reflection API and look under the hood of the sparklyr interface mechanism with invoke logging.

10.1 Examining available methods from R

If you did not do so, it is recommended to read the previous chapter of this book before this one to get a quick overview of the invoke() API.

10.2 Using the Java reflection API to list the available methods

The invoke() interface is powerful, but also a bit hidden from the eyes as we do not immediately know what methods are available for which object classes. We can circumvent that using the getMethods method which (in short) returns an array of Method objects reflecting public member methods of the class.

For instance, retrieving a list of methods for the org.apache.spark.SparkContext class:

## [[1]]
## <jobj[828]>
##   java.lang.reflect.Method
##   public org.apache.spark.util.CallSite org.apache.spark.SparkContext.org$apache$spark$SparkContext$$creationSite()
## 
## [[2]]
## <jobj[829]>
##   java.lang.reflect.Method
##   public org.apache.spark.SparkConf org.apache.spark.SparkContext.org$apache$spark$SparkContext$$_conf()
## 
## [[3]]
## <jobj[830]>
##   java.lang.reflect.Method
##   public org.apache.spark.SparkEnv org.apache.spark.SparkContext.org$apache$spark$SparkContext$$_env()
## 
## [[4]]
## <jobj[831]>
##   java.lang.reflect.Method
##   public scala.Option org.apache.spark.SparkContext.org$apache$spark$SparkContext$$_progressBar()
## 
## [[5]]
## <jobj[832]>
##   java.lang.reflect.Method
##   public scala.Option org.apache.spark.SparkContext.org$apache$spark$SparkContext$$_ui()
## 
## [[6]]
## <jobj[833]>
##   java.lang.reflect.Method
##   public org.apache.spark.rpc.RpcEndpointRef org.apache.spark.SparkContext.org$apache$spark$SparkContext$$_heartbeatReceiver()

We can see that the invoke() chain has returned a list of Java object references, each of them of class java.lang.reflect.Method. This is a good result, but the output is not very user-friendly from the R user perspective. Let us write a small wrapper that will return a some of the method’s details in a more readable fashion, for instance the return type and an overview of parameters.

Finally, to get a nice overview, we can make another helper function that will return a named list of methods for an object’s class, including their return types and overview of parameters.

10.3 Investigating DataSet and SparkContext class methods

Using the above defined function we can explore the methods available to a DataFrame reference, showing a few of the names first.

##  [1] "agg"                           "alias"                        
##  [3] "apply"                         "as"                           
##  [5] "cache"                         "checkpoint"                   
##  [7] "coalesce"                      "col"                          
##  [9] "collect"                       "collectAsArrowToPython"       
## [11] "collectAsList"                 "collectToPython"              
## [13] "colRegex"                      "columns"                      
## [15] "count"                         "createGlobalTempView"         
## [17] "createOrReplaceGlobalTempView" "createOrReplaceTempView"      
## [19] "createTempView"                "crossJoin"

If we would like to see more details we can now investigate further, for instance show different parameter interfaces for the agg method, showing that the agg method has the following parameter interfaces.

##                                                                                                                                  agg 
##                                                                             "java.util.Map<java.lang.String, java.lang.String> arg0" 
##                                                                                                                                  agg 
##                                                              "org.apache.spark.sql.Column arg0, org.apache.spark.sql.Column... arg1" 
##                                                                                                                                  agg 
##                                           "org.apache.spark.sql.Column arg0, scala.collection.Seq<org.apache.spark.sql.Column> arg1" 
##                                                                                                                                  agg 
##                                                            "scala.collection.immutable.Map<java.lang.String, java.lang.String> arg0" 
##                                                                                                                                  agg 
## "scala.Tuple2<java.lang.String, java.lang.String> arg0, scala.collection.Seq<scala.Tuple2<java.lang.String, java.lang.String>> arg1"

Similarly, we can look at a SparkContext class and show some available methods that can be invoked.

##  [1] "$lessinit$greater$default$3" "$lessinit$greater$default$4"
##  [3] "$lessinit$greater$default$5" "accumulable"                
##  [5] "accumulableCollection"       "accumulator"                
##  [7] "addedFiles"                  "addedJars"                  
##  [9] "addFile"                     "addJar"                     
## [11] "addSparkListener"            "applicationAttemptId"       
## [13] "applicationId"               "appName"                    
## [15] "assertNotStopped"            "binaryFiles"                
## [17] "binaryFiles$default$2"       "binaryRecords"              
## [19] "binaryRecords$default$3"     "broadcast"                  
## [21] "cancelAllJobs"               "cancelJob"                  
## [23] "cancelJobGroup"              "cancelStage"                
## [25] "checkpointDir"               "checkpointDir_$eq"          
## [27] "checkpointFile"              "clean"                      
## [29] "clean$default$2"             "cleaner"                    
## [31] "clearCallSite"               "clearJobGroup"              
## [33] "collectionAccumulator"       "conf"                       
## [35] "createSparkEnv"              "dagScheduler"               
## [37] "dagScheduler_$eq"            "defaultMinPartitions"       
## [39] "defaultParallelism"          "deployMode"                 
## [41] "doubleAccumulator"           "emptyRDD"                   
## [43] "env"                         "equals"                     
## [45] "eventLogCodec"               "eventLogDir"                
## [47] "eventLogger"                 "executorAllocationManager"  
## [49] "executorEnvs"                "executorMemory"             
## [51] "files"                       "getAllPools"                
## [53] "getCallSite"                 "getCheckpointDir"           
## [55] "getClass"                    "getConf"                    
## [57] "getExecutorIds"              "getExecutorMemoryStatus"    
## [59] "getExecutorThreadDump"       "getLocalProperties"

10.3.1 Using helpers to explore the methods

We can also use the helper functions to investigate more. For instance, we see that there is a getConf method avaiable to us. Looking at the object reference however does not provide useful information, so we can list the methods for that class and look for "get" methods that would show us the configuration.

##  [1] "get"                 "get"                 "get"                
##  [4] "getAll"              "getAllWithPrefix"    "getAppId"           
##  [7] "getAvroSchema"       "getBoolean"          "getClass"           
## [10] "getDeprecatedConfig" "getDouble"           "getenv"             
## [13] "getExecutorEnv"      "getInt"              "getLong"            
## [16] "getOption"           "getSizeAsBytes"      "getSizeAsBytes"     
## [19] "getSizeAsBytes"      "getSizeAsGb"         "getSizeAsGb"        
## [22] "getSizeAsKb"         "getSizeAsKb"         "getSizeAsMb"        
## [25] "getSizeAsMb"         "getTimeAsMs"         "getTimeAsMs"        
## [28] "getTimeAsSeconds"    "getTimeAsSeconds"    "getWithSubstitution"

We see that there is a getAll method that could prove useful, returning a list of tuples and taking no arguments as input.

##              returnType                  params 
## "class [Lscala.Tuple2;"                      ""
## [1] "(spark.app.name,sparklyr)"        "(spark.driver.host,localhost)"   
## [3] "(spark.sql.shuffle.partitions,2)"

Looking at the Scala documentation for the getAll method, we actually see that there is information missing on our data - the classes of the objects in the tuple, which in this case is scala.Tuple2<java.lang.String,java.lang.String>[].

We could therefore improve our helper to be more detailed in the return value information.

10.3.2 Unexported helpers provided by sparklyr

The sparklyr package itself provides facilities of nature similar to those above, looking at some of them, even though they are not exported.

## [1] "SparkConf" "Object"
## [1] "org.apache.spark.SparkConf"
##  [1] "<jobj[1645]>"                                                                                                                   
##  [2] "  org.apache.spark.SparkConf"                                                                                                   
##  [3] "  org.apache.spark.SparkConf@7ec389e7"                                                                                          
##  [4] "Fields:"                                                                                                                        
##  [5] "<jobj[2490]>"                                                                                                                   
##  [6] "  java.lang.reflect.Field"                                                                                                      
##  [7] "  private final java.util.concurrent.ConcurrentHashMap org.apache.spark.SparkConf.org$apache$spark$SparkConf$$settings"         
##  [8] "<jobj[2491]>"                                                                                                                   
##  [9] "  java.lang.reflect.Field"                                                                                                      
## [10] "  private transient org.apache.spark.internal.config.ConfigReader org.apache.spark.SparkConf.org$apache$spark$SparkConf$$reader"

10.4 How sparklyr communicates with Spark, invoke logging

Now that we have and overview of the invoke() interface, we can take a look under the hood of sparklyr and see how it actually communicates with the Spark instance. In fact, the communication is a set of invocations that can be very different depending on which of the approches we choose for our purposes.

To obtain the information, we use the sparklyr.log.invoke property. We can choose one of the following 3 values based on our preferences:

  • TRUE will use message() to communicate short info on what is being invoked
  • "cat" will use cat() to communicate short info on what is being invoked
  • "callstack" will use message() to communicate short info on what is being invoked and the callstack

We will use "cat" below to keep the output short and easily manageable. First, we will close the previous connection and create a new one with the configuration containing the sparklyr.log.invoke set to "cat", and copy in the flights dataset:

## Invoking sparklyr.Shell getBackend
## Invoking getSparkContext
## Invoking org.apache.spark.SparkConf
## Invoking setAppName
## Invoking setMaster
## Invoking setSparkHome
## Invoking set
## Invoking set
## Invoking org.apache.spark.sql.SparkSession builder
## Invoking config
## Invoking config
## Invoking getOrCreate
## Invoking sparkContext
## Invoking setSparkContext
## Invoking org.apache.spark.api.java.JavaSparkContext fromSparkContext
## Invoking version
## Invoking uiWebUrl
## Invoking isEmpty
## Invoking uiWebUrl
## Invoking get
## Invoking sql
## Invoking schema
## Invoking fields
## Invoking dataType
## Invoking toString
## Invoking name
## Invoking dataType
## Invoking toString
## Invoking name
## Invoking dataType
## Invoking toString
## Invoking name
## Invoking sparklyr.Utils collectColumn
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructField
## Invoking sparklyr.SQLUtils createStructType
## Invoking sparklyr.Utils createDataFrameFromCsv
## Invoking createDataFrame
## Invoking createOrReplaceTempView
## Invoking sql
## Invoking sql
## Invoking isStreaming
## Invoking sparklyr.Utils collect
## Invoking columns
## Invoking schema
## Invoking fields
## Invoking dataType
## Invoking toString
## Invoking name
## Invoking sql
## Invoking schema
## Invoking fieldNames

10.4.1 Using dplyr verbs translated with dbplyr

Now that the setup is complete, we use the dplyr verb approach to retrieve the count of rows and look the invocations that this entails.

## Invoking sql
## Invoking sql
## Invoking columns
## Invoking isStreaming
## Invoking sql
## Invoking isStreaming
## Invoking sql
## Invoking sparklyr.Utils collect
## Invoking columns
## Invoking schema
## Invoking fields
## Invoking dataType
## Invoking toString
## Invoking name
## Invoking sql
## Invoking columns
## # Source: spark<?> [?? x 1]
##        n
##    <dbl>
## 1 336776

We see multiple invocations do the sql method and also the columns method. This makes sense since the dplyr verb approach actually works by translating the commands into Spark SQL via dbplyr and then sends those translated commands to Spark via that interface.

10.4.2 Using DBI to send queries

Similarly, we can investigate the invocations that happen when we try to retrieve the same results via the DBI interface:

## Invoking sql
## Invoking isStreaming
## Invoking sparklyr.Utils collect
## Invoking columns
## Invoking schema
## Invoking fields
## Invoking dataType
## Invoking toString
## Invoking name
##        n
## 1 336776

We see slightly fewer invocations compared to the above dplyr approach, but the output is also less processed.

10.4.3 Using the invoke interface

Looking at the invocations that get executed using the invoke() interface.

## Invoking sql
## Invoking count
## [1] 336776

We see that the amount of invocations is much lower, where the top 3 invocations come from the first part of the pipe. The invoke("count") part translated to exactly one invocation to the count method.

We see therefore that the invoke() interface is indeed a more lower-level interface that invokes methods as we request them, with little to no overhead related to translations and other effects.

10.4.4 Redirecting the invoke logs

When running R applications that use Spark as a calculation engine, it is useful to get detailed invoke logs for debugging and diagnostic purposes. Implementing such mechanisms, we need to take into consideration how R handles the invoke logs produced by sparklyr. In simple terms, the invoke logs produced when using

  • TRUE and "callstack" are created using message(), which means they get sent to the stderr() connection by default
  • "cat" are created using cat(), so they get sent to stdout() connection by default

This info can prove useful when redirecting the log information from standard output and standard error to different logging targets.

10.5 Conclusion

In this chapter, we have looked at using the Java reflection API with sparklyr’s invoke() interface to get useful insight on available methods for different object types that can be used in the context of Spark, but also other contexts. Using invoke logging, we have also shown how the different sparklyr interfacing methods communicate with Spark under the hood.