Chapter 7 Using R to construct SQL queries and let Spark execute them

I feel honoured to be the second to whom you turn. What have I done to deserve to be so high on your list?

Yennefer of Vengerberg

In the previous chapter of this series, we looked at writing R functions that Spark can execute directly, without serialization overhead. We focused on writing such functions as combinations of dplyr verbs and investigated how the SQL is generated and how the Spark plans are created.

In this chapter, we will look at how to write R functions that generate SQL queries that can be executed by Spark, how to execute them with DBI and how to achieve lazy SQL statements that only get executed when needed. We also briefly present wrapping these approaches into functions that can be combined with other Spark operations.

7.1 R functions as Spark SQL generators

There are use cases where it is desirable to express the operations directly with SQL instead of combining dplyr verbs, for example when working within multi-language environments where re-usability is important. We can then send the SQL query directly to Spark to be executed. To create such queries, one option is to write R functions that work as query constructors.

Again using a very simple example, a naive implementation of column normalization could look as follows. Note that the use of SELECT * is discouraged and only here for illustration purposes:
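A minimal sketch of such a constructor follows. The function name normalize_sql and its arguments (df for the table name, colName and newColName for the columns) are illustrative choices. The inputs are pasted into the query unescaped, so this is only suitable for trusted input.

```r
# Build a SQL query that adds a normalized copy of one column.
# All three arguments are plain strings and are inserted unescaped,
# so they must come from trusted code, not from user input.
normalize_sql <- function(df, colName, newColName) {
  paste0(
    "SELECT",
    "\n  ", df, ".*,",
    "\n  (", colName, " - (SELECT avg(", colName, ") FROM ", df, "))",
    " / ",
    "(SELECT stddev_samp(", colName, ") FROM ", df, ") as ", newColName,
    "\n", "FROM ", df
  )
}

# Construct the query for the temp column of a table named weather
normalize_temp_query <- normalize_sql("weather", "temp", "normTemp")
cat(normalize_temp_query)
```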

Using the weather dataset would then yield the following SQL query when normalizing the temp column:

## SELECT
##   weather.*,
##   (temp - (SELECT avg(temp) FROM weather)) / (SELECT stddev_samp(temp) FROM weather) as normTemp
## FROM weather

Now that we have the query created, we can look at how to send it to Spark for execution.

7.2 Executing the generated queries via Spark

7.2.1 Using DBI as the interface

The R package DBI provides an interface for communication between R and relational database management systems. We can simply use the dbGetQuery() function to execute our query, for instance:
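As a sketch, the call can be wrapped in a small helper. The name run_query_eagerly is hypothetical; sc is assumed to be an open Spark connection created with sparklyr::spark_connect(), and the DBI and sparklyr packages are assumed to be installed.

```r
# Hypothetical helper: run a SQL string on Spark and collect the
# complete result into the R session as a regular data frame.
# `sc` is assumed to be an open connection from sparklyr::spark_connect().
run_query_eagerly <- function(sc, qry) {
  # Fail early if the query is not a single string
  stopifnot(is.character(qry), length(qry) == 1L)
  DBI::dbGetQuery(sc, statement = qry)
}
```

With the generated query stored as a string in a variable, here called qry, head(run_query_eagerly(sc, qry)) would return the first rows of the collected result, as in the output below.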

##   id origin year month day hour  temp  dewp humid wind_dir wind_speed wind_gust
## 1  1    EWR 2013     1   1    1 39.02 26.06 59.37      270   10.35702       NaN
## 2  2    EWR 2013     1   1    2 39.02 26.96 61.63      250    8.05546       NaN
## 3  3    EWR 2013     1   1    3 39.02 28.04 64.43      240   11.50780       NaN
## 4  4    EWR 2013     1   1    4 39.92 28.04 62.21      250   12.65858       NaN
## 5  5    EWR 2013     1   1    5 39.02 28.04 64.43      260   12.65858       NaN
## 6  6    EWR 2013     1   1    6 37.94 28.04 67.21      240   11.50780       NaN
##   precip pressure visib           time_hour   normTemp
## 1      0   1012.0    10 2013-01-01 06:00:00 -0.9130047
## 2      0   1012.3    10 2013-01-01 07:00:00 -0.9130047
## 3      0   1012.5    10 2013-01-01 08:00:00 -0.9130047
## 4      0   1012.2    10 2013-01-01 09:00:00 -0.8624083
## 5      0   1011.9    10 2013-01-01 10:00:00 -0.9130047
## 6      0   1012.4    10 2013-01-01 11:00:00 -0.9737203

As we might have noticed thanks to the way the result is printed, a standard data frame is returned, as opposed to tibbles returned by most sparklyr operations.

It is important to note that using dbGetQuery() automatically computes and collects the results to the R session. This is in contrast with the dplyr approach which constructs the query and only collects the results to the R session when collect() is called, or computes them when compute() is called.

We will now examine two options for using the prepared query lazily, without collecting the results into the R session.

7.2.2 Invoking sql on a Spark session object

We will focus on the invoke() functionality of sparklyr in the fourth installment of the series, so we will not go into further detail here. For now, it is enough to know that if we want a “lazy” SQL query that does not get automatically computed and collected when called from R, we can invoke the sql method on a SparkSession class object.

The method takes a string SQL query as input and processes it using Spark, returning the result as a Spark DataFrame. This gives us the ability to only compute and collect the results when desired:
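A minimal sketch of this approach follows. The helper name sql_via_session is hypothetical, and sc is again assumed to be an open sparklyr connection; spark_session() and invoke() are sparklyr functions.

```r
# Hypothetical helper: send a SQL string to the sql method of the
# SparkSession behind `sc`. The result is a reference (jobj) to a Spark
# DataFrame; nothing is collected to the R session at this point.
sql_via_session <- function(sc, qry) {
  # Fail early if the query is not a single string
  stopifnot(is.character(qry), length(qry) == 1L)
  session <- sparklyr::spark_session(sc)
  sparklyr::invoke(session, "sql", qry)
}
```

Printing the returned object shows only the Java object reference, while passing it to dplyr::collect() retrieves the data as a tibble, as the two outputs below illustrate.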

## <jobj[313]>
##   org.apache.spark.sql.Dataset
##   [id: int, origin: string ... 15 more fields]
## # A tibble: 26,115 x 17
##       id origin  year month   day  hour  temp  dewp humid wind_dir wind_speed
##    <int> <chr>  <int> <int> <int> <int> <dbl> <dbl> <dbl>    <dbl>      <dbl>
##  1     1 EWR     2013     1     1     1  39.0  26.1  59.4      270      10.4 
##  2     2 EWR     2013     1     1     2  39.0  27.0  61.6      250       8.06
##  3     3 EWR     2013     1     1     3  39.0  28.0  64.4      240      11.5 
##  4     4 EWR     2013     1     1     4  39.9  28.0  62.2      250      12.7 
##  5     5 EWR     2013     1     1     5  39.0  28.0  64.4      260      12.7 
##  6     6 EWR     2013     1     1     6  37.9  28.0  67.2      240      11.5 
##  7     7 EWR     2013     1     1     7  39.0  28.0  64.4      240      15.0 
##  8     8 EWR     2013     1     1     8  39.9  28.0  62.2      250      10.4 
##  9     9 EWR     2013     1     1     9  39.9  28.0  62.2      260      15.0 
## 10    10 EWR     2013     1     1    10  41    28.0  59.6      260      13.8 
## # … with 26,105 more rows, and 6 more variables: wind_gust <dbl>, precip <dbl>,
## #   pressure <dbl>, visib <dbl>, time_hour <dttm>, normTemp <dbl>

7.2.3 Using tbl with dbplyr’s sql

The above method gives us a reference to a Java object as a result, which might be less intuitive to work with for R users. We can also opt to use dbplyr’s sql() function in combination with tbl() to get a more familiar result.

Note that when printing normalized_lazy_tbl below, the query gets partially executed to provide the first few rows. Only when collect() is called is the entire set retrieved to the R session:
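One way to obtain such a lazy tbl is sketched here. The helper name sql_as_lazy_tbl is hypothetical, and sc is assumed to be an open sparklyr connection.

```r
# Hypothetical helper: wrap a SQL string with dbplyr::sql() and pass it
# to dplyr::tbl() together with the Spark connection `sc`. The result is
# a lazy tbl; the full data stays in Spark until collect() is called.
sql_as_lazy_tbl <- function(sc, qry) {
  # Fail early if the query is not a single string
  stopifnot(is.character(qry), length(qry) == 1L)
  dplyr::tbl(sc, dbplyr::sql(qry))
}
```

Assigning the result to normalized_lazy_tbl and printing it would give the first output below; dplyr::collect(normalized_lazy_tbl) would give the second.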

## # Source: spark<SELECT weather.*, (temp - (SELECT avg(temp) FROM weather)) /
## #   (SELECT stddev_samp(temp) FROM weather) as normTemp FROM weather> [?? x 17]
##       id origin  year month   day  hour  temp  dewp humid wind_dir wind_speed
##    <int> <chr>  <int> <int> <int> <int> <dbl> <dbl> <dbl>    <dbl>      <dbl>
##  1     1 EWR     2013     1     1     1  39.0  26.1  59.4      270      10.4 
##  2     2 EWR     2013     1     1     2  39.0  27.0  61.6      250       8.06
##  3     3 EWR     2013     1     1     3  39.0  28.0  64.4      240      11.5 
##  4     4 EWR     2013     1     1     4  39.9  28.0  62.2      250      12.7 
##  5     5 EWR     2013     1     1     5  39.0  28.0  64.4      260      12.7 
##  6     6 EWR     2013     1     1     6  37.9  28.0  67.2      240      11.5 
##  7     7 EWR     2013     1     1     7  39.0  28.0  64.4      240      15.0 
##  8     8 EWR     2013     1     1     8  39.9  28.0  62.2      250      10.4 
##  9     9 EWR     2013     1     1     9  39.9  28.0  62.2      260      15.0 
## 10    10 EWR     2013     1     1    10  41    28.0  59.6      260      13.8 
## # … with more rows, and 6 more variables: wind_gust <dbl>, precip <dbl>,
## #   pressure <dbl>, visib <dbl>, time_hour <dttm>, normTemp <dbl>
## # A tibble: 26,115 x 17
##       id origin  year month   day  hour  temp  dewp humid wind_dir wind_speed
##    <int> <chr>  <int> <int> <int> <int> <dbl> <dbl> <dbl>    <dbl>      <dbl>
##  1     1 EWR     2013     1     1     1  39.0  26.1  59.4      270      10.4 
##  2     2 EWR     2013     1     1     2  39.0  27.0  61.6      250       8.06
##  3     3 EWR     2013     1     1     3  39.0  28.0  64.4      240      11.5 
##  4     4 EWR     2013     1     1     4  39.9  28.0  62.2      250      12.7 
##  5     5 EWR     2013     1     1     5  39.0  28.0  64.4      260      12.7 
##  6     6 EWR     2013     1     1     6  37.9  28.0  67.2      240      11.5 
##  7     7 EWR     2013     1     1     7  39.0  28.0  64.4      240      15.0 
##  8     8 EWR     2013     1     1     8  39.9  28.0  62.2      250      10.4 
##  9     9 EWR     2013     1     1     9  39.9  28.0  62.2      260      15.0 
## 10    10 EWR     2013     1     1    10  41    28.0  59.6      260      13.8 
## # … with 26,105 more rows, and 6 more variables: wind_gust <dbl>, precip <dbl>,
## #   pressure <dbl>, visib <dbl>, time_hour <dttm>, normTemp <dbl>

7.2.4 Wrapping the tbl approach into functions

In the approach above we provided sc in the call to tbl(). When wrapping such processes into a function, it might however be useful to take the specific DataFrame reference as an input instead of the generic Spark connection reference.

In that case, we can use the fact that the connection reference is also stored in the DataFrame reference, in the con sub-element of the src element. For instance, looking at our tbl_weather:

## [1] "spark_connection"       "spark_shell_connection" "DBIConnection"

Putting this together, we can create a simple wrapper function that lazily sends a SQL query to be processed on a particular Spark DataFrame reference:
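A sketch of such a wrapper follows. It matches the definition of lazy_spark_query printed in section 7.3, but is written with nested calls instead of the pipe so that no package needs to be attached.

```r
# Lazily send a SQL query to Spark using the connection stored inside
# an existing Spark DataFrame reference (a tbl_spark such as tbl_weather).
lazy_spark_query <- function(tbl, qry) {
  # The connection lives in the con sub-element of the src element
  con <- tbl[["src"]][["con"]]
  dplyr::tbl(con, dbplyr::sql(qry))
}

# Usage, assuming tbl_weather and a query string qry already exist:
# dplyr::collect(lazy_spark_query(tbl_weather, qry))
```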

And use it to do the same as we did above with a single function call:

## # A tibble: 26,115 x 17
##       id origin  year month   day  hour  temp  dewp humid wind_dir wind_speed
##    <int> <chr>  <int> <int> <int> <int> <dbl> <dbl> <dbl>    <dbl>      <dbl>
##  1     1 EWR     2013     1     1     1  39.0  26.1  59.4      270      10.4 
##  2     2 EWR     2013     1     1     2  39.0  27.0  61.6      250       8.06
##  3     3 EWR     2013     1     1     3  39.0  28.0  64.4      240      11.5 
##  4     4 EWR     2013     1     1     4  39.9  28.0  62.2      250      12.7 
##  5     5 EWR     2013     1     1     5  39.0  28.0  64.4      260      12.7 
##  6     6 EWR     2013     1     1     6  37.9  28.0  67.2      240      11.5 
##  7     7 EWR     2013     1     1     7  39.0  28.0  64.4      240      15.0 
##  8     8 EWR     2013     1     1     8  39.9  28.0  62.2      250      10.4 
##  9     9 EWR     2013     1     1     9  39.9  28.0  62.2      260      15.0 
## 10    10 EWR     2013     1     1    10  41    28.0  59.6      260      13.8 
## # … with 26,105 more rows, and 6 more variables: wind_gust <dbl>, precip <dbl>,
## #   pressure <dbl>, visib <dbl>, time_hour <dttm>, normTemp <dbl>

7.3 Combining multiple approaches and functions into lazy datasets

The power of Spark partly comes from the lazy execution and we can take advantage of this in ways that are not immediately obvious. Consider the following function we have shown previously:

## function(tbl, qry) {
##   qry %>%
##     dbplyr::sql() %>%
##     dplyr::tbl(tbl[["src"]][["con"]], .)
## }

Since the output of this function without collection is actually only a translated SQL statement, we can take that output and keep combining it with other operations, for instance:
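A sketch of one such combination is given here. It repeats the wrapper and a compact query constructor so that it stands on its own. The names normalize_sql and mean_norm_delay_by_origin are illustrative, and tbl_flights is assumed to be a Spark DataFrame reference to the flights dataset, registered in Spark under the table name flights.

```r
# Query constructor (illustrative): adds a normalized copy of a column
normalize_sql <- function(df, colName, newColName) {
  paste0(
    "SELECT ", df, ".*, ",
    "(", colName, " - (SELECT avg(", colName, ") FROM ", df, ")) / ",
    "(SELECT stddev_samp(", colName, ") FROM ", df, ") as ", newColName,
    " FROM ", df
  )
}

# Wrapper shown above: turns a SQL string into a lazy tbl
lazy_spark_query <- function(tbl, qry) {
  dplyr::tbl(tbl[["src"]][["con"]], dbplyr::sql(qry))
}

# Normalize dep_delay with SQL, then aggregate with dplyr verbs.
# Only the small aggregated result is collected to the R session.
mean_norm_delay_by_origin <- function(tbl_flights) {
  qry <- normalize_sql("flights", "dep_delay", "dep_delay_norm")
  normalized <- lazy_spark_query(tbl_flights, qry)
  grouped <- dplyr::group_by(normalized, origin)
  aggregated <- dplyr::summarise(grouped, mean(dep_delay_norm))
  dplyr::collect(aggregated)
}
```

Calling mean_norm_delay_by_origin(tbl_flights) would return one row per origin airport, in the shape of the output below.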

## # A tibble: 3 x 2
##   origin `mean(dep_delay_norm)`
##   <chr>                   <dbl>
## 1 JFK                   -0.0131
## 2 LGA                   -0.0570
## 3 EWR                    0.0614

The crucial advantage is that, even though lazy_spark_query() would return the entire updated flights dataset when collected on its own, in combination with other operations Spark first figures out how to execute all the operations together efficiently. Only then does it physically execute them, returning only the grouped and aggregated data to the R session.

We can therefore effectively combine multiple approaches to interfacing with Spark while still keeping the benefit of retrieving only very small, aggregated amounts of data to the R session. The effect is quite significant even with a dataset as small as flights (336,776 rows of 19 columns) and with a local Spark instance. The chart below compares executing a query lazily, aggregating within Spark and only retrieving the aggregated data, versus retrieving first and aggregating locally. The third boxplot shows the cost of pure collection on the query itself:

7.4 Where SQL can be better than dbplyr translation

7.4.1 When a translation is not there

We have discussed in the first part that the set of operations translated to Spark SQL via dbplyr may not cover all possible use cases. In such a case, the option to write SQL directly is very useful.

7.4.2 When translation does not provide expected results

In some instances using dbplyr to translate R operations to Spark SQL can lead to unexpected results. As one example, consider the following integer division on a column of a local data frame.
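In R, integer division is written with the %/% operator. A small self-contained sketch on a ten-row data frame (the name local_df is illustrative) shows that it keeps the integer type, in contrast to plain division with /, which returns doubles:

```r
# A small local data frame with an integer id column
local_df <- data.frame(id = 1:10)

# Integer division in R keeps the integer type
local_df$id_div_5 <- local_df$id %/% 5L

# Plain division returns doubles, e.g. 0.2 for id = 1
local_df$id_over_5 <- local_df$id / 5

print(local_df)
str(local_df)
```

Applied to the id column of the full local weather data, the same integer division gives the output below.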

## # A tibble: 26,115 x 2
##       id id_div_5
##    <int>    <int>
##  1     1        0
##  2     2        0
##  3     3        0
##  4     4        0
##  5     5        1
##  6     6        1
##  7     7        1
##  8     8        1
##  9     9        1
## 10    10        2
## # … with 26,105 more rows

As expected, we get the result of integer division in the id_div_5 column. However, applying the very same operation on a Spark DataFrame yields unexpected results:

## # Source: spark<?> [?? x 2]
##       id id_div_5
##    <int>    <dbl>
##  1     1      0.2
##  2     2      0.4
##  3     3      0.6
##  4     4      0.8
##  5     5      1  
##  6     6      1.2
##  7     7      1.4
##  8     8      1.6
##  9     9      1.8
## 10    10      2  
## # … with more rows

This is due to the fact that translation to integer division is quite difficult to implement: https://github.com/tidyverse/dbplyr/issues/108. We could certainly figure out a way to fix this particular issue, but the workarounds may prove inefficient:

## # Source: spark<?> [?? x 2]
##       id id_div_5
##    <int>    <int>
##  1     1        0
##  2     2        0
##  3     3        0
##  4     4        0
##  5     5        1
##  6     6        1
##  7     7        1
##  8     8        1
##  9     9        1
## 10    10        2
## # … with more rows
## <SQL>
## SELECT `id`, CAST(`id` / 5 AS INT) AS `id_div_5`
## FROM `weather`
## 
## <PLAN>
## == Physical Plan ==
## *(1) Project [id#24, cast((cast(id#24 as double) / 5.0) as int) AS id_div_5#6127]
## +- InMemoryTableScan [id#24]
##       +- InMemoryRelation [id#24, origin#25, year#26, month#27, day#28, hour#29, temp#30, dewp#31, humid#32, wind_dir#33, wind_speed#34, wind_gust#35, precip#36, pressure#37, visib#38, time_hour#39], StorageLevel(disk, memory, deserialized, 1 replicas)
##             +- Scan ExistingRDD[id#24,origin#25,year#26,month#27,day#28,hour#29,temp#30,dewp#31,humid#32,wind_dir#33,wind_speed#34,wind_gust#35,precip#36,pressure#37,visib#38,time_hour#39]

Knowing that Hive provides a built-in DIV arithmetic operator, we can get the desired results very simply and efficiently by writing the SQL directly:
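A sketch of a constructor for such a query follows. The name int_div_sql and its arguments are illustrative, and identifiers are wrapped in backticks as in the query shown in the output below.

```r
# Build a query that integer-divides one column using the DIV operator.
# Identifiers are quoted with backticks; inputs are not otherwise escaped,
# so they must come from trusted code.
int_div_sql <- function(tblName, colName, divisor, newColName) {
  stopifnot(is.numeric(divisor), length(divisor) == 1L)
  # Small local helper that wraps an identifier in backticks
  bt <- function(x) paste0("`", x, "`")
  paste0(
    "SELECT ", bt(colName), ", ",
    bt(colName), " DIV ", format(divisor), " ", bt(newColName),
    " FROM ", bt(tblName)
  )
}

div_query <- int_div_sql("weather", "id", 5L, "id_div_5")
cat(div_query)
```

The resulting string can then be passed to tbl() via dbplyr's sql(), as in section 7.2.3.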

## # Source: spark<SELECT `id`, `id` DIV 5 `id_div_5` FROM `weather`> [?? x 2]
##       id id_div_5
##    <int>    <dbl>
##  1     1        0
##  2     2        0
##  3     3        0
##  4     4        0
##  5     5        1
##  6     6        1
##  7     7        1
##  8     8        1
##  9     9        1
## 10    10        2
## # … with more rows

Even though the numeric value of the results is correct here, we may still notice that the class of the returned id_div_5 column is actually numeric instead of integer. Such is the life of developers using data processing interfaces.

7.4.3 When portability is important

Since the languages that provide interfaces to Spark are not limited to R and multi-language setups are quite common, another reason to use SQL statements directly is the portability of such solutions. A SQL statement can also be executed through the interfaces provided for the other languages (Scala, Java, and Python), without the need to rely on R-specific packages such as dbplyr.