9 Developer Guide
This chapter covers code patterns, contribution guidelines, and how to extend the pipeline.
9.1 Code Patterns
9.1.1 Transform Function Template
All transform functions follow this pattern:
# ============================================================================
# module_name.R
# Brief description of what this module transforms
# ============================================================================

# ============================================================================
# Module Constants
# ============================================================================

# Expected columns in lookup table (enforced by validate_lookup() below)
MODULE_LOOKUP_COLS <- c("code", "code_definition")

# Sentinel values
# NOTE(review): MODULE_MISSING_VALUE is declared here but is not referenced by
# transform_module() in this template; presumably reserved for modules that
# label undefined codes explicitly. Confirm, or drop it from new modules.
MODULE_MISSING_VALUE <- "UNDEFINED"

# ============================================================================
# Transformation Function
# ============================================================================

#' Transform Module Name
#'
#' @description
#' Detailed description of what this function does.
#'
#' The input is validated, copied, given an integer `column_code` derived from
#' `input_col`, and then joined to `lookup` to attach the code definition.
#' A one-line processing summary is emitted with `message()`.
#'
#' @param dt data.table containing BMF data with required column
#' @param input_col character name of input column (default: "COLUMN")
#' @param lookup data.table lookup table for code definitions; must contain
#'   the columns listed in `MODULE_LOOKUP_COLS`
#'
#' @return A copy of `dt` (the input itself is not modified) with new columns:
#' \itemize{
#' \item column_code - Standardized code
#' \item column_code_definition - Human-readable definition
#' }
#'
#' @examples
#' \dontrun{
#' bmf <- transform_module(bmf_raw, lookup = module_lookup)
#' }
#'
#' @export
transform_module <- function(dt,
                             input_col = "COLUMN",
                             lookup = module_lookup) {
  # Input validation: both checks stop() before any work is done
  validate_data_table(dt, c(input_col), context = "BMF data")
  validate_lookup(lookup, MODULE_LOOKUP_COLS, "module_lookup")

  # Safe copy (don't mutate input): every `:=` below acts on dt_safe only
  dt_safe <- data.table::copy(dt)

  # Transformation logic: input_col holds a column *name*, hence get()
  dt_safe[, column_code := as.integer(get(input_col))]

  # Join with lookup: update join matching dt_safe$column_code to lookup$code;
  # rows without a match keep NA in column_code_definition
  dt_safe[lookup, column_code_definition := i.code_definition,
          on = .(column_code = code)]

  # Validate join success: reports rows that have a code but no definition
  validate_join_success(
    dt_safe,
    key_col = "column_code",
    result_col = "column_code_definition",
    lookup_name = "module_lookup"
  )

  # Quality report
  message(sprintf(
    "Module: %s records processed",
    format(nrow(dt_safe), big.mark = ",")
  ))

  return(dt_safe)
}
9.1.2 Key Principles
- Pure Functions: Don’t mutate input - use data.table::copy()
- Validate Early: Check inputs before processing
- Fail Fast: Stop on errors, warn on data quality issues
- Document: Full roxygen2 documentation
- Report Quality: Log processing metrics
9.2 Adding a New Transform
9.2.1 Step 1: Create the Transform File
Create R/new_field.R:
# ============================================================================
# new_field.R
# Transform NEW_FIELD column
# ============================================================================

# Module Constants
# Columns the lookup table must provide
NEW_FIELD_LOOKUP_COLS <- c("new_field_code", "new_field_definition")

