Chapter 7 Constructing functions by piping dplyr verbs

In the previous chapters, we looked at how the sparklyr interface communicates with the Spark instance and what this means for performance with regard to arbitrarily defined R functions. We also examined how Apache Arrow can increase the performance of data transfers between the R session and the Spark instance.

In this chapter, we will look at how to write R functions that can be executed directly by Spark without the serialization overhead that we showed in the previous chapters. We will focus on writing functions as combinations of dplyr verbs that can be translated using dbplyr, and investigate how the SQL is generated and how the Spark plans are created.

7.1 R functions as combinations of dplyr verbs and Spark

One approach to retaining the performance of Spark with arbitrary R functionality is to design our functions carefully, so that when a function is used with sparklyr, the entire function call can be translated directly to Spark SQL using dbplyr.

This allows us to write, package, test, and document the functions as we normally would, while still getting the performance benefits of Apache Spark.

Let’s look at an example where we would like to do simple transformations of data stored in a column of a data frame, such as normalization of one of the columns. For illustration purposes, we will normalize the values of a column by first subtracting the mean value and then dividing the values by the standard deviation.

7.1.1 Trying it with base R functions

The first attempt could be quite simple: we could take advantage of R’s base function scale() to do the work for us:
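The original listing is not reproduced in this text. A minimal sketch of such a function might look as follows; the name normalize_temp_scale and the small stand-in data frame weather_small are assumptions made for illustration only.

```r
library(dplyr)

# Hypothetical helper (the name is an assumption, not taken from the original text):
# adds a normalized copy of the `temp` column using base R's scale().
normalize_temp_scale <- function(df) {
  df %>% mutate(normTemp = scale(temp))
}

# Small local stand-in for the weather data, for illustration only
weather_small <- tibble(id = 1:5, temp = c(39.0, 39.9, 37.9, 41.0, 39.0))

res_scale <- normalize_temp_scale(weather_small)
print(res_scale)
```

Note that scale() returns a one-column matrix rather than a plain vector, which is why printed output for such a column is labelled normTemp[,1].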

This function would work fine with a local data frame such as weather:

## # A tibble: 26,115 x 3
##       id  temp normTemp[,1]
##    <int> <dbl>        <dbl>
##  1     1  39.0       -0.913
##  2     2  39.0       -0.913
##  3     3  39.0       -0.913
##  4     4  39.9       -0.862
##  5     5  39.0       -0.913
##  6     6  37.9       -0.974
##  7     7  39.0       -0.913
##  8     8  39.9       -0.862
##  9     9  39.9       -0.862
## 10    10  41         -0.802
## # … with 26,105 more rows

However, for a Spark DataFrame this throws an error, because the base R function scale() is not currently translated by dbplyr and is not a Hive built-in function either:

## Error: org.apache.spark.sql.AnalysisException: Undefined function: 'scale'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 21
##  at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$$anonfun$apply$15$$anonfun$applyOrElse$49.apply(Analyzer.scala:1281)
##  at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$$anonfun$apply$15$$anonfun$applyOrElse$49.apply(Analyzer.scala:1281)
##  at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:53)
...
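To see why Spark ends up receiving a function it does not know, we can inspect the SQL that dbplyr generates without needing a cluster. The sketch below uses dbplyr's simulated Hive connection as a stand-in for Spark. This is an assumption made so that the example runs without Spark, and the translation used by sparklyr itself may differ in details such as letter case.

```r
library(dplyr)
library(dbplyr)

# A lazy table backed by a simulated Hive connection; no data is ever queried.
# Using simulate_hive() as a stand-in for a Spark connection is an assumption.
weather_lazy <- lazy_frame(id = 1L, temp = 39.0, con = simulate_hive())

scale_sql <- weather_lazy %>%
  mutate(normTemp = scale(temp)) %>%
  sql_render() %>%
  as.character()

# dbplyr has no translation for scale(), so the call is passed through to the SQL text
cat(scale_sql, "\n")
```

Functions that dbplyr does not know how to translate are left as they are in the generated SQL, so it is the SQL engine that eventually rejects them.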

7.1.2 Using a combination of supported dplyr verbs and operations

To run the function successfully, we will need to rewrite it as a combination of functions and operations that are supported by the dbplyr translation to Spark SQL. One example implementation is as follows:
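The original implementation is not reproduced in this text. A minimal sketch, assuming a numeric column named temp and a hypothetical function name normalize_temp_dplyr, could look like this. It uses na.rm = TRUE so that local results stay consistent with SQL, where missing values are always removed by aggregations.

```r
library(dplyr)

# Hypothetical helper (the name is an assumption): uses only arithmetic,
# mean() and sd(), all of which dbplyr can translate to SQL.
normalize_temp_dplyr <- function(df) {
  df %>%
    mutate(normTemp = (temp - mean(temp, na.rm = TRUE)) / sd(temp, na.rm = TRUE))
}

# Small local stand-in for the weather data, for illustration only
weather_small <- tibble(id = 1:5, temp = c(39.0, 39.9, 37.9, 41.0, 39.0))

res_dplyr <- normalize_temp_dplyr(weather_small)
print(res_dplyr)
```

Because the function is built only from dplyr verbs and translatable operations, the same definition can be applied unchanged to a local data frame or to a Spark DataFrame reference.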

Using this function yields the desired results for both local and Spark data frames:

## # A tibble: 26,115 x 3
##       id  temp normTemp
##    <int> <dbl>    <dbl>
##  1     1  39.0   -0.913
##  2     2  39.0   -0.913
##  3     3  39.0   -0.913
##  4     4  39.9   -0.862
##  5     5  39.0   -0.913
##  6     6  37.9   -0.974
##  7     7  39.0   -0.913
##  8     8  39.9   -0.862
##  9     9  39.9   -0.862
## 10    10  41     -0.802
## # … with 26,105 more rows
## # A tibble: 26,115 x 3
##       id  temp normTemp
##    <int> <dbl>    <dbl>
##  1     1  39.0   -0.913
##  2     2  39.0   -0.913
##  3     3  39.0   -0.913
##  4     4  39.9   -0.862
##  5     5  39.0   -0.913
##  6     6  37.9   -0.974
##  7     7  39.0   -0.913
##  8     8  39.9   -0.862
##  9     9  39.9   -0.862
## 10    10  41     -0.802
## # … with 26,105 more rows

7.1.3 Investigating the SQL translation and its Spark plan

Another advantage of this approach is that we can investigate the plan by which the actions will be executed by Spark using the explain() function from the dplyr package. This will print both the SQL query constructed by dbplyr and the plan generated by Spark, which can help us investigate performance issues:
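As a hedged sketch of how such a call might be wrapped, the hypothetical helper below (show_normalize_plan is an assumed name) applies the normalization and passes the lazy result to explain(). It expects a Spark DataFrame reference with a numeric temp column; the usage lines are left as comments because they require a running Spark connection.

```r
library(dplyr)

# Hypothetical wrapper: prints the SQL and the execution plan for the
# normalization step. `tbl_weather` is assumed to be a remote table with
# a numeric `temp` column, for example one created with sparklyr::copy_to().
show_normalize_plan <- function(tbl_weather) {
  tbl_weather %>%
    mutate(normTemp = (temp - mean(temp, na.rm = TRUE)) / sd(temp, na.rm = TRUE)) %>%
    explain()
}

# Usage, assuming an open sparklyr connection `sc` (not run here):
# tbl_weather <- sparklyr::copy_to(sc, nycflights13::weather, "weather")
# show_normalize_plan(tbl_weather)
```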

## <SQL>
## SELECT `id`, `origin`, `year`, `month`, `day`, `hour`, `temp`, `dewp`, `humid`, `wind_dir`, `wind_speed`, `wind_gust`, `precip`, `pressure`, `visib`, `time_hour`, (`temp` - AVG(`temp`) OVER ()) / stddev_samp(`temp`) OVER () AS `normTemp`
## FROM `weather`
## 
## <PLAN>
## == Physical Plan ==
## *(1) Project [id#24, origin#25, year#26, month#27, day#28, hour#29, temp#30, dewp#31, humid#32, wind_dir#33, wind_speed#34, wind_gust#35, precip#36, pressure#37, visib#38, time_hour#39, ((temp#30 - _we0#1995) / _we1#1996) AS normTemp#1981]
## +- Window [avg(temp#30) windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS _we0#1995, stddev_samp(temp#30) windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS _we1#1996]
##    +- Exchange SinglePartition
##       +- InMemoryTableScan [id#24, origin#25, year#26, month#27, day#28, hour#29, temp#30, dewp#31, humid#32, wind_dir#33, wind_speed#34, wind_gust#35, precip#36, pressure#37, visib#38, time_hour#39]
##             +- InMemoryRelation [id#24, origin#25, year#26, month#27, day#28, hour#29, temp#30, dewp#31, humid#32, wind_dir#33, wind_speed#34, wind_gust#35, precip#36, pressure#37, visib#38, time_hour#39], StorageLevel(disk, memory, deserialized, 1 replicas)
##                   +- Scan ExistingRDD[id#24,origin#25,year#26,month#27,day#28,hour#29,temp#30,dewp#31,humid#32,wind_dir#33,wind_speed#34,wind_gust#35,precip#36,pressure#37,visib#38,time_hour#39]

If we are only interested in the SQL itself as a character string, we can use dbplyr’s sql_render():
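The rendering step can also be tried without a Spark cluster. The following sketch again uses dbplyr's simulated Hive connection as a stand-in for Spark, which is an assumption; the SQL produced through sparklyr may differ slightly, for example in letter case or in how columns are listed.

```r
library(dplyr)
library(dbplyr)

# Simulated Hive connection used as a stand-in for Spark (an assumption)
weather_lazy <- lazy_frame(id = 1L, temp = 39.0, con = simulate_hive())

norm_query <- weather_lazy %>%
  mutate(normTemp = (temp - mean(temp, na.rm = TRUE)) / sd(temp, na.rm = TRUE))

# sql_render() returns an object of class "sql";
# as.character() turns it into a plain character string
norm_sql <- as.character(sql_render(norm_query))
cat(norm_sql, "\n")
```

Having the query as a string is convenient for logging it or for comparing the SQL generated by different versions of a function.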

## [1] "SELECT `id`, `origin`, `year`, `month`, `day`, `hour`, `temp`, `dewp`, `humid`, `wind_dir`, `wind_speed`, `wind_gust`, `precip`, `pressure`, `visib`, `time_hour`, (`temp` - AVG(`temp`) OVER ()) / stddev_samp(`temp`) OVER () AS `normTemp`\nFROM `weather`"

7.2 A more complex use case - Joins, group bys, and aggregations

The dplyr syntax makes it easy to construct more complex aggregations across multiple Spark DataFrames. A function that joins two Spark DataFrames and computes the mean of a selected column, grouped by another column, can look as follows:
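The original function is not reproduced in this text. The sketch below is a reconstruction under assumptions: the function name joingrpagg, its argument names, and the output column name mean_value are hypothetical, and the choice of a right join followed by a filter on missing values is inferred from the generated SQL shown later in this section. It is demonstrated on two tiny local data frames.

```r
library(dplyr)

# Hypothetical helper (name and arguments are assumptions):
# joins two tables, drops rows where the aggregated column is missing,
# and computes the mean of that column per group.
joingrpagg <- function(df1, df2, joinColNames, groupColName, aggColName) {
  groupCol <- rlang::sym(groupColName)
  aggCol <- rlang::sym(aggColName)
  df1 %>%
    right_join(df2, by = joinColNames) %>%
    filter(!is.na(!!aggCol)) %>%
    group_by(!!groupCol) %>%
    summarise(mean_value = mean(!!aggCol, na.rm = TRUE)) %>%
    arrange(!!groupCol)
}

# Tiny local stand-ins for the flights and weather data
flights_small <- tibble(
  origin = c("EWR", "EWR", "JFK", "JFK"),
  hour = c(1, 2, 1, 2),
  arr_delay = c(10, 20, NA, 40)
)
weather_small <- tibble(
  origin = c("EWR", "EWR", "JFK", "JFK"),
  hour = c(1, 2, 1, 2),
  visib = c(10, 5, 10, 5)
)

res_join <- joingrpagg(flights_small, weather_small, c("origin", "hour"), "visib", "arr_delay")
print(res_join)
```

Column names are passed as strings and converted to symbols, so the same function can be reused for other join keys, grouping columns and aggregated columns.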

We can then use this function, for instance, to look at the mean arrival delay of flights grouped by visibility. Note that we are only collecting heavily aggregated data, 20 rows in total, so the overhead of data transfer from the Spark instance to the R session is small. Also, just assigning the result of the function call to delay_by_visib does not execute or collect anything. Execution starts only when collect() is called:
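The lazy behavior can be illustrated without a Spark cluster. The sketch below builds a comparable pipeline on lazy tables backed by dbplyr's simulated Hive connection, used here as a stand-in for Spark (an assumption); the column set and the output name mean_arr_delay are simplified for illustration.

```r
library(dplyr)
library(dbplyr)

# Simulated connection: nothing can be executed against it,
# which makes it a convenient way to show that the pipeline is lazy
con <- simulate_hive()
flights_lazy <- lazy_frame(origin = "EWR", hour = 1, arr_delay = 10, con = con)
weather_lazy <- lazy_frame(origin = "EWR", hour = 1, visib = 10, con = con)

# Assigning the pipeline only builds a description of the query
delay_by_visib <- flights_lazy %>%
  right_join(weather_lazy, by = c("origin", "hour")) %>%
  filter(!is.na(arr_delay)) %>%
  group_by(visib) %>%
  summarise(mean_arr_delay = mean(arr_delay, na.rm = TRUE)) %>%
  arrange(visib)

# The object is a lazy table, and its SQL can be inspected as text
print(class(delay_by_visib))
delay_sql <- as.character(sql_render(delay_by_visib))
cat(delay_sql, "\n")
```

With a real Spark connection, calling collect() on such an object is the step that triggers execution and returns the aggregated rows to the R session.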

## Warning: Missing values are always removed in SQL.
## Use `mean(x, na.rm = TRUE)` to silence this warning
## This warning is displayed only once per session.
## # A tibble: 20 x 2
##    visib `mean(arr_delay)`
##    <dbl>             <dbl>
##  1  0                24.9 
##  2  0.06             28.5 
##  3  0.12             45.4 
##  4  0.25             20.8 
##  5  0.5              39.8 
##  6  0.75             41.4 
##  7  1                37.6 
##  8  1.25             65.1 
##  9  1.5              34.7 
## 10  1.75             45.6 
## 11  2                26.3 
## 12  2.5              21.7 
## 13  3                21.7 
## 14  4                17.7 
## 15  5                18.9 
## 16  6                17.3 
## 17  7                16.4 
## 18  8                16.1 
## 19  9                15.6 
## 20 10                 4.32

We can look at the plan and the generated SQL query as well:

## <SQL>
## SELECT `visib`, AVG(`arr_delay`) AS `mean(arr_delay)`
## FROM (SELECT `LHS`.`id` AS `id.x`, `RHS`.`year` AS `year`, `RHS`.`month` AS `month`, `RHS`.`day` AS `day`, `LHS`.`dep_time` AS `dep_time`, `LHS`.`sched_dep_time` AS `sched_dep_time`, `LHS`.`dep_delay` AS `dep_delay`, `LHS`.`arr_time` AS `arr_time`, `LHS`.`sched_arr_time` AS `sched_arr_time`, `LHS`.`arr_delay` AS `arr_delay`, `LHS`.`carrier` AS `carrier`, `LHS`.`flight` AS `flight`, `LHS`.`tailnum` AS `tailnum`, `RHS`.`origin` AS `origin`, `LHS`.`dest` AS `dest`, `LHS`.`air_time` AS `air_time`, `LHS`.`distance` AS `distance`, `RHS`.`hour` AS `hour`, `LHS`.`minute` AS `minute`, `RHS`.`time_hour` AS `time_hour`, `RHS`.`id` AS `id.y`, `RHS`.`temp` AS `temp`, `RHS`.`dewp` AS `dewp`, `RHS`.`humid` AS `humid`, `RHS`.`wind_dir` AS `wind_dir`, `RHS`.`wind_speed` AS `wind_speed`, `RHS`.`wind_gust` AS `wind_gust`, `RHS`.`precip` AS `precip`, `RHS`.`pressure` AS `pressure`, `RHS`.`visib` AS `visib`
## FROM `flights` AS `LHS`
## RIGHT JOIN `weather` AS `RHS`
## ON (`LHS`.`year` = `RHS`.`year` AND `LHS`.`month` = `RHS`.`month` AND `LHS`.`day` = `RHS`.`day` AND `LHS`.`origin` = `RHS`.`origin` AND `LHS`.`hour` = `RHS`.`hour` AND `LHS`.`time_hour` = `RHS`.`time_hour`)
## ) `dbplyr_003`
## WHERE (NOT(((`arr_delay`) IS NULL)))
## GROUP BY `visib`
## ORDER BY `visib`
## 
## <PLAN>
## == Physical Plan ==
## *(4) Sort [visib#38 ASC NULLS FIRST], true, 0
## +- Exchange rangepartitioning(visib#38 ASC NULLS FIRST, 2)
##    +- *(3) HashAggregate(keys=[visib#38], functions=[avg(arr_delay#411)])
##       +- Exchange hashpartitioning(visib#38, 2)
##          +- *(2) HashAggregate(keys=[visib#38], functions=[partial_avg(arr_delay#411)])
##             +- *(2) Project [arr_delay#411, visib#38]
##                +- *(2) BroadcastHashJoin [year#403, month#404, day#405, origin#415, hour#419, time_hour#421], [year#26, month#27, day#28, origin#25, cast(hour#29 as double), time_hour#39], Inner, BuildRight
##                   :- *(2) Filter ((((((NOT isnull(arr_delay#411) && isnotnull(hour#419)) && isnotnull(year#403)) && isnotnull(origin#415)) && isnotnull(day#405)) && isnotnull(month#404)) && isnotnull(time_hour#421))
##                   :  +- InMemoryTableScan [year#403, month#404, day#405, arr_delay#411, origin#415, hour#419, time_hour#421], [NOT isnull(arr_delay#411), isnotnull(hour#419), isnotnull(year#403), isnotnull(origin#415), isnotnull(day#405), isnotnull(month#404), isnotnull(time_hour#421)]
##                   :        +- InMemoryRelation [id#402, year#403, month#404, day#405, dep_time#406, sched_dep_time#407, dep_delay#408, arr_time#409, sched_arr_time#410, arr_delay#411, carrier#412, flight#413, tailnum#414, origin#415, dest#416, air_time#417, distance#418, hour#419, minute#420, time_hour#421], StorageLevel(disk, memory, deserialized, 1 replicas)
##                   :              +- Scan ExistingRDD[id#402,year#403,month#404,day#405,dep_time#406,sched_dep_time#407,dep_delay#408,arr_time#409,sched_arr_time#410,arr_delay#411,carrier#412,flight#413,tailnum#414,origin#415,dest#416,air_time#417,distance#418,hour#419,minute#420,time_hour#421]
##                   +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, int, false], input[2, int, false], input[3, int, false], input[0, string, false], cast(input[4, int, false] as double), input[6, timestamp, false]))
##                      +- *(1) Filter (((((isnotnull(month#27) && isnotnull(day#28)) && isnotnull(hour#29)) && isnotnull(origin#25)) && isnotnull(year#26)) && isnotnull(time_hour#39))
##                         +- InMemoryTableScan [origin#25, year#26, month#27, day#28, hour#29, visib#38, time_hour#39], [isnotnull(month#27), isnotnull(day#28), isnotnull(hour#29), isnotnull(origin#25), isnotnull(year#26), isnotnull(time_hour#39)]
##                               +- InMemoryRelation [id#24, origin#25, year#26, month#27, day#28, hour#29, temp#30, dewp#31, humid#32, wind_dir#33, wind_speed#34, wind_gust#35, precip#36, pressure#37, visib#38, time_hour#39], StorageLevel(disk, memory, deserialized, 1 replicas)
##                                     +- Scan ExistingRDD[id#24,origin#25,year#26,month#27,day#28,hour#29,temp#30,dewp#31,humid#32,wind_dir#33,wind_speed#34,wind_gust#35,precip#36,pressure#37,visib#38,time_hour#39]

7.3 Using the functions with local versus remote datasets

7.3.1 Unified front-end, different back-ends

Some of the appeal of the dplyr syntax comes from the fact that we can use the same functions to conveniently manipulate local data frames in memory and, with the very same code, data from remote sources such as relational databases, data.tables and even data within Spark.

This unified front-end, however, comes with some important differences that we must be aware of when porting code that manipulates local data so that it computes on remote sources instead. The same holds for remote Spark DataFrames that we manipulate using dplyr functions.

The following paragraphs will show a few examples of issues we can come across when porting local data handling to a remote source such as Spark.

7.3.3 Dates, times and time zones

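Special care must also be taken when dealing with date/time values and their time zones. In the output below, the time_hour column is shown first for the local data frame and then for the Spark DataFrame, and the displayed values for the same rows differ by five hours: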

## # A tibble: 26,115 x 2
##       id time_hour          
##    <int> <dttm>             
##  1     1 2013-01-01 01:00:00
##  2     2 2013-01-01 02:00:00
##  3     3 2013-01-01 03:00:00
##  4     4 2013-01-01 04:00:00
##  5     5 2013-01-01 05:00:00
##  6     6 2013-01-01 06:00:00
##  7     7 2013-01-01 07:00:00
##  8     8 2013-01-01 08:00:00
##  9     9 2013-01-01 09:00:00
## 10    10 2013-01-01 10:00:00
## # … with 26,105 more rows
## # Source: spark<?> [?? x 2]
##       id time_hour          
##    <int> <dttm>             
##  1     1 2013-01-01 06:00:00
##  2     2 2013-01-01 07:00:00
##  3     3 2013-01-01 08:00:00
##  4     4 2013-01-01 09:00:00
##  5     5 2013-01-01 10:00:00
##  6     6 2013-01-01 11:00:00
##  7     7 2013-01-01 12:00:00
##  8     8 2013-01-01 13:00:00
##  9     9 2013-01-01 14:00:00
## 10    10 2013-01-01 15:00:00
## # … with more rows
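The text does not state the cause of the shift. One plausible explanation, offered here as an assumption, is that the same instants are displayed in different time zones: 01:00 in New York on 1 January 2013 is 06:00 in UTC. The base R sketch below shows how one instant produces two different printed values; the zone names are chosen for illustration.

```r
# The same instant in time, formatted in two time zones (base R only).
# The zone names are assumptions chosen to reproduce a five-hour offset.
t_ny <- as.POSIXct("2013-01-01 01:00:00", tz = "America/New_York")

fmt <- "%Y-%m-%d %H:%M:%S"
local_view <- format(t_ny, fmt, tz = "America/New_York")
utc_view <- format(t_ny, fmt, tz = "UTC")

print(c(local = local_view, utc = utc_view))

# Both strings describe the same underlying instant
same_instant <- as.numeric(t_ny) == as.numeric(as.POSIXct(utc_view, tz = "UTC"))
print(same_instant)
```

When porting code, it is therefore worth checking which time zone each side uses for storage and for display before comparing or joining on date/time columns.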

7.3.4 Joins

Joins are another area where behavior can differ, because column values may be matched differently depending on the data source. Care must be taken to write join conditions that are portable, so that the joins are consistent across data sources.
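One well-known instance of such a difference, which is not necessarily the one the author had in mind, is the treatment of missing keys. For local data frames, dplyr joins match NA to NA by default, whereas SQL joins never treat NULL as equal to NULL. The sketch below shows both behaviors locally using the na_matches argument of dplyr's join functions; the data is made up for illustration.

```r
library(dplyr)

x <- tibble(key = c("a", NA), x_val = 1:2)
y <- tibble(key = c("a", NA), y_val = c(10, 20))

# Default for local data frames: NA keys match each other
joined_default <- inner_join(x, y, by = "key")

# na_matches = "never" mimics SQL semantics, where NULL never equals NULL
joined_sql_like <- inner_join(x, y, by = "key", na_matches = "never")

print(nrow(joined_default))
print(nrow(joined_sql_like))
```

Setting na_matches = "never" in local code, or filtering out missing keys before joining, is one way to keep results consistent between local and remote execution.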

7.3.5 Portability of used methods

And, rather obviously, when using Hive built-in functions in the dplyr-based function, we will most likely not be able to execute it on the local data frames, as we have seen previously.
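A short sketch of this asymmetry, using the SQL function initcap() as an example of a function that is not defined in R; the helper name capitalize_origin and the data are hypothetical, and dbplyr's simulated Hive connection again stands in for Spark.

```r
library(dplyr)
library(dbplyr)

# Hypothetical helper that relies on initcap(), a SQL function
# for which R has no function of that name
capitalize_origin <- function(df) {
  df %>% mutate(origin_cap = initcap(origin))
}

# On a lazy table, the unknown call is passed through to the generated SQL
hive_sql <- lazy_frame(origin = "ewr", con = simulate_hive()) %>%
  capitalize_origin() %>%
  sql_render() %>%
  as.character()
cat(hive_sql, "\n")

# On a local data frame, R cannot find initcap() and the call fails
local_result <- tryCatch(
  capitalize_origin(tibble(origin = "ewr")),
  error = function(e) e
)
print(inherits(local_result, "error"))
```

A function written this way is tied to back-ends that provide the SQL function, so it cannot be tested on local data frames without supplying an R implementation of the same name.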

7.4 Conclusion, take-home messages

In this chapter, we have shown that we can take advantage of the performance of Spark while still writing arbitrary R functions by using dplyr syntax, which supports translation to Spark SQL using the dbplyr backend. We have also looked at some important differences when applying the same dplyr transformations to local and remote data sets.

With this approach, we can use R development best practices, testing, and documentation methods in a standard way when writing our R packages, getting the best of both worlds - Apache Spark for performance and R for convenient development of data science applications.

In the next chapter, we will look at writing R functions that will be using SQL directly, instead of relying on dbplyr for the translation, and how we can efficiently send them to the Spark instance for execution and optionally retrieve the results to our R session.