Chapter 11 Combining approaches into lazy datasets
The power of Spark comes in part from its lazy execution, and we can take advantage of this in ways that are not immediately obvious. Consider the following function, lazy_spark_query(), which we have shown previously:
## function(tbl, qry) {
##   qry %>%
##     dbplyr::sql() %>%
##     dplyr::tbl(tbl[["src"]][["con"]], .)
## }
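The qry argument it expects is a SQL string; in the example below it is built by normalize_sql(), a helper covered earlier that adds a standardized copy of a column to a table. As a reminder of its shape, a minimal sketch of such a query builder, assuming standardization via window functions, could look as follows (the exact version shown earlier may differ in its details):

# A sketch of the query builder used below; the exact normalize_sql()
# shown earlier may differ. It returns a SQL string that appends a
# standardized copy of `col` (centered by its mean and scaled by its
# standard deviation, both computed with window functions) as `new_col`.
normalize_sql <- function(tbl_name, col, new_col) {
  paste0(
    "SELECT ", tbl_name, ".*, ",
    "(", col, " - AVG(", col, ") OVER ()) / STDDEV(", col, ") OVER ()",
    " AS ", new_col,
    " FROM ", tbl_name
  )
}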
Since the output of lazy_spark_query() without collection is actually only a lazy table built on the translated SQL statement, with nothing computed yet, we can take that output and keep combining it with other operations, for instance:
qry <- normalize_sql("flights", "dep_delay", "dep_delay_norm")

lazy_spark_query(tbl_flights, qry) %>%
  group_by(origin) %>%
  summarise(mean(dep_delay_norm)) %>%
  collect()
## Invoking sql
## Invoking schema
## Invoking fieldNames
## Invoking sql
## Invoking isStreaming
## Invoking sql
## Invoking sparklyr.Utils collect
## Invoking columns
## Invoking schema
## Invoking fields
## Invoking dataType
## Invoking toString
## Invoking name
## Invoking dataType
## Invoking toString
## Invoking name
## # A tibble: 3 x 2
##   origin `mean(dep_delay_norm)`
##   <chr>                   <dbl>
## 1 EWR                    0.0614
## 2 JFK                   -0.0131
## 3 LGA                   -0.0570
The crucial advantage is that even though lazy_spark_query() would return the entire updated flights dataset when collected on its own, in combination with other operations Spark first figures out how to execute all the operations together efficiently, only then physically executes them, and returns just the grouped and aggregated data to the R session.
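We can inspect this composition without executing anything: dplyr's show_query() renders the SQL that would be sent to Spark, typically a single statement with the original normalization query embedded as a subquery rather than issued as a separate step:

# Render, without executing, the SQL statement Spark would run for the
# combined pipeline of normalization, grouping and aggregation.
lazy_spark_query(tbl_flights, qry) %>%
  group_by(origin) %>%
  summarise(mean(dep_delay_norm)) %>%
  dplyr::show_query()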
We can therefore effectively combine multiple approaches to interfacing with Spark while still keeping the benefit of retrieving only very small, aggregated amounts of data into the R session. The effect is quite significant even with a dataset as small as flights (336,776 rows of 19 columns) and with a local Spark instance. The chart below compares executing the query lazily, aggregating within Spark and retrieving only the aggregated data, versus collecting first and aggregating locally in R. The third boxplot shows the cost of purely collecting the query itself:
microbenchmark::microbenchmark(
  times = 20,
  collect_late = lazy_spark_query(tbl_flights, qry) %>%
    group_by(origin) %>%
    summarise(mean(dep_delay_norm)) %>%
    collect(),
  collect_first = lazy_spark_query(tbl_flights, qry) %>%
    collect() %>%
    group_by(origin) %>%
    summarise(mean(dep_delay_norm)),
  collect_only = lazy_spark_query(tbl_flights, qry) %>%
    collect()
)
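The boxplot chart referenced above can be drawn straight from the benchmark result once it is stored in a variable; in the sketch below the result is assigned to bench, a name used here purely for illustration, and plotted with the boxplot() method that microbenchmark provides for its result objects:

# Store the benchmark result (same call as above) and plot the three
# timing distributions as boxplots; boxplot() on a microbenchmark
# object uses a log time scale by default, and ggplot2::autoplot(bench)
# offers an alternative ggplot2-based view.
bench <- microbenchmark::microbenchmark(
  times = 20,
  collect_late  = lazy_spark_query(tbl_flights, qry) %>%
    group_by(origin) %>% summarise(mean(dep_delay_norm)) %>% collect(),
  collect_first = lazy_spark_query(tbl_flights, qry) %>%
    collect() %>% group_by(origin) %>% summarise(mean(dep_delay_norm)),
  collect_only  = lazy_spark_query(tbl_flights, qry) %>% collect()
)
boxplot(bench)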