#' Transform New Field
#'
#' Adds a character `new_field_code` derived from `input_col` and joins the
#' lookup table to attach `new_field_definition`. Neither `dt` nor `lookup`
#' is modified; both are copied before any by-reference update.
#'
#' @param dt data.table containing BMF data
#' @param input_col character name of input column (default: "NEW_FIELD")
#' @param lookup data.table lookup table (default: from lookup_ls)
#'
#' @return data.table with new_field_code and new_field_definition columns
#'
#' @export
transform_bmf_new_field <- function(dt,
                                    input_col = "NEW_FIELD",
                                    lookup = lookup_ls$new_field) {
  # Input validation
  validate_data_table(dt, c(input_col), context = "BMF data")
  validate_lookup(lookup, NEW_FIELD_LOOKUP_COLS, "new_field_lookup")

  # Safe copy
  dt_safe <- data.table::copy(dt)

  # Transform: the join key is always character on the data side
  dt_safe[, new_field_code := as.character(get(input_col))]

  # Align the lookup key type with the data key. A lookup sheet holding codes
  # such as 1, 2 may be loaded as numeric (depends on how config.R reads the
  # workbook), and character-vs-numeric keys cannot be joined. Work on a
  # private copy so the shared lookup_ls entry is left untouched.
  lookup_safe <- data.table::copy(lookup)
  lookup_safe[, new_field_code := as.character(new_field_code)]

  # Join
  dt_safe[lookup_safe, new_field_definition := i.new_field_definition,
          on = .(new_field_code)]

  # Validate join
  validate_join_success(dt_safe, "new_field_code", "new_field_definition",
                        "new_field_lookup")

  return(dt_safe)
}
9.2.2 Step 2: Create Lookup Table (if needed)
Add a new sheet to data/lookup/bmf_code_lookup.xlsx:
| new_field_code | new_field_definition |
|---|---|
| 1 | First Value |
| 2 | Second Value |
The lookup will be automatically loaded via config.R and accessible as lookup_ls$new_field.
9.2.3 Step 3: Add to Orchestrator
Edit R/run_pipeline.R:
# Source the new file
source(here::here("R", "new_field.R"))

# Add transformation call (in appropriate phase).
# The function name must match the definition in R/new_field.R.
log_transform_start("New Field")
bmf <- transform_bmf_new_field(bmf)
9.2.4 Step 4: Update Quality Checks
Add expected output columns to R/quality/post_checks.R:
# NOTE: every assignment below is abbreviated ("# ... existing ..."). Add the
# new entries to the existing definitions in post_checks.R; pasting these
# blocks as-is would replace the existing objects with the new entries only.

# Add to BMF_OUTPUT_COLUMNS vector
BMF_OUTPUT_COLUMNS <- c(
  # ... existing columns
  "new_field_code",
  "new_field_definition"
)

# Add to COLUMN_CATEGORIES for detailed quality reporting
# (one entry per category: a display name plus a per-column type)
COLUMN_CATEGORIES <- list(
  # ... existing categories
  new_field = list(
    name = "New Field",
    columns = list(
      new_field_code = list(type = "code"),
      new_field_definition = list(type = "character")
    )
  )
)

# Add to SOURCE_COLUMN_MAP for data lineage tracking
# (output column name = raw source column it is derived from)
SOURCE_COLUMN_MAP <- list(
  # ... existing mappings
  new_field_code = "NEW_FIELD",
  new_field_definition = "NEW_FIELD"
)

