Chapter 7 Constructing functions by piping dplyr verbs
In the previous chapters, we looked at how the sparklyr interface communicates with the Spark instance and what this means for the performance of arbitrarily defined R functions. We also examined how Apache Arrow can increase the performance of data transfers between the R session and the Spark instance.
In this chapter, we will look at how to write R functions that can be executed directly by Spark, without the serialization overhead that we demonstrated in the previous chapters. We will focus on writing functions as combinations of dplyr verbs that can be translated using dbplyr, and investigate how the SQL is generated and how the Spark plans are created.
7.1 R functions as combinations of dplyr verbs and Spark
One approach to retaining the performance of Spark while using arbitrary R functionality is to design our functions carefully, such that the entire function call can be translated directly to Spark SQL by dbplyr when it is used with sparklyr.
This allows us to write, package, test, and document the functions as we normally would, while still getting the performance benefits of Apache Spark.
Let’s look at an example where we would like to do simple transformations of data stored in a column of a data frame, such as normalization of one of the columns. For illustration purposes, we will normalize the values of a column by first subtracting the mean value and then dividing the values by the standard deviation.
7.1.1 Trying it with base R functions
The first attempt could be quite simple. We could take advantage of R’s base function scale() to do the work for us:
normalize_dplyr_scale <- function(df, col, newColName) {
  df %>% mutate(!!newColName := scale({{col}}))
}
This function would work fine with a local data frame such as weather:
## # A tibble: 26,115 x 3
## id temp normTemp[,1]
## <int> <dbl> <dbl>
## 1 1 39.0 -0.913
## 2 2 39.0 -0.913
## 3 3 39.0 -0.913
## 4 4 39.9 -0.862
## 5 5 39.0 -0.913
## 6 6 37.9 -0.974
## 7 7 39.0 -0.913
## 8 8 39.9 -0.862
## 9 9 39.9 -0.862
## 10 10 41 -0.802
## # … with 26,105 more rows
However, for a Spark DataFrame this would throw an error. This is because the base R function scale() is not translated by dbplyr at the moment, and it is not a Hive built-in function either:
## Error: org.apache.spark.sql.AnalysisException: Undefined function: 'scale'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 21
## at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$$anonfun$apply$15$$anonfun$applyOrElse$49.apply(Analyzer.scala:1281)
## at org.apache.spark.sql.catalyst.analysis.Analyzer$LookupFunctions$$anonfun$apply$15$$anonfun$applyOrElse$49.apply(Analyzer.scala:1281)
## at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:53)
...
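One way to check in advance whether an expression will be translated is to ask dbplyr directly, without a running Spark instance. The sketch below assumes that dbplyr's simulated Hive connection, simulate_hive(), is a reasonable stand-in for the translation that sparklyr performs; the column name x is made up for illustration. A function that dbplyr does not know, such as scale(), is passed through to the SQL as is, which is consistent with Spark then reporting it as undefined.

```r
library(dbplyr)

# Simulated connection: an assumption standing in for a real Spark connection
con <- simulate_hive()

# scale() has no known translation, so it is passed through to the SQL as is
scale_sql <- translate_sql(scale(x), con = con)

# mean() is known to dbplyr and is translated to a SQL aggregate
mean_sql <- translate_sql(mean(x, na.rm = TRUE), con = con)

print(scale_sql)
print(mean_sql)
```

Inspecting the translated expressions this way can flag untranslatable functions before any query is sent to Spark.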
7.1.2 Using a combination of supported dplyr verbs and operations
To run the function successfully, we will need to rewrite it as a combination of functions and operations that are supported by the dbplyr translation to Spark SQL. One example implementation is as follows:
normalize_dplyr <- function(df, col, newColName) {
  df %>% mutate(
    !!newColName := ({{col}} - mean({{col}}, na.rm = TRUE)) /
      sd({{col}}, na.rm = TRUE)
  )
}
Using this function yields the desired results for both local and Spark data frames:
## # A tibble: 26,115 x 3
## id temp normTemp
## <int> <dbl> <dbl>
## 1 1 39.0 -0.913
## 2 2 39.0 -0.913
## 3 3 39.0 -0.913
## 4 4 39.9 -0.862
## 5 5 39.0 -0.913
## 6 6 37.9 -0.974
## 7 7 39.0 -0.913
## 8 8 39.9 -0.862
## 9 9 39.9 -0.862
## 10 10 41 -0.802
## # … with 26,105 more rows
## Spark DataFrame
tbl_weather %>%
  normalize_dplyr(temp, "normTemp") %>%
  select(id, temp, normTemp) %>%
  collect()
## # A tibble: 26,115 x 3
## id temp normTemp
## <int> <dbl> <dbl>
## 1 1 39.0 -0.913
## 2 2 39.0 -0.913
## 3 3 39.0 -0.913
## 4 4 39.9 -0.862
## 5 5 39.0 -0.913
## 6 6 37.9 -0.974
## 7 7 39.0 -0.913
## 8 8 39.9 -0.862
## 9 9 39.9 -0.862
## 10 10 41 -0.802
## # … with 26,105 more rows
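As a quick local sanity check, the rewritten function can be compared against scale() on a small data frame. This is only a sketch: the toy data frame below is made up and stands in for the weather data, which is not reproduced here.

```r
library(dplyr)

# Function as defined above, repeated so the sketch is self-contained
normalize_dplyr <- function(df, col, newColName) {
  df %>% mutate(
    !!newColName := ({{col}} - mean({{col}}, na.rm = TRUE)) /
      sd({{col}}, na.rm = TRUE)
  )
}

# Hypothetical toy data, standing in for the weather data frame
toy <- tibble(id = 1:5, temp = c(39.0, 39.9, 37.9, 41.0, 39.0))

normalized <- toy %>% normalize_dplyr(temp, "normTemp")

# scale() returns a one-column matrix, so flatten it before comparing
matches_scale <- all.equal(normalized$normTemp, as.numeric(scale(toy$temp)))
print(matches_scale)
```

Because both approaches center by the mean and divide by the sample standard deviation, the two results should agree up to floating point tolerance.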
7.1.3 Investigating the SQL translation and its Spark plan
Another advantage of this approach is that we can investigate the plan by which the actions will be executed by Spark using the explain() function from the dplyr package. This will print both the SQL query constructed by dbplyr and the plan generated by Spark, which can help us investigate performance issues:
## <SQL>
## SELECT `id`, `origin`, `year`, `month`, `day`, `hour`, `temp`, `dewp`, `humid`, `wind_dir`, `wind_speed`, `wind_gust`, `precip`, `pressure`, `visib`, `time_hour`, (`temp` - AVG(`temp`) OVER ()) / stddev_samp(`temp`) OVER () AS `normTemp`
## FROM `weather`
##
## <PLAN>
## == Physical Plan ==
## *(1) Project [id#24, origin#25, year#26, month#27, day#28, hour#29, temp#30, dewp#31, humid#32, wind_dir#33, wind_speed#34, wind_gust#35, precip#36, pressure#37, visib#38, time_hour#39, ((temp#30 - _we0#1995) / _we1#1996) AS normTemp#1981]
## +- Window [avg(temp#30) windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS _we0#1995, stddev_samp(temp#30) windowspecdefinition(specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS _we1#1996]
## +- Exchange SinglePartition
## +- InMemoryTableScan [id#24, origin#25, year#26, month#27, day#28, hour#29, temp#30, dewp#31, humid#32, wind_dir#33, wind_speed#34, wind_gust#35, precip#36, pressure#37, visib#38, time_hour#39]
## +- InMemoryRelation [id#24, origin#25, year#26, month#27, day#28, hour#29, temp#30, dewp#31, humid#32, wind_dir#33, wind_speed#34, wind_gust#35, precip#36, pressure#37, visib#38, time_hour#39], StorageLevel(disk, memory, deserialized, 1 replicas)
## +- Scan ExistingRDD[id#24,origin#25,year#26,month#27,day#28,hour#29,temp#30,dewp#31,humid#32,wind_dir#33,wind_speed#34,wind_gust#35,precip#36,pressure#37,visib#38,time_hour#39]
If we are only interested in the SQL itself as a character string, we can use dbplyr’s sql_render():
## [1] "SELECT `id`, `origin`, `year`, `month`, `day`, `hour`, `temp`, `dewp`, `humid`, `wind_dir`, `wind_speed`, `wind_gust`, `precip`, `pressure`, `visib`, `time_hour`, (`temp` - AVG(`temp`) OVER ()) / stddev_samp(`temp`) OVER () AS `normTemp`\nFROM `weather`"
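The calls behind outputs like the ones above are not shown, so the following is a sketch of how they might look. The Spark calls appear only as comments, since they need a live connection and the tbl_weather object. The runnable part substitutes a dbplyr lazy frame on a simulated Hive connection, which is an assumption and may render SQL that differs in detail from what sparklyr produces.

```r
library(dplyr)
library(dbplyr)

# Function as defined above, repeated so the sketch is self-contained
normalize_dplyr <- function(df, col, newColName) {
  df %>% mutate(
    !!newColName := ({{col}} - mean({{col}}, na.rm = TRUE)) /
      sd({{col}}, na.rm = TRUE)
  )
}

# With a live Spark connection, the calls would presumably be:
#   tbl_weather %>% normalize_dplyr(temp, "normTemp") %>% explain()
#   tbl_weather %>% normalize_dplyr(temp, "normTemp") %>% sql_render()

# Offline stand-in: a lazy frame on a simulated Hive connection (an assumption)
weather_lf <- lazy_frame(id = 1L, temp = 39.0, con = simulate_hive())

query <- weather_lf %>%
  normalize_dplyr(temp, "normTemp") %>%
  sql_render()

# sql_render() returns an object of class "sql"; convert it to a plain string
query_string <- as.character(query)
cat(query_string, "\n")
```

Having the query as a plain string makes it easy to log it or to compare it between versions of a function.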
7.2 A more complex use case - Joins, group bys, and aggregations
The dplyr syntax makes it very easy to construct more complex aggregations across multiple Spark DataFrames. An example of a function that joins two Spark DataFrames and computes the mean of a selected column, grouped by another column, can look as follows:
joingrpagg_dplyr <- function(
  df1, df2,
  joinColNames = setdiff(intersect(colnames(df1), colnames(df2)), "id"),
  col, groupCol
) {
  df1 %>%
    right_join(df2, by = joinColNames) %>%
    filter(!is.na({{col}})) %>%
    group_by({{groupCol}}) %>%
    summarise(mean({{col}})) %>%
    arrange({{groupCol}})
}
We can then use this function, for instance, to look at the mean arrival delay of flights grouped by visibility. Note that we are only collecting heavily aggregated data, 20 rows in total, so the overhead of data transfer from the Spark instance to the R session is small. Also, just assigning the result of the function call to delay_by_visib does not actually execute or collect anything. Execution really starts only when collect() is called:
delay_by_visib <- joingrpagg_dplyr(
  tbl_flights, tbl_weather,
  col = arr_delay, groupCol = visib
)
delay_by_visib %>% collect()
## Warning: Missing values are always removed in SQL.
## Use `mean(x, na.rm = TRUE)` to silence this warning
## This warning is displayed only once per session.
## # A tibble: 20 x 2
## visib `mean(arr_delay)`
## <dbl> <dbl>
## 1 0 24.9
## 2 0.06 28.5
## 3 0.12 45.4
## 4 0.25 20.8
## 5 0.5 39.8
## 6 0.75 41.4
## 7 1 37.6
## 8 1.25 65.1
## 9 1.5 34.7
## 10 1.75 45.6
## 11 2 26.3
## 12 2.5 21.7
## 13 3 21.7
## 14 4 17.7
## 15 5 18.9
## 16 6 17.3
## 17 7 16.4
## 18 8 16.1
## 19 9 15.6
## 20 10 4.32
We can look at the plan and the generated SQL query as well:
## <SQL>
## SELECT `visib`, AVG(`arr_delay`) AS `mean(arr_delay)`
## FROM (SELECT `LHS`.`id` AS `id.x`, `RHS`.`year` AS `year`, `RHS`.`month` AS `month`, `RHS`.`day` AS `day`, `LHS`.`dep_time` AS `dep_time`, `LHS`.`sched_dep_time` AS `sched_dep_time`, `LHS`.`dep_delay` AS `dep_delay`, `LHS`.`arr_time` AS `arr_time`, `LHS`.`sched_arr_time` AS `sched_arr_time`, `LHS`.`arr_delay` AS `arr_delay`, `LHS`.`carrier` AS `carrier`, `LHS`.`flight` AS `flight`, `LHS`.`tailnum` AS `tailnum`, `RHS`.`origin` AS `origin`, `LHS`.`dest` AS `dest`, `LHS`.`air_time` AS `air_time`, `LHS`.`distance` AS `distance`, `RHS`.`hour` AS `hour`, `LHS`.`minute` AS `minute`, `RHS`.`time_hour` AS `time_hour`, `RHS`.`id` AS `id.y`, `RHS`.`temp` AS `temp`, `RHS`.`dewp` AS `dewp`, `RHS`.`humid` AS `humid`, `RHS`.`wind_dir` AS `wind_dir`, `RHS`.`wind_speed` AS `wind_speed`, `RHS`.`wind_gust` AS `wind_gust`, `RHS`.`precip` AS `precip`, `RHS`.`pressure` AS `pressure`, `RHS`.`visib` AS `visib`
## FROM `flights` AS `LHS`
## RIGHT JOIN `weather` AS `RHS`
## ON (`LHS`.`year` = `RHS`.`year` AND `LHS`.`month` = `RHS`.`month` AND `LHS`.`day` = `RHS`.`day` AND `LHS`.`origin` = `RHS`.`origin` AND `LHS`.`hour` = `RHS`.`hour` AND `LHS`.`time_hour` = `RHS`.`time_hour`)
## ) `dbplyr_003`
## WHERE (NOT(((`arr_delay`) IS NULL)))
## GROUP BY `visib`
## ORDER BY `visib`
##
## <PLAN>
## == Physical Plan ==
## *(4) Sort [visib#38 ASC NULLS FIRST], true, 0
## +- Exchange rangepartitioning(visib#38 ASC NULLS FIRST, 2)
## +- *(3) HashAggregate(keys=[visib#38], functions=[avg(arr_delay#411)])
## +- Exchange hashpartitioning(visib#38, 2)
## +- *(2) HashAggregate(keys=[visib#38], functions=[partial_avg(arr_delay#411)])
## +- *(2) Project [arr_delay#411, visib#38]
## +- *(2) BroadcastHashJoin [year#403, month#404, day#405, origin#415, hour#419, time_hour#421], [year#26, month#27, day#28, origin#25, cast(hour#29 as double), time_hour#39], Inner, BuildRight
## :- *(2) Filter ((((((NOT isnull(arr_delay#411) && isnotnull(hour#419)) && isnotnull(year#403)) && isnotnull(origin#415)) && isnotnull(day#405)) && isnotnull(month#404)) && isnotnull(time_hour#421))
## : +- InMemoryTableScan [year#403, month#404, day#405, arr_delay#411, origin#415, hour#419, time_hour#421], [NOT isnull(arr_delay#411), isnotnull(hour#419), isnotnull(year#403), isnotnull(origin#415), isnotnull(day#405), isnotnull(month#404), isnotnull(time_hour#421)]
## : +- InMemoryRelation [id#402, year#403, month#404, day#405, dep_time#406, sched_dep_time#407, dep_delay#408, arr_time#409, sched_arr_time#410, arr_delay#411, carrier#412, flight#413, tailnum#414, origin#415, dest#416, air_time#417, distance#418, hour#419, minute#420, time_hour#421], StorageLevel(disk, memory, deserialized, 1 replicas)
## : +- Scan ExistingRDD[id#402,year#403,month#404,day#405,dep_time#406,sched_dep_time#407,dep_delay#408,arr_time#409,sched_arr_time#410,arr_delay#411,carrier#412,flight#413,tailnum#414,origin#415,dest#416,air_time#417,distance#418,hour#419,minute#420,time_hour#421]
## +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, int, false], input[2, int, false], input[3, int, false], input[0, string, false], cast(input[4, int, false] as double), input[6, timestamp, false]))
## +- *(1) Filter (((((isnotnull(month#27) && isnotnull(day#28)) && isnotnull(hour#29)) && isnotnull(origin#25)) && isnotnull(year#26)) && isnotnull(time_hour#39))
## +- InMemoryTableScan [origin#25, year#26, month#27, day#28, hour#29, visib#38, time_hour#39], [isnotnull(month#27), isnotnull(day#28), isnotnull(hour#29), isnotnull(origin#25), isnotnull(year#26), isnotnull(time_hour#39)]
## +- InMemoryRelation [id#24, origin#25, year#26, month#27, day#28, hour#29, temp#30, dewp#31, humid#32, wind_dir#33, wind_speed#34, wind_gust#35, precip#36, pressure#37, visib#38, time_hour#39], StorageLevel(disk, memory, deserialized, 1 replicas)
## +- Scan ExistingRDD[id#24,origin#25,year#26,month#27,day#28,hour#29,temp#30,dewp#31,humid#32,wind_dir#33,wind_speed#34,wind_gust#35,precip#36,pressure#37,visib#38,time_hour#39]
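The lazy behavior described in this section can also be observed without Spark. The sketch below is an offline approximation: it uses dbplyr lazy frames on a simulated Hive connection, with made-up columns and values, in place of tbl_flights and tbl_weather. The join columns are passed explicitly rather than relying on the default.

```r
library(dplyr)
library(dbplyr)

# Function as defined above, repeated so the sketch is self-contained
joingrpagg_dplyr <- function(
  df1, df2,
  joinColNames = setdiff(intersect(colnames(df1), colnames(df2)), "id"),
  col, groupCol
) {
  df1 %>%
    right_join(df2, by = joinColNames) %>%
    filter(!is.na({{col}})) %>%
    group_by({{groupCol}}) %>%
    summarise(mean({{col}})) %>%
    arrange({{groupCol}})
}

# Hypothetical stand-ins for the Spark tables (simulated connection)
con <- simulate_hive()
flights_lf <- lazy_frame(origin = "EWR", hour = 5L, arr_delay = 11, con = con)
weather_lf <- lazy_frame(origin = "EWR", hour = 5L, visib = 10, con = con)

# Building the query does not execute anything
delay_lf <- joingrpagg_dplyr(
  flights_lf, weather_lf,
  joinColNames = c("origin", "hour"),
  col = arr_delay, groupCol = visib
)

# The result is still a lazy table; only the SQL has been constructed
is_lazy <- inherits(delay_lf, "tbl_lazy")
print(is_lazy)
show_query(delay_lf)
```

With a real Spark table, the same object would hold only the query until collect() or a similar function forces execution.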
7.3 Using the functions with local versus remote datasets
7.3.1 Unified front-end, different back-ends
Some of the appeal of the dplyr syntax comes from the fact that we can use the same functions to conveniently manipulate local data frames in memory and, with the very same code, data from other sources such as relational databases, data.tables and even data within Spark.
This unified front-end, however, comes with some important differences that we must be aware of when porting code that manipulates and computes on local data to remote sources. The same holds for the remote Spark DataFrames that we manipulate using dplyr functions.
The following paragraphs will show a few examples of issues we can come across when porting local data handling to a remote source such as Spark.
7.3.2 Differences in NA and NaN values
One example of such differences arises from the handling of NA and NaN values:
bycols <- c("year", "month", "day", "origin", "hour")
# Create left joins
joined_spark <- tbl_flights %>%
  left_join(tbl_weather, by = bycols) %>%
  collect()
joined_local <- flights %>%
  left_join(weather, by = bycols)
# Look at counts of NaN values
joined_local %>% filter(is.nan(temp)) %>% count()
## # A tibble: 1 x 1
## n
## <int>
## 1 0
joined_spark %>% filter(is.nan(temp)) %>% count()
## # A tibble: 1 x 1
## n
## <int>
## 1 1573
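The counts above suggest that temperatures missing after the join are NA in the local result but arrive as NaN in the result collected from Spark. R treats the two differently: is.nan() is TRUE only for NaN, whereas is.na() is TRUE for both. A minimal base R sketch, using made-up vectors in place of the joined temp columns, shows the distinction and one possible way to harmonize the values after collecting:

```r
# Hypothetical stand-ins for a locally joined column and one collected from Spark
temp_local <- c(39.0, NA_real_)
temp_spark <- c(39.0, NaN)

# is.nan() distinguishes the two representations of a missing value
nan_local <- sum(is.nan(temp_local))
nan_spark <- sum(is.nan(temp_spark))

# is.na() treats both NA and NaN as missing
na_local <- sum(is.na(temp_local))
na_spark <- sum(is.na(temp_spark))

# Convert NaN to NA after collecting so that both results behave the same
temp_harmonized <- replace(temp_spark, is.nan(temp_spark), NA_real_)
nan_harmonized <- sum(is.nan(temp_harmonized))

print(c(nan_local, nan_spark, na_local, na_spark, nan_harmonized))
```

Filtering with is.na() rather than is.nan() is the more portable choice when the same code must handle both local and collected data.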
7.3.3 Dates, times and time zones
Special care must also be taken when dealing with date/time values and their time zones:
## # A tibble: 26,115 x 2
## id time_hour
## <int> <dttm>
## 1 1 2013-01-01 01:00:00
## 2 2 2013-01-01 02:00:00
## 3 3 2013-01-01 03:00:00
## 4 4 2013-01-01 04:00:00
## 5 5 2013-01-01 05:00:00
## 6 6 2013-01-01 06:00:00
## 7 7 2013-01-01 07:00:00
## 8 8 2013-01-01 08:00:00
## 9 9 2013-01-01 09:00:00
## 10 10 2013-01-01 10:00:00
## # … with 26,105 more rows
## # Source: spark<?> [?? x 2]
## id time_hour
## <int> <dttm>
## 1 1 2013-01-01 06:00:00
## 2 2 2013-01-01 07:00:00
## 3 3 2013-01-01 08:00:00
## 4 4 2013-01-01 09:00:00
## 5 5 2013-01-01 10:00:00
## 6 6 2013-01-01 11:00:00
## 7 7 2013-01-01 12:00:00
## 8 8 2013-01-01 13:00:00
## 9 9 2013-01-01 14:00:00
## 10 10 2013-01-01 15:00:00
## # … with more rows
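The five-hour shift between the two outputs is consistent with the same instants being displayed in two different time zones. The base R sketch below illustrates this under two assumptions that are not stated in the text: that the local values are recorded in the America/New_York time zone and that the Spark output is rendered in UTC.

```r
# The first local timestamp, assuming it is recorded in New York time
local_time <- as.POSIXct("2013-01-01 01:00:00", tz = "America/New_York")

# The same instant rendered in UTC
utc_string <- format(local_time, "%Y-%m-%d %H:%M:%S", tz = "UTC")
print(utc_string)

# Both representations refer to the same point in time
utc_time <- as.POSIXct(utc_string, tz = "UTC")
same_instant <- as.numeric(local_time) == as.numeric(utc_time)
print(same_instant)
```

When comparing local and collected timestamps, it is therefore safer to compare the underlying instants, or to convert both sides to one explicit time zone first, than to compare the printed values.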
7.3.4 Joins
Joins are another example of differing behavior, because the way values in the join columns are matched can differ between data sources. Care must be taken to write join conditions that are portable, so that the joins are consistent across data sources.
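One way such a difference can show up, offered as an illustration rather than as the specific case the text has in mind, is the matching of missing keys. For local data frames, dplyr joins treat two NA keys as equal by default, whereas joins on database sources typically do not match missing keys. The na_matches argument of the dplyr join functions makes the local behavior mimic the database one. The data below is made up.

```r
library(dplyr)

# Hypothetical tables that each contain a missing join key
lhs <- tibble(key = c("a", NA), x = 1:2)
rhs <- tibble(key = c("a", NA), y = c(10, 20))

# Local default: NA keys match each other
local_default <- left_join(lhs, rhs, by = "key")

# Database-like behavior: NA keys never match
sql_like <- left_join(lhs, rhs, by = "key", na_matches = "never")

print(local_default)
print(sql_like)
```

Setting na_matches = "never" in functions that should behave the same locally and remotely is one way to keep the results aligned.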
7.3.5 Portability of used methods
And, rather obviously, when we use Hive built-in functions in a dplyr-based function, we will most likely not be able to execute it on local data frames. This is the reverse of the situation we saw earlier with scale(), which worked locally but not in Spark.
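A short local sketch of this situation follows. It assumes datediff(), a Hive built-in with no R equivalent of that name in base R or dplyr, and uses a made-up data frame. On a Spark DataFrame the call would be expected to be passed through to Spark SQL, while locally R simply cannot find the function.

```r
library(dplyr)

# Hypothetical local data frame with two date columns stored as strings
toy <- tibble(date_from = "2013-01-01", date_to = "2013-01-10")

# datediff() does not exist in R, so the local call fails;
# the error is captured here instead of stopping the script
local_attempt <- tryCatch(
  toy %>% mutate(days = datediff(date_to, date_from)),
  error = function(e) e
)

failed_locally <- inherits(local_attempt, "error")
print(failed_locally)
```

If a function must run on both local and Spark data, restricting it to operations that dbplyr translates and that also exist in R avoids this problem.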
7.4 Conclusion, take-home messages
In this chapter, we have shown that we can take advantage of the performance of Spark while still writing arbitrary R functions by using dplyr syntax, which supports translation to Spark SQL using the dbplyr backend. We have also looked at some important differences when applying the same dplyr transformations to local and remote data sets.
With this approach, we can use R development best practices, testing, and documentation methods in a standard way when writing our R packages, getting the best of both worlds - Apache Spark for performance and R for convenient development of data science applications.
In the next chapter, we will look at writing R functions that use SQL directly instead of relying on dbplyr for the translation, and at how we can efficiently send them to the Spark instance for execution and optionally retrieve the results into our R session.