9 Developer Guide

This chapter covers code patterns, contribution guidelines, and how to extend the pipeline.

9.1 Code Patterns

9.1.1 Transform Function Template

All transform functions follow this pattern:

# ============================================================================
# module_name.R
# Brief description of what this module transforms
# ============================================================================

# ============================================================================
# Module Constants
# ============================================================================

# Expected columns in lookup table
MODULE_LOOKUP_COLS <- c("code", "code_definition")

# Sentinel values
MODULE_MISSING_VALUE <- "UNDEFINED"

# ============================================================================
# Transformation Function
# ============================================================================

#' Transform Module Name
#'
#' @description
#' Detailed description of what this function does.
#'
#' @param dt data.table containing BMF data with required column
#' @param input_col character name of input column (default: "COLUMN")
#' @param lookup data.table lookup table for code definitions
#'
#' @return data.table with new columns:
#'   \itemize{
#'     \item column_code - Standardized code
#'     \item column_code_definition - Human-readable definition
#'   }
#'
#' @examples
#' \dontrun{
#' bmf <- transform_module(bmf_raw, lookup = module_lookup)
#' }
#'
#' @export
transform_module <- function(dt,
                             input_col = "COLUMN",
                             lookup = module_lookup) {

  # Input validation
  validate_data_table(dt, c(input_col), context = "BMF data")
  validate_lookup(lookup, MODULE_LOOKUP_COLS, "module_lookup")

  # Safe copy (don't mutate input)
  dt_safe <- data.table::copy(dt)

  # Transformation logic
  dt_safe[, column_code := as.integer(get(input_col))]

  # Join with lookup
  dt_safe[lookup, column_code_definition := i.code_definition,
          on = .(column_code = code)]

  # Validate join success
  validate_join_success(
    dt_safe,
    key_col = "column_code",
    result_col = "column_code_definition",
    lookup_name = "module_lookup"
  )

  # Quality report
  message(sprintf(
    "Module: %s records processed",
    format(nrow(dt_safe), big.mark = ",")
  ))

  return(dt_safe)
}

9.1.2 Key Principles

  1. Pure Functions: Don’t mutate input - use data.table::copy()
  2. Validate Early: Check inputs before processing
  3. Fail Fast: Stop on errors, warn on data quality issues
  4. Document: Full roxygen2 documentation
  5. Report Quality: Log processing metrics
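
The first principle is worth demonstrating: data.table's `:=` modifies tables by reference, so a transform that skips `data.table::copy()` would silently mutate the caller's data. A minimal illustration (the function and column names here are hypothetical, not part of the pipeline):

```r
library(data.table)

add_doubled <- function(dt) {
  dt_safe <- data.table::copy(dt)   # without this, := would mutate the input
  dt_safe[, doubled := value * 2]
  dt_safe
}

input <- data.table(value = 1:3)
output <- add_doubled(input)

"doubled" %in% names(output)  # TRUE
"doubled" %in% names(input)   # FALSE: the input is untouched
```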

9.2 Adding a New Transform

9.2.1 Step 1: Create the Transform File

Create R/new_field.R:

# ============================================================================
# new_field.R
# Transform NEW_FIELD column
# ============================================================================

# Module Constants
NEW_FIELD_LOOKUP_COLS <- c("new_field_code", "new_field_definition")

#' Transform New Field
#'
#' @param dt data.table containing BMF data
#' @param input_col character name of input column (default: "NEW_FIELD")
#' @param lookup data.table lookup table (default: from lookup_ls)
#'
#' @return data.table with new_field_code and new_field_definition columns
#'
#' @export
transform_bmf_new_field <- function(dt,
                                    input_col = "NEW_FIELD",
                                    lookup = lookup_ls$new_field) {

  # Input validation
  validate_data_table(dt, c(input_col), context = "BMF data")
  validate_lookup(lookup, NEW_FIELD_LOOKUP_COLS, "new_field_lookup")

  # Safe copy (don't mutate input)
  dt_safe <- data.table::copy(dt)

  # Transform and join
  dt_safe[, new_field_code := as.character(get(input_col))]
  dt_safe[lookup, new_field_definition := i.new_field_definition,
          on = .(new_field_code)]

  # Validate join
  validate_join_success(dt_safe, "new_field_code", "new_field_definition",
                        "new_field_lookup")

  return(dt_safe)
}

9.2.2 Step 2: Create Lookup Table (if needed)

Add a new sheet to data/lookup/bmf_code_lookup.xlsx:

new_field_code   new_field_definition
--------------   --------------------
1                First Value
2                Second Value

The lookup will be automatically loaded via config.R and accessible as lookup_ls$new_field.
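
A minimal sketch of what that loading step might look like, assuming config.R reads the workbook with readxl (the helper name load_lookups is hypothetical, not the pipeline's actual code):

```r
library(data.table)
library(readxl)

# Read every sheet of the workbook into a named list of data.tables,
# so sheet "new_field" becomes lookup_ls$new_field.
load_lookups <- function(path) {
  sheets <- readxl::excel_sheets(path)
  lookup_ls <- lapply(sheets, function(s) {
    as.data.table(readxl::read_excel(path, sheet = s))
  })
  names(lookup_ls) <- sheets
  lookup_ls
}

# lookup_ls <- load_lookups(here::here("data", "lookup", "bmf_code_lookup.xlsx"))
```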

9.2.3 Step 3: Add to Orchestrator

Edit R/run_pipeline.R:

# Source the new file
source(here::here("R", "new_field.R"))

# Add transformation call (in appropriate phase)
log_transform_start("New Field")
bmf <- transform_bmf_new_field(bmf)

9.2.4 Step 4: Update Quality Checks

Add expected output columns to R/quality/post_checks.R:

# Add to BMF_OUTPUT_COLUMNS vector
BMF_OUTPUT_COLUMNS <- c(
  # ... existing columns
  "new_field_code",
  "new_field_definition"
)

# Add to COLUMN_CATEGORIES for detailed quality reporting
COLUMN_CATEGORIES <- list(
  # ... existing categories
  new_field = list(
    name = "New Field",
    columns = list(
      new_field_code = list(type = "code"),
      new_field_definition = list(type = "character")
    )
  )
)

# Add to SOURCE_COLUMN_MAP for data lineage tracking
SOURCE_COLUMN_MAP <- list(
  # ... existing mappings
  new_field_code = "NEW_FIELD",
  new_field_definition = "NEW_FIELD"
)

# Add to COLUMN_DESCRIPTIONS for data dictionary
COLUMN_DESCRIPTIONS <- list(
  # ... existing descriptions
  new_field_code = "Standardized new field code",
  new_field_definition = "Human-readable new field description"
)

9.2.5 Step 5: Update Documentation

Add to this guidebook:

  • Data lineage entry
  • Transform reference entry
  • Lookup table schema


9.3 Validation Functions

9.3.1 validate_data_table()

validate_data_table <- function(dt, required_cols, context = "data.table") {
  if (!inherits(dt, "data.table")) {
    stop(sprintf("%s must be a data.table, got %s", context, class(dt)[1]))
  }

  missing_cols <- setdiff(required_cols, names(dt))
  if (length(missing_cols) > 0) {
    stop(sprintf("%s missing required columns: %s",
                 context, paste(missing_cols, collapse = ", ")))
  }

  invisible(TRUE)
}

9.3.2 validate_lookup()

validate_lookup <- function(lookup, required_cols, lookup_name = "lookup table") {
  if (!inherits(lookup, "data.table")) {
    stop(sprintf("%s must be a data.table, got %s", lookup_name, class(lookup)[1]))
  }

  missing_cols <- setdiff(required_cols, names(lookup))
  if (length(missing_cols) > 0) {
    stop(sprintf("%s missing required columns: %s",
                 lookup_name, paste(missing_cols, collapse = ", ")))
  }

  if (nrow(lookup) == 0) {
    stop(sprintf("%s is empty (0 rows)", lookup_name))
  }

  invisible(TRUE)
}
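
A quick demonstration of the failure modes, with the definition above repeated so the snippet runs standalone (the lookup contents here are illustrative):

```r
library(data.table)

validate_lookup <- function(lookup, required_cols, lookup_name = "lookup table") {
  if (!inherits(lookup, "data.table")) {
    stop(sprintf("%s must be a data.table, got %s", lookup_name, class(lookup)[1]))
  }
  missing_cols <- setdiff(required_cols, names(lookup))
  if (length(missing_cols) > 0) {
    stop(sprintf("%s missing required columns: %s",
                 lookup_name, paste(missing_cols, collapse = ", ")))
  }
  if (nrow(lookup) == 0) {
    stop(sprintf("%s is empty (0 rows)", lookup_name))
  }
  invisible(TRUE)
}

# Passes silently and returns TRUE invisibly
good <- data.table(code = 1L, code_definition = "First Value")
validate_lookup(good, c("code", "code_definition"), "demo_lookup")

# Stops: "demo_lookup missing required columns: code_definition"
# validate_lookup(data.table(code = 1L), c("code", "code_definition"), "demo_lookup")

# Stops: "demo_lookup is empty (0 rows)"
# validate_lookup(good[0], c("code", "code_definition"), "demo_lookup")
```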

9.3.3 validate_join_success()

validate_join_success <- function(dt,
                                  key_col,
                                  result_col,
                                  lookup_name = "lookup",
                                  warn_threshold = 0.01,
                                  error_threshold = NULL) {

  total_rows <- nrow(dt)
  unmatched_count <- dt[!is.na(get(key_col)) & is.na(get(result_col)), .N]
  unmatched_pct <- if (total_rows > 0) unmatched_count / total_rows else 0  # avoid NaN on empty input

  result <- list(
    unmatched_count = unmatched_count,
    unmatched_pct = unmatched_pct,
    total_rows = total_rows
  )

  if (!is.null(error_threshold) && unmatched_pct > error_threshold) {
    stop(sprintf("Join to %s failed: %d rows (%.2f%%) unmatched",
                 lookup_name, unmatched_count, unmatched_pct * 100))
  }

  if (unmatched_pct > warn_threshold) {
    warning(sprintf("Join to %s: %d rows (%.2f%%) unmatched",
                    lookup_name, unmatched_count, unmatched_pct * 100))
  }

  invisible(result)
}

9.3.4 validate_numeric_range()

validate_numeric_range <- function(dt, col_name, allow_negative = FALSE) {
  values <- dt[[col_name]]

  if (!is.numeric(values)) {
    stop(sprintf("Column '%s' is not numeric", col_name))
  }

  negative_count <- sum(values < 0, na.rm = TRUE)

  if (negative_count > 0 && !allow_negative) {
    negative_examples <- head(unique(values[values < 0]), 5)
    warning(sprintf("Column '%s' contains %d negative values. Examples: %s",
                    col_name, negative_count,
                    paste(negative_examples, collapse = ", ")))
  }

  invisible(negative_count)
}

9.4 Logging Functions

The logging system supports configurable log levels via the BMF_LOG_LEVEL environment variable (DEBUG, INFO, WARN, ERROR).

9.4.1 Core Logging Functions

# Log level filtering is automatic based on BMF_LOG_LEVEL
log_debug(msg)  # Only shown if BMF_LOG_LEVEL=DEBUG
log_info(msg)   # Standard informational messages
log_warn(msg)   # Warning messages
log_error(msg)  # Error messages (stops execution)

9.4.2 log_info()

log_info <- function(msg) {
  if (.should_log("INFO")) {
    message(sprintf("[%s] [INFO] %s",
                    format(Sys.time(), "%Y-%m-%d %H:%M:%S"), msg))
  }
  invisible(NULL)
}
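
log_info() gates on a .should_log() helper that is not shown above. A minimal sketch, assuming the levels rank DEBUG < INFO < WARN < ERROR with INFO as the default (the exact implementation in the pipeline may differ):

```r
# Ranked log levels; a message is emitted when its level is at or above
# the threshold set in the BMF_LOG_LEVEL environment variable.
LOG_LEVELS <- c(DEBUG = 1L, INFO = 2L, WARN = 3L, ERROR = 4L)

.should_log <- function(level) {
  threshold <- toupper(Sys.getenv("BMF_LOG_LEVEL", unset = "INFO"))
  if (!threshold %in% names(LOG_LEVELS)) threshold <- "INFO"  # fall back on bad input
  LOG_LEVELS[[level]] >= LOG_LEVELS[[threshold]]
}
```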

9.4.3 log_phase_start()

log_phase_start <- function(phase_name) {
  message("")
  message(strrep("=", 60))
  log_info(sprintf("PHASE: %s", phase_name))
  message(strrep("=", 60))
  invisible(NULL)
}

9.4.4 log_transform_start()

log_transform_start <- function(transform_name, input_rows = NULL) {
  if (is.null(input_rows)) {
    log_info(sprintf("Starting transformation: %s", transform_name))
  } else {
    log_info(sprintf("Starting transformation: %s (%s rows)",
                     transform_name, format(input_rows, big.mark = ",")))
  }
  invisible(NULL)
}

9.4.5 log_transform_complete()

log_transform_complete <- function(transform_name, output_rows = NULL, duration_secs = NULL) {
  parts <- sprintf("Completed transformation: %s", transform_name)
  if (!is.null(output_rows)) {
    parts <- sprintf("%s (%s rows)", parts, format(output_rows, big.mark = ","))
  }
  if (!is.null(duration_secs)) {
    parts <- sprintf("%s in %.2f seconds", parts, duration_secs)
  }
  log_info(parts)
  invisible(NULL)
}

9.4.6 log_quality_report()

log_quality_report <- function(transform_name, total, valid,
                               invalid = NULL, undefined = NULL) {
  message("")
  message(sprintf("--- %s Quality Report ---", transform_name))
  message(sprintf("  Total Records:    %s", format(total, big.mark = ",")))
  message(sprintf("  Valid:            %s (%.2f%%)",
                  format(valid, big.mark = ","), (valid / total) * 100))
  # ... additional metrics if provided
  message("")
  invisible(NULL)
}

9.5 Checkpoint Functions

Checkpoints allow pipeline recovery and debugging by preserving intermediate states.

9.5.1 save_checkpoint()

save_checkpoint <- function(dt, checkpoint_name) {
  # Only saves if ENABLE_CHECKPOINTS is TRUE
  # Files saved to: CHECKPOINT_DIR/bmf_{YEAR}_{MONTH}_{checkpoint_name}.parquet
}

9.5.2 load_checkpoint()

load_checkpoint <- function(checkpoint_name) {
  # Returns data.table if checkpoint exists, NULL otherwise
  # Example: bmf <- load_checkpoint("02_identity")
}

9.5.3 list_checkpoints()

list_checkpoints <- function() {
  # Returns character vector of available checkpoint names
  # Example output: c("01_raw", "02_identity", "03_classification")
}

9.5.4 clear_checkpoints()

clear_checkpoints <- function(confirm = FALSE) {
  # Removes all checkpoint files for current processing year/month
  # Must set confirm = TRUE to actually delete files
}
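
Given the naming scheme noted above, the save/load pair can be sketched as follows. This is an illustration under stated assumptions, not the pipeline's actual implementation: the config global names (ENABLE_CHECKPOINTS, CHECKPOINT_DIR, BMF_YEAR, BMF_MONTH) and the checkpoint_path helper are assumed here.

```r
library(data.table)

# Assumed config globals: CHECKPOINT_DIR, ENABLE_CHECKPOINTS, BMF_YEAR, BMF_MONTH
checkpoint_path <- function(checkpoint_name) {
  file.path(CHECKPOINT_DIR,
            sprintf("bmf_%s_%s_%s.parquet", BMF_YEAR, BMF_MONTH, checkpoint_name))
}

save_checkpoint <- function(dt, checkpoint_name) {
  if (!isTRUE(ENABLE_CHECKPOINTS)) return(invisible(NULL))
  path <- checkpoint_path(checkpoint_name)
  arrow::write_parquet(dt, path)
  invisible(path)
}

load_checkpoint <- function(checkpoint_name) {
  path <- checkpoint_path(checkpoint_name)
  if (!file.exists(path)) return(NULL)  # NULL signals "no checkpoint"
  as.data.table(arrow::read_parquet(path))
}
```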

9.6 Testing

9.6.1 Manual Testing Pattern

# Load a checkpoint
bmf <- load_checkpoint("02_identity")

# Test single transform
result <- transform_bmf_new_field(bmf)

# Verify outputs
names(result)
result[, .N, by = new_field_code]
result[is.na(new_field_definition)]

9.6.2 Creating Test Data

# Minimal test data.table
test_dt <- data.table::data.table(
  EIN = c("123456789", "987654321"),
  NEW_FIELD = c("A", "B")
)

# Run transform
result <- transform_bmf_new_field(test_dt)

# Assert expected outcomes
stopifnot(nrow(result) == 2)
stopifnot("new_field_code" %in% names(result))

9.7 Git Workflow

9.7.1 Branch Naming

  • feature/add-new-field-transform
  • fix/affiliation-code-validation
  • docs/update-data-lineage

9.7.2 Commit Messages

feat: add transform for NEW_FIELD column

- Create new_field.R with transform function
- Add lookup table to data/lookup/
- Update orchestrator to include new transform
- Update documentation

9.7.3 Pull Request Checklist

  • Transform file follows the template and key principles in 9.1
  • Lookup sheet added to data/lookup/bmf_code_lookup.xlsx (if needed)
  • Orchestrator (R/run_pipeline.R) sources and calls the new transform
  • Quality checks updated in R/quality/post_checks.R
  • Documentation updated (data lineage, transform reference, lookup schema)

9.8 Common Issues

9.8.1 Issue: “object ‘lookup_ls’ not found”

Cause: Lookup tables not loaded before transform is called

Fix: Ensure source() order in orchestrator loads config.R first:

source(here::here("R", "config.R"))  # Loads lookup_ls from Excel workbook
source(here::here("R", "new_field.R"))  # Uses lookup_ls$new_field

Also verify the sheet exists in bmf_code_lookup.xlsx:

names(lookup_ls)  # Should include "new_field"

9.8.2 Issue: “Column ‘xxx’ not found”

Cause: Transform expects different column name than in data

Fix: Check input column name matches BMF schema:

# Verify column exists
"NEW_FIELD" %in% names(bmf_raw)

9.8.3 Issue: Join produces all NAs

Cause: Key column types don’t match between data and lookup

Fix: Ensure consistent types:

# In transform - convert to same type as lookup
dt_safe[, code := as.character(get(input_col))]

# Check lookup column types
str(lookup_ls$new_field)

# If needed, convert lookup column type
lookup_ls$new_field[, code := as.character(code)]

9.8.4 Issue: Checkpoint won’t load

Cause: Arrow/Parquet version mismatch or corrupt file

Fix:

# Check if file exists and is readable
file.exists("data/checkpoints/bmf_2025_02_identity.parquet")

# Try reading directly
arrow::read_parquet("data/checkpoints/bmf_2025_02_identity.parquet")

9.9 Performance Tips

9.9.1 Use data.table Efficiently

# Good: Update by reference
dt[, new_col := transform(old_col)]

# Bad: Create copy with base R
dt$new_col <- transform(dt$old_col)

9.9.2 Minimize Copies

# Good: Single copy at start
dt_safe <- data.table::copy(dt)
# ... all operations on dt_safe

# Bad: Multiple intermediate copies
dt1 <- copy(dt)
dt2 <- copy(dt1)

9.9.3 Use Keys for Joins

# Good: Set keys before join
setkey(lookup, code)
setkey(dt, code_col)
dt[lookup, definition := i.definition]

# Okay: Specify on= for ad-hoc join
dt[lookup, definition := i.definition, on = .(code_col = code)]

9.10 Future Enhancements

9.10.1 Planned Improvements

  1. targets Integration: Migrate to targets package for DAG-based orchestration
  2. Unit Tests: Add testthat test suite
  3. DuckDB Backend: For larger-than-memory processing
  4. Address Geocoding: Add lat/long from addresses
  5. E-file Integration: Complement BMF with Form 990 e-file data

9.10.2 Contributing

  1. Fork the repository
  2. Create a feature branch
  3. Follow code patterns documented above
  4. Submit a pull request with documentation updates