# Add to COLUMN_DESCRIPTIONS for data dictionary
COLUMN_DESCRIPTIONS <- list(
  # ... existing descriptions
  new_field_code = "Standardized new field code",
  new_field_definition = "Human-readable new field description"
)
9.2.5 Step 5: Update Documentation
Add to this guidebook:
- Data lineage entry
- Transform reference entry
- Lookup table schema
9.3 Validation Functions
9.3.1 validate_data_table()
#' Validate that an object is a data.table with the required columns
#'
#' @param dt Object to check.
#' @param required_cols Character vector of column names that must be present.
#' @param context Label used at the start of error messages.
#'
#' @return `TRUE`, invisibly. Stops with an error if `dt` is not a data.table
#'   or if any required column is missing.
validate_data_table <- function(dt, required_cols, context = "data.table") {
  # Class check first: the column check below is meaningless otherwise
  if (!inherits(dt, "data.table")) {
    actual_class <- class(dt)[1]
    stop(sprintf("%s must be a data.table, got %s", context, actual_class))
  }

  # Report every missing column at once rather than only the first one
  absent <- setdiff(required_cols, names(dt))
  if (length(absent) > 0) {
    absent_list <- paste(absent, collapse = ", ")
    stop(sprintf("%s missing required columns: %s", context, absent_list))
  }

  invisible(TRUE)
}
9.3.2 validate_lookup()
#' Validate a lookup table before it is used in a join
#'
#' @param lookup Object to check.
#' @param required_cols Character vector of column names that must be present.
#' @param lookup_name Label used at the start of error messages.
#'
#' @return `TRUE`, invisibly. Stops with an error if `lookup` is not a
#'   data.table, lacks a required column, or has zero rows.
validate_lookup <- function(lookup, required_cols, lookup_name = "lookup table") {
  if (!inherits(lookup, "data.table")) {
    actual_class <- class(lookup)[1]
    stop(sprintf("%s must be a data.table, got %s", lookup_name, actual_class))
  }

  absent <- setdiff(required_cols, names(lookup))
  if (length(absent) > 0) {
    absent_list <- paste(absent, collapse = ", ")
    stop(sprintf("%s missing required columns: %s", lookup_name, absent_list))
  }

  # An empty lookup can never match anything, so reject it up front
  n_rows <- nrow(lookup)
  if (n_rows == 0) {
    stop(sprintf("%s is empty (0 rows)", lookup_name))
  }

  invisible(TRUE)
}
9.3.3 validate_join_success()
#' Check how many keyed rows failed to match in a lookup join
#'
#' A row counts as unmatched when `key_col` is present (not NA) but
#' `result_col` is NA. Rows whose key is itself NA are not counted.
#'
#' @param dt Table produced by the join.
#' @param key_col Character name of the join-key column in `dt`.
#' @param result_col Character name of the column filled from the lookup.
#' @param lookup_name Label used in warning/error messages.
#' @param warn_threshold Unmatched fraction (of all rows) above which a
#'   warning is raised.
#' @param error_threshold Unmatched fraction above which the function stops;
#'   `NULL` (default) disables the error.
#'
#' @return Invisibly, a list with `unmatched_count`, `unmatched_pct`
#'   (a fraction, not a percentage) and `total_rows`.
validate_join_success <- function(dt,
                                  key_col,
                                  result_col,
                                  lookup_name = "lookup",
                                  warn_threshold = 0.01,
                                  error_threshold = NULL) {
  total_rows <- nrow(dt)

  # Columns are fetched by name with [[ ]] so a column that happens to be
  # called "key_col" or "result_col" cannot shadow the arguments.
  has_key <- !is.na(dt[[key_col]])
  no_result <- is.na(dt[[result_col]])
  unmatched_count <- sum(has_key & no_result)

  # An empty table has nothing unmatched. Without this guard 0 / 0 is NaN and
  # the threshold comparisons below fail with
  # "missing value where TRUE/FALSE needed".
  unmatched_pct <- if (total_rows > 0) unmatched_count / total_rows else 0

  result <- list(
    unmatched_count = unmatched_count,
    unmatched_pct = unmatched_pct,
    total_rows = total_rows
  )

  if (!is.null(error_threshold) && unmatched_pct > error_threshold) {
    stop(sprintf("Join to %s failed: %d rows (%.2f%%) unmatched",
                 lookup_name, unmatched_count, unmatched_pct * 100))
  }

  if (unmatched_pct > warn_threshold) {
    warning(sprintf("Join to %s: %d rows (%.2f%%) unmatched",
                    lookup_name, unmatched_count, unmatched_pct * 100))
  }

  invisible(result)
}
9.3.4 validate_numeric_range()
#' Warn about negative values in a numeric column
#'
#' @param dt Table (or list) holding the column.
#' @param col_name Character name of the column to inspect.
#' @param allow_negative If `TRUE`, negatives are counted but not reported.
#'
#' @return Invisibly, the number of negative (non-NA) values. Stops if the
#'   column is not numeric.
validate_numeric_range <- function(dt, col_name, allow_negative = FALSE) {
  values <- dt[[col_name]]
  if (!is.numeric(values)) {
    stop(sprintf("Column '%s' is not numeric", col_name))
  }

  # which() drops NA comparisons. Logical indexing with `values < 0` would
  # keep an NA slot for every missing value and leak "NA" into the examples.
  negative_idx <- which(values < 0)
  negative_count <- length(negative_idx)

  if (negative_count > 0 && !allow_negative) {
    negative_examples <- head(unique(values[negative_idx]), 5)
    warning(sprintf("Column '%s' contains %d negative values. Examples: %s",
                    col_name, negative_count,
                    paste(negative_examples, collapse = ", ")))
  }

  invisible(negative_count)
}
9.4 Logging Functions
The logging system supports configurable log levels via the BMF_LOG_LEVEL environment variable (DEBUG, INFO, WARN, ERROR).
9.4.1 Core Logging Functions
# Signature summary only: the function bodies are omitted in this listing, so
# it is a reference, not runnable code (log_info() is shown in full below).
# Log level filtering is automatic based on BMF_LOG_LEVEL
log_debug <- function(msg) # Only shown if BMF_LOG_LEVEL=DEBUG
log_info <- function(msg) # Standard informational messages
log_warn <- function(msg) # Warning messages
log_error <- function(msg) # Error messages (stops execution)
9.4.2 log_info()
#' Emit a timestamped INFO-level log line
#'
#' Nothing is printed when `.should_log("INFO")` is FALSE.
#'
#' @param msg Text to log.
#'
#' @return `NULL`, invisibly.
log_info <- function(msg) {
  # Guard clause: bail out early when INFO messages are filtered
  if (!.should_log("INFO")) {
    return(invisible(NULL))
  }

  timestamp <- format(Sys.time(), "%Y-%m-%d %H:%M:%S")
  message(sprintf("[%s] [INFO] %s", timestamp, msg))
  invisible(NULL)
}
9.4.3 log_phase_start()
#' Print a banner announcing the start of a pipeline phase
#'
#' The banner is a blank line followed by the phase name (logged through
#' `log_info()`) framed by two 60-character rules.
#'
#' @param phase_name Name of the phase being started.
#'
#' @return `NULL`, invisibly.
log_phase_start <- function(phase_name) {
  rule <- strrep("=", 60)

  message("")
  message(rule)
  log_info(sprintf("PHASE: %s", phase_name))
  message(rule)

  invisible(NULL)
}
9.4.4 log_transform_start()
#' Log the start of a transformation
#'
#' @param transform_name Name of the transformation.
#' @param input_rows Optional row count; when supplied it is appended to the
#'   message with thousands separators.
#'
#' @return `NULL`, invisibly.
log_transform_start <- function(transform_name, input_rows = NULL) {
  # Build the base message once, then append the optional row count
  line <- sprintf("Starting transformation: %s", transform_name)
  if (!is.null(input_rows)) {
    rows_label <- format(input_rows, big.mark = ",")
    line <- sprintf("%s (%s rows)", line, rows_label)
  }

  log_info(line)
  invisible(NULL)
}
9.4.5 log_transform_complete()
#' Log the completion of a transformation
#'
#' @param transform_name Name of the transformation.
#' @param output_rows Optional row count, appended with thousands separators.
#' @param duration_secs Optional elapsed time in seconds, appended with two
#'   decimal places.
#'
#' @return `NULL`, invisibly.
log_transform_complete <- function(transform_name, output_rows = NULL, duration_secs = NULL) {
  line <- sprintf("Completed transformation: %s", transform_name)

  # Each optional detail extends the message built so far
  if (!is.null(output_rows)) {
    rows_label <- format(output_rows, big.mark = ",")
    line <- sprintf("%s (%s rows)", line, rows_label)
  }
  if (!is.null(duration_secs)) {
    line <- sprintf("%s in %.2f seconds", line, duration_secs)
  }

  log_info(line)
  invisible(NULL)
}
9.4.6 log_quality_report()
#' Print a short quality report for a transformation
#'
#' @param transform_name Name shown in the report header.
#' @param total Total number of records processed.
#' @param valid Number of valid records.
#' @param invalid Optional count of invalid records (not used by the lines
#'   shown in this excerpt).
#' @param undefined Optional count of undefined records (not used by the
#'   lines shown in this excerpt).
#'
#' @return `NULL`, invisibly. Output is written with `message()`.
log_quality_report <- function(transform_name, total, valid,
                               invalid = NULL, undefined = NULL) {
  message("")
  message(sprintf("--- %s Quality Report ---", transform_name))
  message(sprintf(" Total Records: %s", format(total, big.mark = ",")))

  # With zero records valid / total is NaN and the report would show "NaN%";
  # report 0.00% instead. All other inputs are computed as before.
  valid_pct <- if (isTRUE(total == 0)) 0 else (valid / total) * 100
  message(sprintf(" Valid: %s (%.2f%%)",
                  format(valid, big.mark = ","), valid_pct))
  # ... additional metrics if provided
  message("")
  invisible(NULL)
}
9.5 Checkpoint Functions
Checkpoints allow pipeline recovery and debugging by preserving intermediate states.
9.5.1 save_checkpoint()
# Interface summary: the implementation is not shown in this guide.
# dt is the table to persist; checkpoint_name becomes the file-name suffix.
save_checkpoint <- function(dt, checkpoint_name) {
  # Only saves if ENABLE_CHECKPOINTS is TRUE
  # Files saved to: CHECKPOINT_DIR/bmf_{YEAR}_{MONTH}_{checkpoint_name}.parquet
}
9.5.2 load_checkpoint()
# Interface summary: the implementation is not shown in this guide.
# Callers must handle the NULL return when no checkpoint exists.
load_checkpoint <- function(checkpoint_name) {
  # Returns data.table if checkpoint exists, NULL otherwise
  # Example: bmf <- load_checkpoint("02_identity")
}
9.5.3 list_checkpoints()
# Interface summary: the implementation is not shown in this guide.
list_checkpoints <- function() {
  # Returns character vector of available checkpoint names
  # Example output: c("01_raw", "02_identity", "03_classification")
}
9.5.4 clear_checkpoints()
# Interface summary: the implementation is not shown in this guide.
# confirm defaults to FALSE, so a bare clear_checkpoints() call deletes nothing.
clear_checkpoints <- function(confirm = FALSE) {
  # Removes all checkpoint files for current processing year/month
  # Must set confirm = TRUE to actually delete files
}
9.6 Testing
9.6.1 Manual Testing Pattern
# Load a checkpoint (returns NULL if the checkpoint does not exist)
bmf <- load_checkpoint("02_identity")

