--- title: "Production patterns" output: rmarkdown::html_vignette vignette: > %\VignetteIndexEntry{Production patterns} %\VignetteEngine{knitr::rmarkdown} %\VignetteEncoding{UTF-8} --- ```{r setup, include = FALSE} knitr::opts_chunk$set( collapse = TRUE, comment = "#>" ) ``` ## Purpose of this workshop The earlier vignettes introduced the main workflow: 1. define feature logic in R; 2. compute features from raw rows; 3. store those features in a database table; 4. rerun the pipeline as new raw rows arrive. This vignette focuses on patterns that become important once the workflow is used repeatedly. In practice, a production feature pipeline needs more than a single successful run. You need a way to preview changes, inspect failures, refresh old rows when definitions change, control schema evolution, and monitor what the pipeline did. `featdelta` is not a scheduler, an orchestration platform, or a replacement for database permissions. It is the R feature-engineering component inside that larger workflow. This workshop shows how to use the package in a way that is more comfortable to maintain over time. ## Workshop database We will use a small device-monitoring example. Imagine a database table that receives equipment readings over time. Analysts want to derive features in R and store them in a separate table for dashboards, alerts, or predictive-maintenance models. ```{r} library(DBI) library(RSQLite) library(featdelta) con <- dbConnect(SQLite(), ":memory:") readings <- data.frame( reading_id = 1:10, device_id = c("A", "A", "B", "C", "A", "B", "C", "A", "B", "C"), temperature = c(68, 71, 75, 70, 83, 78, 72, 88, 81, 74), vibration = c(0.12, 0.15, 0.31, 0.18, 0.44, 0.39, 0.22, 0.55, 0.41, 0.24), pressure = c(31, 33, 36, 32, 39, 37, 34, 42, 38, 35), runtime_hours = c(120, 125, 210, 88, 130, 216, 93, 136, 223, 97), stringsAsFactors = FALSE ) day_one <- 1:6 day_two <- 7:10 dbWriteTable( con, "raw_readings", readings[day_one, ], overwrite = TRUE ) source_sql <- " SELECT reading_id, device_id, temperature, vibration, pressure, runtime_hours FROM raw_readings ORDER BY reading_id " key <- "reading_id" dbGetQuery(con, source_sql) ``` This is a small table, but the operating pattern is realistic: raw readings arrive, R creates transformed features, and the database stores the results. ## Start by testing definitions locally Before writing to a production database table, test the feature definitions on a small local extract. This lets you check names, row counts, and basic values without changing the database feature table. ```{r} defs <- fd_define( temp_above_80 = temperature > 80, vibration_score = vibration * runtime_hours, pressure_per_hour = pressure / runtime_hours, maintenance_flag = temp_above_80 | vibration_score > 80 ) raw_preview <- dbGetQuery(con, source_sql) features_preview <- fd_compute( data = raw_preview, defs = defs, key = key ) features_preview ``` This is the first production habit: separate definition development from database writes. When the output table has the expected shape, use the same definitions in `fd_run()`. ```{r} run_initial <- fd_run( con = con, sql = source_sql, defs = defs, key = key, feat_table_name = "reading_features", verbose = FALSE ) dbGetQuery(con, "SELECT * FROM reading_features ORDER BY reading_id") ``` ## Use a development feature table for dry runs `fetch_limit`, `preview_n`, and `return_data` are useful during development, but remember that `fd_run()` is still a database-writing function. 
When the output table has the expected shape, use the same definitions in `fd_run()`.

```{r}
run_initial <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "reading_features",
  verbose = FALSE
)

dbGetQuery(con, "SELECT * FROM reading_features ORDER BY reading_id")
```

## Use a development feature table for dry runs

`fetch_limit`, `preview_n`, and `return_data` are useful during development, but remember that `fd_run()` is still a database-writing function. If you want to test a pipeline run without touching the production feature table, use a development database or a separate feature table.

```{r}
dev_report <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "reading_features_dev",
  fetch_limit = 3,
  return_data = "both",
  preview_n = 2,
  verbose = FALSE
)

dev_report$preview$raw
dev_report$preview$features
dev_report$data$features
```

This pattern is useful when changing definitions. You can inspect a small run against a scratch table, then run the same definitions against the real feature table once the output looks right.

## Let the report answer operational questions

Every `fd_run()` returns a structured report. In production, this object is more useful than the printed table because it can be logged, inspected, or checked by monitoring code.

```{r}
run_initial$success
run_initial$stage
run_initial$fetch$n_rows
run_initial$compute$feature_names
run_initial$upsert$counts
```

A typical monitoring check can be simple: did the run succeed, which stage finished, how many rows were fetched, and how many rows were inserted or updated?
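One way to encode that check is a small helper that stops on an unsuccessful run and logs the counts otherwise. The helper below is an illustration rather than part of `featdelta`; it touches only the report fields shown above.

```{r}
# Hypothetical monitoring helper built on the report fields shown above.
check_run <- function(report) {
  if (!isTRUE(report$success)) {
    stop("fd_run() failed at stage: ", report$stage)
  }
  message("rows fetched: ", report$fetch$n_rows)
  print(report$upsert$counts)
  invisible(report)
}

check_run(run_initial)
```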
## Handle compute failures without losing context

During development, a broken feature definition should be easy to diagnose. In a production job, you may also want a failed run to return a report instead of immediately stopping the whole script. Set `fail_fast = FALSE` to capture stage failures in the returned `fd_run_report`. In the example below we define a feature, `broken_feature`, whose definition references a column that does not exist, so it cannot be computed.

```{r}
bad_defs <- fd_define(
  temp_above_80 = temperature > 80,
  broken_feature = missing_sensor_column / 10
)

failed_report <- fd_run(
  con = con,
  sql = source_sql,
  defs = bad_defs,
  key = key,
  feat_table_name = "reading_features_broken",
  fail_fast = FALSE,
  return_data = "both",
  verbose = FALSE
)

failed_report$success
failed_report$stage
failed_report$error
failed_report$compute$report
```

The returned report keeps the failure attached to the compute stage. When possible, it also keeps partial data and the compute report, which makes it easier to see which step failed. In contrast, `fail_fast = TRUE` is useful when the surrounding job scheduler should treat any stage failure as an immediate error.

```{r, eval = FALSE}
fd_run(
  con = con,
  sql = source_sql,
  defs = bad_defs,
  key = key,
  feat_table_name = "reading_features",
  fail_fast = TRUE
)
```

## Choose the right refresh mode

For regular daily processing, use the default `fetch_mode = "new_only"`. This mode processes only raw keys that are not yet present in the feature table.

```{r}
dbAppendTable(con, "raw_readings", readings[day_two, ])

run_incremental <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "reading_features",
  verbose = FALSE
)

# The default fetch mode is "new_only".
run_incremental$fetch$mode

# Only the four new day-two readings were fetched.
run_incremental$fetch$n_rows

# Four rows were inserted, and no existing feature rows were updated.
run_incremental$upsert$counts
```

Use `fetch_mode = "all"` when feature definitions have changed and existing rows should be recomputed.

```{r}
defs_v2 <- fd_define(
  temp_above_80 = temperature > 80,
  vibration_score = vibration * runtime_hours,
  pressure_per_hour = pressure / runtime_hours,
  maintenance_flag = temp_above_80 | vibration_score > 80,
  high_pressure = pressure >= 38
)

run_refresh <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs_v2,
  key = key,
  feat_table_name = "reading_features",
  fetch_mode = "all",
  verbose = FALSE
)

# Refresh mode fetches all rows returned by the source query.
run_refresh$fetch$mode
run_refresh$fetch$n_rows

# All rows already existed in the feature table, so they were updated.
# No new rows were inserted.
run_refresh$upsert$counts
```

This is one of the most important production distinctions. `new_only` is for incremental append-style processing. `all` is for refreshes, backfills, and definition changes.

## Treat schema evolution as additive

With `alter_table = TRUE`, missing feature columns are added to the target table. This is convenient when a new feature is introduced. The package deliberately does not drop, rename, or retire old columns automatically. If a column exists in the database feature table but is no longer produced by the current definitions, it is reported as an extra column and left untouched.

In the example below, `defs_v3` no longer produces the previously created `pressure_per_hour` feature. We also use `fetch_mode = "all"` so that all existing rows in the feature table are refreshed with the current definitions.

```{r}
defs_v3 <- fd_define(
  temp_above_80 = temperature > 80,
  vibration_score = vibration * runtime_hours,
  maintenance_flag = temp_above_80 | vibration_score > 80,
  high_pressure = pressure >= 38
)

run_removed_column <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs_v3,
  key = key,
  feat_table_name = "reading_features",
  fetch_mode = "all",
  verbose = FALSE
)

# All ten source rows already existed in the feature table, so they were
# counted as updates rather than inserts.
run_removed_column$upsert$counts

# The database table still contains the old `pressure_per_hour` column.
dbGetQuery(con, "PRAGMA table_info(reading_features)")
```

This behavior is intentional. Removing a feature column from a shared database table can affect dashboards, models, and other users. In production, column removal should usually be handled as an explicit database migration.
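When a column does need to be retired, an explicit statement keeps the change visible and reviewable. A minimal sketch of such a migration is below; `DROP COLUMN` requires SQLite 3.35 or later, and the syntax varies across databases.

```{r, eval = FALSE}
# An explicit, reviewed migration step, run deliberately outside fd_run().
dbExecute(con, "ALTER TABLE reading_features DROP COLUMN pressure_per_hour")
```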
If your organization requires approval before any schema change, set `alter_table = FALSE`. Then a new feature column will cause the run to fail instead of altering the table. To demonstrate this, we define `defs_v4` with a new feature, `thermal_load`, which does not yet exist in `reading_features`.

```{r}
defs_v4 <- fd_define(
  temp_above_80 = temperature > 80,
  vibration_score = vibration * runtime_hours,
  maintenance_flag = temp_above_80 | vibration_score > 80,
  high_pressure = pressure >= 38,
  thermal_load = temperature * runtime_hours
)

alter_error <- tryCatch(
  fd_run(
    con = con,
    sql = source_sql,
    defs = defs_v4,
    key = key,
    feat_table_name = "reading_features",
    fetch_mode = "all",
    alter_table = FALSE,
    verbose = FALSE
  ),
  error = function(e) conditionMessage(e)
)

alter_error
```

## Use insert-only mode when duplicates should fail

Some pipelines should only append new feature rows. In those workflows, an incoming key that already exists in the feature table may indicate a logic or scheduling problem. Set `update_table = FALSE` to make existing-key conflicts fail. In the example below, `fetch_mode = "all"` sends rows whose keys already exist in `reading_features`, so insert-only mode reports a conflict.

```{r}
insert_only_error <- tryCatch(
  fd_run(
    con = con,
    sql = source_sql,
    defs = defs_v3,
    key = key,
    feat_table_name = "reading_features",
    fetch_mode = "all",
    update_table = FALSE,
    verbose = FALSE
  ),
  error = function(e) conditionMessage(e)
)

insert_only_error
```

This is stricter than the default upsert behavior. Use it when accidental updates should be blocked.

## Use chunking for large writes

For large feature tables, writing everything in a single batch can mean long transactions and high memory use. Use `chunk_size` to stage and merge rows in batches.

```{r, eval = FALSE}
fd_run(
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "reading_features",
  chunk_size = 10000
)
```

Chunking does not change which features are computed. It changes how computed rows are written to the database.

## Be explicit about keys and constraints

The key column is the contract between the raw query and the feature table. In production, make sure that:

1. the source query returns the key column;
2. the key values are unique and non-missing in the source data;
3. the feature table has the same key column;
4. existing feature tables used with upsert behavior have a primary key or unique constraint on that key.

Tables created by `fd_upsert()` include a primary key automatically. If you are using a pre-existing table, check the database schema before running the pipeline.

## Avoid concurrent writes to the same feature table

`featdelta` does not currently coordinate concurrent writers. Avoid running multiple `fd_run()` or `fd_upsert()` calls against the same feature table at the same time. Concurrent writes to different feature tables are independent, but concurrent writes to the same target table should be handled by your scheduler, database permissions, or orchestration layer.

## A practical production checklist

Before using a feature pipeline repeatedly, it is worth checking a few things.

1. The source SQL returns exactly the population you intend to process.
2. The source SQL returns the key column.
3. The key is unique and has no missing values.
4. Feature definitions have been tested with `fd_compute()`.
5. A small `fd_run()` has been tested against a development table.
6. The intended refresh mode is clear: `new_only` or `all`.
7. Schema evolution is intentional: `alter_table = TRUE` or `FALSE`.
8. The feature table has a primary key or unique constraint on the key.
9. The run report is logged or inspected.
10. The scheduler prevents overlapping writes to the same feature table.

The package handles the repeated database mechanics, but these operational decisions still belong to the user or the surrounding production system.

## What to remember

Production use is mostly about making the feature workflow predictable. Develop definitions locally, use a scratch table for small end-to-end tests, inspect the run report, choose the correct refresh mode, and be intentional about schema changes.

For regular repeated jobs, `fd_run()` is the main entry point. The supporting functions, especially `fd_compute()`, `fd_fetch()`, and `fd_upsert()`, are there when you need to inspect or test individual stages. A sketch of how these habits might combine in a scheduled job follows below.

```{r cleanup, include = FALSE}
dbDisconnect(con)
```
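As a closing sketch, here is one shape such a job script could take. It is not evaluated in this vignette; the database file is an assumption, `check_run()` is the illustrative helper defined earlier, and `source_sql`, `defs`, and `key` are as defined above.

```{r, eval = FALSE}
# Sketch of a scheduled featdelta job under this vignette's assumptions.
con <- dbConnect(RSQLite::SQLite(), "readings.sqlite")

# Checklist items 2 and 3: the key column is present, unique, non-missing.
raw_rows <- dbGetQuery(con, source_sql)
stopifnot(
  key %in% names(raw_rows),
  !anyNA(raw_rows[[key]]),
  !anyDuplicated(raw_rows[[key]])
)

report <- fd_run(
  con = con,
  sql = source_sql,
  defs = defs,
  key = key,
  feat_table_name = "reading_features",
  fetch_mode = "new_only",  # incremental daily processing
  fail_fast = FALSE,        # keep the report even if a stage fails
  verbose = FALSE
)

dbDisconnect(con)

check_run(report)  # log row counts, or stop if the run failed
```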