AWS Athena Unload

Dyfan Jones

Writes query results from a SELECT statement to the specified data format. Supported formats for UNLOAD include Apache Parquet, ORC, Apache Avro, and JSON. CSV is the only output format used by the Athena SELECT query, but you can use UNLOAD to write the output of a SELECT query to the formats that UNLOAD supports.

Although you can use the CTAS statement to output data in formats other than CSV, those statements also require the creation of a table in Athena. The UNLOAD statement is useful when you want to output the results of a SELECT query in a non-CSV format but do not require the associated table. For example, a downstream application might require the results of a SELECT query to be in JSON format, and Parquet or ORC might provide a performance advantage over CSV if you intend to use the results of the SELECT query for additional analysis.

(https://docs.aws.amazon.com/athena/latest/ug/unload.html)

noctua v-2.2.0.9000+ can leverage this functionality through the unload parameter of dbGetQuery, dbSendQuery, and dbExecute. It offers faster performance for mid-sized to large results.

Pros and Cons

unload=FALSE (Default)

Runs a regular query on AWS Athena, then reads the result as CSV directly from AWS S3.

PROS:

CONS:

unload=TRUE

Wraps the query in an UNLOAD statement, then reads the result as Parquet directly from AWS S3.
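To make the wrapping step concrete, here is a minimal sketch of how a SELECT statement could be turned into an Athena UNLOAD statement. The helper name wrap_unload, the bucket path s3://my-bucket/unload/, and the format whitelist are assumptions made for illustration; this is not noctua's internal code, only the general shape of the statement described in the AWS documentation quoted above.

```python
# Python
# Formats named in the AWS description quoted at the top of this article.
SUPPORTED_FORMATS = {"PARQUET", "ORC", "AVRO", "JSON"}


def wrap_unload(query, s3_path, file_format="PARQUET"):
    """Wrap a SELECT statement in an UNLOAD statement (hypothetical helper)."""
    fmt = file_format.upper()
    if fmt not in SUPPORTED_FORMATS:
        raise ValueError(f"Unsupported UNLOAD format: {file_format}")
    # Drop surrounding whitespace and a trailing semicolon so the SELECT
    # can sit inside the parentheses of the UNLOAD statement.
    select = query.strip().rstrip(";").rstrip()
    return f"UNLOAD ({select}) TO '{s3_path}' WITH (format = '{fmt}')"


# "s3://my-bucket/unload/" is a placeholder location, not a real bucket.
print(wrap_unload("SELECT * FROM awswrangler_test.noaa", "s3://my-bucket/unload/"))
```

The resulting files are then read from the S3 location rather than from the CSV file that a plain SELECT produces.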

PROS:

CONS:

Performance comparison:

Set up AWS Athena table (example taken from AWS Data Wrangler: Amazon Athena Tutorial):

# Python
import awswrangler as wr

import getpass
bucket = getpass.getpass()
path = f"s3://{bucket}/data/"

if "awswrangler_test" not in wr.catalog.databases().values:
    wr.catalog.create_database("awswrangler_test")

cols = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]

df = wr.s3.read_csv(
    path="s3://noaa-ghcn-pds/csv/189",
    names=cols,
    parse_dates=["dt", "obs_time"])  # Read 10 files from the 1890 decade (~1GB)

wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    mode="overwrite",
    database="awswrangler_test",
    table="noaa"
)

wr.catalog.table(database="awswrangler_test", table="noaa")

Benchmark unload method using noctua.

# R
library(DBI)

con <- dbConnect(noctua::athena())

dbGetQuery(con, "select count(*) as n from awswrangler_test.noaa")
# Info: (Data scanned: 0 Bytes)
#           n
# 1: 29554197

# Query ran using CSV output
system.time({
  df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa")
})
# Info: (Data scanned: 80.88 MB)
#    user  system elapsed
#  57.004   8.430 160.567 

dim(df)
# [1] 29554197        8

noctua::noctua_options(cache_size = 1)

# Query ran using UNLOAD Parquet output
system.time({
  df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa", unload = TRUE)
})
# Info: (Data scanned: 80.88 MB)
#    user  system elapsed 
#  21.622   2.350  39.232 

dim(df)
# [1] 29554197        8

# Query ran using cached UNLOAD Parquet output
system.time({
  df = dbGetQuery(con, "SELECT * FROM awswrangler_test.noaa", unload = TRUE)
})
# Info: (Data scanned: 80.88 MB)
#    user  system elapsed 
#  13.738   1.886  11.029 

dim(df)
# [1] 29554197        8

Method               Time (seconds)
unload=FALSE         160.567
unload=TRUE          39.232
Cache unload=TRUE    11.029

In this simple benchmark, querying AWS Athena with unload=TRUE cut the elapsed time from about 161 seconds to about 39 seconds, and a cached unload=TRUE run took about 11 seconds.
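The relative speedups can be worked out directly from the elapsed times reported above. The short sketch below only does that arithmetic; the timings are copied from the benchmark output and will differ on other data or hardware.

```python
# Python
# Elapsed times (seconds) copied from the benchmark output above.
timings = {
    "unload=FALSE": 160.567,
    "unload=TRUE": 39.232,
    "Cache unload=TRUE": 11.029,
}

baseline = timings["unload=FALSE"]

# Speedup of each method relative to the default CSV path.
speedups = {method: round(baseline / secs, 1) for method, secs in timings.items()}

for method, factor in speedups.items():
    print(f"{method}: {factor}x")
# unload=FALSE: 1.0x
# unload=TRUE: 4.1x
# Cache unload=TRUE: 14.6x
```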

Note: The benchmark was run on an AWS SageMaker ml.t3.xlarge instance.

Set unload = TRUE on package level:

Another way to set unload=TRUE is to use noctua_options(). Calling noctua_options(unload=TRUE) sets unload to TRUE at the package level, and all DBI functionality will use it where applicable.

library(DBI)
library(noctua)

con <- dbConnect(athena())

noctua_options(unload = TRUE)

dbi_noaa = dbGetQuery(con, "select * from awswrangler_test.noaa")

This also benefits dplyr workflows. Once noctua_options(unload=TRUE) is set, all dplyr lazy evaluation uses AWS Athena UNLOAD.

library(dplyr)  # provides tbl(), filter(), collect() and %>%

tbl_noaa = tbl(con, dbplyr::in_schema("awswrangler_test", "noaa"))

tbl_noaa %>% collect()

#> # A tibble: 29,554,197 × 8
#>    id          dt                  element value m_flag q_flag s_flag obs_time
#>    <chr>       <dttm>              <chr>   <int> <chr>  <chr>  <chr>  <chr>   
#>  1 ASN00074198 1890-01-05 00:00:00 PRCP        0 NA     NA     a      NA      
#>  2 ASN00074222 1890-01-05 00:00:00 PRCP        0 NA     NA     a      NA      
#>  3 ASN00074227 1890-01-05 00:00:00 PRCP        0 NA     NA     a      NA      
#>  4 ASN00075001 1890-01-05 00:00:00 PRCP        0 NA     NA     a      NA      
#>  5 ASN00075005 1890-01-05 00:00:00 PRCP        0 NA     NA     a      NA      
#>  6 ASN00075006 1890-01-05 00:00:00 PRCP        0 NA     NA     a      NA      
#>  7 ASN00075011 1890-01-05 00:00:00 PRCP        0 NA     NA     a      NA      
#>  8 ASN00075013 1890-01-05 00:00:00 PRCP        0 NA     NA     a      NA      
#>  9 ASN00075014 1890-01-05 00:00:00 PRCP        0 NA     NA     a      NA      
#> 10 ASN00075018 1890-01-05 00:00:00 PRCP        0 NA     NA     a      NA      
#> # … with 29,554,187 more rows

tbl_noaa %>% filter(element == "PRCP") %>% collect()
#> # A tibble: 15,081,580 × 8
#>    id          dt                  element value m_flag q_flag s_flag obs_time
#>    <chr>       <dttm>              <chr>   <int> <chr>  <chr>  <chr>  <chr>   
#>  1 SWE00140492 1890-01-06 00:00:00 PRCP        0 NA     NA     E      NA      
#>  2 SWE00140594 1890-01-06 00:00:00 PRCP        4 NA     NA     E      NA      
#>  3 SWE00140746 1890-01-06 00:00:00 PRCP        0 NA     NA     E      NA      
#>  4 SWE00140828 1890-01-06 00:00:00 PRCP        0 NA     NA     E      NA      
#>  5 SWM00002080 1890-01-06 00:00:00 PRCP        0 NA     NA     E      NA      
#>  6 SWM00002485 1890-01-06 00:00:00 PRCP        1 NA     NA     E      NA      
#>  7 SWM00002584 1890-01-06 00:00:00 PRCP        0 NA     NA     E      NA      
#>  8 TSE00147769 1890-01-06 00:00:00 PRCP       33 NA     NA     E      NA      
#>  9 TSE00147775 1890-01-06 00:00:00 PRCP      150 NA     NA     E      NA      
#> 10 UK000047811 1890-01-06 00:00:00 PRCP       49 NA     NA     E      NA      
#> # … with 15,081,570 more rows