# Test single transform (name must match the definition in R/new_field.R)
result <- transform_bmf_new_field(bmf)

# Verify outputs
names(result)
result[, .N, by = new_field_code]

# Rows that did not match the lookup
result[is.na(new_field_definition)]
9.6.2 Creating Test Data
# Minimal test data.table
# NEW_FIELD uses codes from the example lookup sheet (1, 2) so the join can
# actually match; arbitrary values such as "A"/"B" would leave every
# definition NA and trigger the unmatched-join warning.
test_dt <- data.table::data.table(
  EIN = c("123456789", "987654321"),
  NEW_FIELD = c("1", "2")
)

# Run transform (name must match the definition in R/new_field.R)
result <- transform_bmf_new_field(test_dt)

# Assert expected outcomes
stopifnot(nrow(result) == 2)
stopifnot("new_field_code" %in% names(result))
stopifnot("new_field_definition" %in% names(result))
9.7 Git Workflow
9.7.1 Branch Naming
feature/add-new-field-transform
fix/affiliation-code-validation
docs/update-data-lineage
9.7.2 Commit Messages
feat: add transform for NEW_FIELD column
- Create new_field.R with transform function
- Add lookup table to data/lookup/
- Update orchestrator to include new transform
- Update documentation
9.7.3 Pull Request Checklist
9.8 Common Issues
9.8.1 Issue: “object ‘lookup_ls’ not found”
Cause: Lookup tables not loaded before transform is called
Fix: Ensure source() order in orchestrator loads config.R first:
# NOTE(review): the default `lookup = lookup_ls$new_field` is only evaluated
# when the transform is called, so the real requirement is that config.R has
# been sourced before the first transform call.
source(here::here("R", "config.R")) # Loads lookup_ls from Excel workbook
source(here::here("R", "new_field.R")) # Uses lookup_ls$new_field
Also verify the sheet exists in bmf_code_lookup.xlsx:
names(lookup_ls) # Should include "new_field"
9.8.2 Issue: “Column ‘xxx’ not found”
Cause: Transform expects different column name than in data
Fix: Check input column name matches BMF schema:
# Verify column exists (TRUE means the raw data has the expected input column)
"NEW_FIELD" %in% names(bmf_raw)
9.8.3 Issue: Join produces all NAs
Cause: Key column types don’t match between data and lookup
Fix: Ensure consistent types:
# In transform - convert to same type as lookup
dt_safe[, new_field_code := as.character(get(input_col))]

