## ----setup, include = FALSE---------------------------------------------------
knitr::opts_chunk$set(
  collapse = TRUE,
  comment = "#>"
)

## ----eval = FALSE-------------------------------------------------------------
# # SQLite
# con <- DBI::dbConnect(RSQLite::SQLite(), ":memory:")
# 
# # PostgreSQL
# con <- DBI::dbConnect(
#   RPostgres::Postgres(),
#   host = "localhost",
#   dbname = "analytics",
#   user = "analyst",
#   password = "secret"
# )
# 
# # MySQL or MariaDB
# con <- DBI::dbConnect(
#   RMariaDB::MariaDB(),
#   host = "localhost",
#   dbname = "analytics",
#   user = "analyst",
#   password = "secret"
# )

## -----------------------------------------------------------------------------
library(DBI)
library(RSQLite)
library(featdelta)

con <- dbConnect(SQLite(), ":memory:")

orders <- data.frame(
  order_id = 1:7,
  customer_id = c(101, 102, 103, 101, 104, 105, 102),
  gross_amount = c(120, 250, 80, 310, 45, 520, 160),
  discount_amount = c(0, 25, 5, 30, 0, 60, 10),
  shipping_fee = c(8, 0, 6, 0, 5, 0, 7),
  order_to_ship_days = c(1, 3, 2, 5, 1, 4, 2),
  stringsAsFactors = FALSE
)

day_one <- 1:4
day_two <- 5:7

dbWriteTable(
  con,
  "raw_orders",
  orders[day_one, ],
  overwrite = TRUE
)

dbGetQuery(con, "SELECT * FROM raw_orders ORDER BY order_id")

## -----------------------------------------------------------------------------
source_sql <- "
  SELECT
    order_id,
    customer_id,
    gross_amount,
    discount_amount,
    shipping_fee,
    order_to_ship_days
  FROM raw_orders
  ORDER BY order_id
"

key <- "order_id"

dbGetQuery(con, source_sql)

## -----------------------------------------------------------------------------
defs <- fd_define(
  net_revenue = gross_amount - discount_amount + shipping_fee,
  discount_rate = discount_amount / gross_amount,
  free_shipping = shipping_fee == 0,
  slow_fulfillment = order_to_ship_days > 3
)

defs

## -----------------------------------------------------------------------------
run_day_one <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "order_features",
  verbose = FALSE
)

dbGetQuery(con, "SELECT * FROM order_features ORDER BY order_id")

## -----------------------------------------------------------------------------
dbAppendTable(con, "raw_orders", orders[day_two, ])

dbGetQuery(con, "SELECT * FROM raw_orders ORDER BY order_id")

## -----------------------------------------------------------------------------
dbGetQuery(con, "SELECT * FROM order_features ORDER BY order_id")

## -----------------------------------------------------------------------------
run_day_two <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "order_features",
  verbose = FALSE
)

dbGetQuery(con, "SELECT * FROM order_features ORDER BY order_id")

## -----------------------------------------------------------------------------
new_order <- data.frame(
  order_id = 8,
  customer_id = 106,
  gross_amount = 275,
  discount_amount = 20,
  shipping_fee = 0,
  order_to_ship_days = 6
)

dbAppendTable(con, "raw_orders", new_order)

new_rows <- fd_fetch(
  con = con,
  sql = source_sql,
  key = key,
  feat_table_name = "order_features"
)

new_rows

## -----------------------------------------------------------------------------
attr(new_rows, "fd_fetch")

## -----------------------------------------------------------------------------
new_features <- fd_compute(
  data = new_rows,
  defs = defs,
  key = key
)

upsert_report <- fd_upsert(
  con = con,
  features_df = new_features,
  feat_table_name = "order_features",
  key = key,
  verbose = FALSE
)

upsert_report

dbGetQuery(con, "SELECT * FROM order_features ORDER BY order_id")

## -----------------------------------------------------------------------------
defs_v2 <- fd_define(
  net_revenue = gross_amount - discount_amount + shipping_fee,
  discount_rate = discount_amount / gross_amount,
  free_shipping = shipping_fee == 0,
  slow_fulfillment = order_to_ship_days > 3,
  high_value_order = gross_amount >= 250
)

refresh_report <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs_v2,
  key = key,
  feat_table_name = "order_features",
  fetch_mode = "all",
  verbose = FALSE
)

dbListFields(con, "order_features")

dbGetQuery(con, "SELECT * FROM order_features ORDER BY order_id")

## ----eval = FALSE-------------------------------------------------------------
# # Error if the table does not already exist
# fd_run(
#   con = con,
#   sql = source_sql,
#   defs = defs,
#   key = key,
#   feat_table_name = "order_features",
#   create_table = FALSE
# )
# 
# # Error if new feature columns are missing from the existing table
# fd_run(
#   con = con,
#   sql = source_sql,
#   defs = defs_v2,
#   key = key,
#   feat_table_name = "order_features",
#   alter_table = FALSE
# )

## ----eval = FALSE-------------------------------------------------------------
# fd_run(
#   con = con,
#   sql = source_sql,
#   defs = defs,
#   key = key,
#   feat_table_name = "order_features",
#   fetch_mode = "all",
#   update_table = FALSE
# )

## ----eval = FALSE-------------------------------------------------------------
# fd_run(
#   con = con,
#   sql = source_sql,
#   defs = defs,
#   key = key,
#   feat_table_name = "order_features",
#   chunk_size = 5000
# )

## ----eval = FALSE-------------------------------------------------------------
# fd_run(
#   con = con,
#   sql = "SELECT * FROM raw_schema.orders",
#   defs = defs,
#   key = "order_id",
#   feat_table_name = "feature_schema.order_features",
#   dialect = "postgres"
# )
# 
# fd_run(
#   con = con,
#   sql = "SELECT * FROM raw_schema.orders",
#   defs = defs,
#   key = "order_id",
#   feat_table_name = "feature_schema.order_features",
#   dialect = "mysql"
# )

## -----------------------------------------------------------------------------
names(refresh_report)

refresh_report$fetch

refresh_report$compute$feature_names

refresh_report$upsert$counts

refresh_report$upsert$columns_added

## ----cleanup, include = FALSE-------------------------------------------------
dbDisconnect(con)