# Check lookup column types
str(lookup_ls$new_field)

# If needed, convert lookup column type. The key column of this lookup is
# new_field_code; note that := modifies lookup_ls$new_field by reference.
lookup_ls$new_field[, new_field_code := as.character(new_field_code)]
9.8.4 Issue: Checkpoint won’t load
Cause: Arrow/Parquet version mismatch or corrupt file
Fix:
# NOTE(review): the documented file pattern is
# bmf_{YEAR}_{MONTH}_{checkpoint_name}.parquet and checkpoint names look like
# "02_identity", so this example path appears to omit one component.
# Confirm against the actual contents of the checkpoint directory.

# Check if file exists and is readable
file.exists("data/checkpoints/bmf_2025_02_identity.parquet")

# Try reading directly (bypasses load_checkpoint() to surface the raw error)
arrow::read_parquet("data/checkpoints/bmf_2025_02_identity.parquet")
9.9 Performance Tips
9.9.1 Use data.table Efficiently
# `transform` below presumably stands for any vectorised function; it is a
# placeholder, not base::transform().

# Good: Update by reference
dt[, new_col := transform(old_col)]

# Bad: Create copy with base R
dt$new_col <- transform(dt$old_col)
9.9.2 Minimize Copies
# Good: Single copy at start
dt_safe <- data.table::copy(dt)
# ... all operations on dt_safe

# Bad: Multiple intermediate copies (each copy() duplicates the whole table)
dt1 <- copy(dt)
dt2 <- copy(dt1)
9.9.3 Use Keys for Joins
# Good: Set keys before join
# NOTE(review): per the data.table documentation setkey() sorts the table by
# reference, so both `lookup` and `dt` are reordered here. Confirm that
# nothing downstream relies on the original row order.
setkey(lookup, code)
setkey(dt, code_col)
dt[lookup, definition := i.definition]

# Okay: Specify on= for ad-hoc join (leaves row order untouched)
dt[lookup, definition := i.definition, on = .(code_col = code)]
9.10 Future Enhancements
9.10.1 Planned Improvements
- targets Integration: Migrate to targets package for DAG-based orchestration
- Unit Tests: Add testthat test suite
- DuckDB Backend: For larger-than-memory processing
- Address Geocoding: Add lat/long from addresses
- E-file Integration: Complement BMF with Form 990 e-file data
9.10.2 Contributing
- Fork the repository
- Create a feature branch
- Follow code patterns documented above
- Submit a pull request with documentation updates