2  Pipeline Architecture

3 Pipeline Overview

The BMF processing pipeline runs 11 sequential phases, each with a single responsibility. R/run_pipeline.R orchestrates the phases in order.

3.1 11-Phase Pipeline

flowchart TB
    classDef node fill:#d2d2d2,stroke:#333,color:#000

    subgraph Acquisition[Data Acquisition]
        P1["(1) Extraction"]
        P2["(2) Pre-Validation"]
        P1 --> P2
    end

    subgraph Transformation[Data Transformation]
        P3["(3) Identity"]
        P4["(4) Classification"]
        P5["(5) Activity"]
        P6["(6) Temporal"]
        P7["(7) Financial"]
        P8["(8) Filing"]
        P3 --> P4 --> P5 --> P6 --> P7 --> P8
    end

    subgraph Output[Data Output]
        P9["(9) Post-Validation"]
        P10["(10) Intermediate"]
        P11["(11) Processed"]
        P9 --> P10 --> P11
    end

    Acquisition --> Transformation
    Transformation --> Output

    style Acquisition fill:#fff9e6,stroke:#fdbf11
    style Transformation fill:#fee39b,stroke:#fdbf11
    style Output fill:#fdbf11,stroke:#d19c0f

3.2 Phase Details

3.2.1 Phase 1: Extraction

Downloads BMF data from the S3 bucket nccsdata. A Lambda function ingests the monthly BMF files from the IRS and deposits them at nccsdata/raw/bmf/YYYY-MM-BMF.csv.

The pipeline:

  1. Connects to S3 using the aws.s3 package
  2. Downloads the most recent BMF file (or a specified year/month)
  3. Extracts the processing year and month from the filename
  4. Saves a local copy to data/raw/bmf_YYYY_MM.csv

# Download most recent BMF
bmf_raw <- download_bmf_from_s3()

# Download specific month
bmf_raw <- download_bmf_from_s3(year = 2025, month = 1)

# List available files
list_available_bmf_files()
# [1] "2025-01" "2024-12" "2024-11" ...
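The selection and naming steps above can be sketched in base R. This is a minimal illustration, not the pipeline's actual code: select_bmf_key() is a hypothetical helper, and it assumes the S3 keys follow the raw/bmf/YYYY-MM-BMF.csv pattern described above.

```r
# Hypothetical helper: pick an S3 key and derive the processing year/month
# and local file name from it. Assumes keys end in "YYYY-MM-BMF.csv".
select_bmf_key <- function(keys, year = NULL, month = NULL) {
  pattern <- "([0-9]{4})-([0-9]{2})-BMF\\.csv$"
  keys <- keys[grepl(pattern, keys)]
  if (length(keys) == 0) stop("No BMF files match the expected naming pattern")

  # "YYYY-MM" strings sort chronologically when sorted as text
  periods <- sub(paste0("^.*", pattern), "\\1-\\2", keys)

  if (is.null(year) || is.null(month)) {
    chosen <- order(periods, decreasing = TRUE)[1]   # most recent file
  } else {
    chosen <- match(sprintf("%04d-%02d", year, month), periods)
    if (is.na(chosen)) stop("Requested BMF file not found")
  }

  processing_year  <- as.integer(substr(periods[chosen], 1, 4))
  processing_month <- as.integer(substr(periods[chosen], 6, 7))

  list(
    key              = keys[chosen],
    processing_year  = processing_year,
    processing_month = processing_month,
    local_path       = sprintf("data/raw/bmf_%04d_%02d.csv",
                               processing_year, processing_month)
  )
}

keys <- c("raw/bmf/2024-11-BMF.csv",
          "raw/bmf/2025-01-BMF.csv",
          "raw/bmf/2024-12-BMF.csv")
latest <- select_bmf_key(keys)
```

Sorting the YYYY-MM strings as text is enough to find the newest file because the year comes first and the month is zero-padded.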

3.2.2 Phase 2: Pre-Transformation Validation

Validates the raw data before any transformations:

  • Column Check: Ensures all 31 required columns exist
  • Duplicate Detection: Counts duplicate EINs
  • NULL Inventory: Reports NULL counts per column

If validation fails in strict mode, the pipeline stops.
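As a rough sketch of the three checks, the function below uses base R only. check_raw_bmf() is a hypothetical name, and it assumes that a missing required column is what counts as a failure and that empty strings should be tallied alongside NA; the real rules live in the pipeline's validation code.

```r
# Hypothetical sketch of the pre-transformation checks.
# Assumption: only missing required columns cause a failure.
check_raw_bmf <- function(bmf, required_cols, strict = TRUE) {
  # Column check: which required columns are absent?
  missing_cols <- setdiff(required_cols, names(bmf))

  # Duplicate detection: count EINs that repeat an earlier row
  duplicate_eins <- if ("EIN" %in% names(bmf)) {
    sum(duplicated(bmf$EIN))
  } else {
    NA_integer_
  }

  # NULL inventory: NA or empty string per column (assumed definition)
  null_counts <- vapply(
    bmf,
    function(col) sum(is.na(col) | col == ""),
    integer(1)
  )

  passed <- length(missing_cols) == 0
  if (strict && !passed) {
    stop("Missing required columns: ", paste(missing_cols, collapse = ", "))
  }

  list(passed = passed,
       missing_cols = missing_cols,
       duplicate_eins = duplicate_eins,
       null_counts = null_counts)
}

raw <- data.frame(EIN  = c("010000001", "010000001", "020000002"),
                  NAME = c("ALPHA ORG", NA, ""),
                  stringsAsFactors = FALSE)
result <- check_raw_bmf(raw, required_cols = c("EIN", "NAME"))
```

With strict = FALSE the same call returns the findings instead of stopping, which is useful when exploring a new monthly file.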

3.2.3 Phase 3: Identity Transformations

Transforms organization identification fields:

| Transform | Input | Outputs |
|---|---|---|
| transform_ein() | EIN | ein, ein_raw |
| transform_organization_name() | NAME | org_name_raw, org_name_join, org_name_display, org_legal_suffix, org_parent_name |
| transform_dba_name() | SORT_NAME | dba_name, dba_name_raw |
| transform_bmf_ico_name() | ICO | in_care_of_name_raw, in_care_of_name_clean, in_care_of_name_provided |
| transform_bmf_group_exemption_number() | GROUP | group_exemption_number_raw, group_exemption_number, group_exemption_is_member |
| transform_bmf_ruling_date() | RULING | ruling_date_ym_str, ruling_date, ruling_date_is_missing |
| transform_address() | STREET, CITY, STATE, ZIP | 17 columns (see Transform Reference) |
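The raw/clean column pairing used throughout this phase can be illustrated with the EIN. The sketch below is an assumption about what such a transform might do (strip non-digits, left-pad to nine digits, keep the original value); normalize_ein() is a hypothetical name and is not the pipeline's transform_ein().

```r
# Hypothetical sketch: keep the raw value and derive a 9-digit EIN string.
# Assumption: values with no digits or more than 9 digits become NA.
normalize_ein <- function(ein_raw) {
  raw    <- as.character(ein_raw)
  digits <- gsub("[^0-9]", "", raw)
  n      <- nchar(digits)

  ok  <- !is.na(digits) & n >= 1 & n <= 9
  ein <- rep(NA_character_, length(digits))
  # Left-pad with zeros so every valid EIN has exactly nine digits
  ein[ok] <- paste0(strrep("0", 9 - n[ok]), digits[ok])

  data.frame(ein_raw = raw, ein = ein, stringsAsFactors = FALSE)
}

eins <- normalize_ein(c("12-3456789", "1234567", "abc", NA))
```

Keeping ein_raw next to ein means any cleaning decision can be audited later against the value the IRS supplied.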

3.2.4 Phase 4: Classification Transformations

Adds IRS classification codes with human-readable definitions. All lookup tables are sheets in bmf_code_lookup.xlsx:

| Transform | Lookup Sheet | Outputs |
|---|---|---|
| transform_bmf_subsection_classification_codes() | subsection_classification_code | subsection_code, classification_code, exempt_organization_type, all_classifications_string |
| transform_bmf_affiliation_code() | affiliation_code | affiliation_code, affiliation_code_definition |
| transform_bmf_deductibility_code() | deductibility_code | deductibility_code, deductibility_code_definition |
| transform_bmf_foundation_code() | foundation_code | foundation_code, foundation_code_definition |
| transform_bmf_organization_code() | organization_code | organization_code, organization_code_definition |
| transform_bmf_status_code() | status_code | status_code, status_code_definition |
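Each of these transforms follows the same code-plus-definition pattern. A minimal sketch of that pattern is shown below, with an inline data frame standing in for one sheet of bmf_code_lookup.xlsx. The helper name add_code_definition() and the lookup rows are illustrative assumptions; the workbook remains the authoritative source for definitions.

```r
# Stand-in for the deductibility_code sheet (illustrative rows only)
deductibility_lookup <- data.frame(
  deductibility_code = c("1", "2", "4"),
  deductibility_code_definition = c(
    "Contributions are deductible",
    "Contributions are not deductible",
    "Contributions are deductible by treaty (foreign organizations)"
  ),
  stringsAsFactors = FALSE
)

# Hypothetical helper: add a cleaned code column and its definition
add_code_definition <- function(bmf, raw_col, lookup, code_col, def_col) {
  codes <- trimws(as.character(bmf[[raw_col]]))
  bmf[[code_col]] <- codes
  # match() keeps row order and row count; unknown codes become NA
  bmf[[def_col]] <- lookup[[def_col]][match(codes, lookup[[code_col]])]
  bmf
}

bmf <- data.frame(DEDUCTIBILITY = c("1", "2", "9"), stringsAsFactors = FALSE)
bmf <- add_code_definition(bmf, "DEDUCTIBILITY", deductibility_lookup,
                           "deductibility_code",
                           "deductibility_code_definition")
```

Using match() rather than merge() guarantees that the output has exactly one row per input row, which matters for the row preservation check in Phase 9.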

3.2.5 Phase 5: Activity Transformations

Transforms multi-valued activity fields. Because a single activity code can reference multiple activities, the transforms build dimension tables internally and join against them before aggregating the results back to one row per organization.

| Transform | Pattern | Outputs |
|---|---|---|
| transform_bmf_activity_code() | Dimension table + Aggregate | activity_code, activity_code_definitions, activity_code_categories |
| transform_ntee_code() | Complex | ntee_code_raw, ntee_code_clean, ntee_code_definition, ntee_code_major_group, ntee_common_code, ntee_common_code_definition, naics_code, nteev2, nteev2_code, nteev2_subsector, nteev2_org_type |
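The "Dimension table + Aggregate" pattern can be sketched as follows. The example assumes the ACTIVITY field packs up to three 3-digit codes into one 9-digit string with 000 meaning "no activity"; the dimension table rows are placeholders and summarise_activity() is a hypothetical helper, not the pipeline's implementation.

```r
# Placeholder dimension table: one row per 3-digit activity code
activity_dim <- data.frame(
  code       = c("001", "030", "150"),
  definition = c("Definition for 001", "Definition for 030",
                 "Definition for 150"),
  stringsAsFactors = FALSE
)

# Hypothetical helper: split each 9-digit value into three codes, look
# each one up, then collapse the definitions back to one string per row.
summarise_activity <- function(activity, dim) {
  vapply(activity, function(code) {
    parts <- substring(code, c(1, 4, 7), c(3, 6, 9))
    parts <- parts[!is.na(parts) & nchar(parts) == 3 & parts != "000"]
    defs  <- dim$definition[match(parts, dim$code)]
    defs  <- defs[!is.na(defs)]
    if (length(defs) == 0) NA_character_ else paste(defs, collapse = "; ")
  }, character(1), USE.NAMES = FALSE)
}

activity_defs <- summarise_activity(
  c("001030000", "150000000", "000000000", NA),
  activity_dim
)
```

The aggregate step is what keeps the output at one row per organization even though the lookup itself is one-to-many.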

3.2.6 Phase 6: Temporal Transformations

Parses date and period fields:

| Transform | Input | Outputs |
|---|---|---|
| transform_tax_period() | TAX_PERIOD | tax_period_ym_str, tax_period_ymd, tax_period_is_missing |
| transform_accounting_period() | ACCT_PD | accounting_period |
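A possible shape for the tax period parsing is sketched below. It assumes TAX_PERIOD arrives as a YYYYMM string and that the date column is anchored to the first day of the month; both are assumptions for illustration, and parse_tax_period() is a hypothetical name.

```r
# Hypothetical sketch: YYYYMM -> "YYYY-MM" string, Date, and missing flag.
# Assumption: the Date uses the first day of the month.
parse_tax_period <- function(tax_period) {
  raw   <- trimws(as.character(tax_period))
  valid <- !is.na(raw) & grepl("^[0-9]{4}(0[1-9]|1[0-2])$", raw)

  ym_str <- ifelse(valid,
                   paste0(substr(raw, 1, 4), "-", substr(raw, 5, 6)),
                   NA_character_)
  # Invalid inputs yield NA because the string cannot be parsed
  ymd <- as.Date(paste0(ym_str, "-01"), format = "%Y-%m-%d")

  data.frame(tax_period_ym_str     = ym_str,
             tax_period_ymd        = ymd,
             tax_period_is_missing = !valid,
             stringsAsFactors      = FALSE)
}

periods <- parse_tax_period(c("202312", "000000", NA, "202413"))
```

Validating the month range in the regular expression keeps placeholder values such as 000000 from turning into misleading dates.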

3.2.7 Phase 7: Financial Transformations

Processes financial codes and amounts:

| Transform | Input | Outputs |
|---|---|---|
| transform_bmf_asset_code() | ASSET_CD | asset_code, asset_code_definition |
| transform_bmf_income_code() | INCOME_CD | income_code, income_code_definition |
| transform_asset_amount() | ASSET_AMT | asset_amount |
| transform_income_amount() | INCOME_AMT | income_amount |
| transform_revenue_amount() | REVENUE_AMT | revenue_amount |

3.2.8 Phase 8: Filing Requirement Transformations

Adds filing requirement classifications:

| Transform | Input | Outputs |
|---|---|---|
| transform_bmf_filing_requirement_code() | FILING_REQ_CD | filing_requirement_code, filing_requirement_code_definition |
| transform_bmf_pf_filing_requirement_code() | PF_FILING_REQ_CD | pf_filing_requirement_code, pf_filing_requirement_code_definition |

3.2.9 Phase 9: Post-Transformation Validation

Generates a quality report covering:

  • Row preservation check (input rows = output rows)
  • Completeness rates for all columns
  • Critical field validation (EIN)
  • Summary statistics

3.2.10 Phase 10: Intermediate Output

Saves the complete BMF with all columns (both raw IRS columns and transformed columns) to local storage and S3. This preserves the full data lineage for debugging and auditing.

What Gets Saved:

| File | Format | Contents |
|---|---|---|
| Intermediate BMF | Parquet | All ~107 columns (31 raw + ~76 transformed) |
| Quality Report | JSON | Validation metrics and summary statistics |

Local Files:

  • data/intermediate/bmf_YYYY_MM_intermediate.parquet
  • data/quality/bmf_YYYY_MM_quality_report.json

S3 Upload (when ENABLE_S3_UPLOAD = TRUE):

Uses the upload_bmf_results() function to upload to the intermediate folder:

  • s3://nccsdata/intermediate/bmf/YYYY_MM/bmf_YYYY_MM_intermediate.parquet
  • s3://nccsdata/intermediate/bmf/YYYY_MM/bmf_YYYY_MM_quality_report.json
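The naming convention for the intermediate outputs can be captured in a small path builder. intermediate_paths() is a hypothetical helper written for illustration; only the path patterns themselves come from the lists above.

```r
# Hypothetical helper: derive local and S3 locations for Phase 10 outputs
intermediate_paths <- function(year, month, bucket = "nccsdata") {
  stamp  <- sprintf("%04d_%02d", year, month)   # e.g. "2025_01"
  s3_dir <- sprintf("s3://%s/intermediate/bmf/%s", bucket, stamp)

  list(
    local_parquet = sprintf("data/intermediate/bmf_%s_intermediate.parquet", stamp),
    local_quality = sprintf("data/quality/bmf_%s_quality_report.json", stamp),
    s3_parquet    = sprintf("%s/bmf_%s_intermediate.parquet", s3_dir, stamp),
    s3_quality    = sprintf("%s/bmf_%s_quality_report.json", s3_dir, stamp)
  )
}

paths <- intermediate_paths(2025, 1)
```

Building every path from the same year/month stamp keeps the local and S3 copies easy to pair up when auditing a run.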

3.2.11 Phase 11: Processed Output

Creates a clean version of the BMF by removing the original IRS columns, keeping only transformed columns. Also generates a data dictionary describing each column.

Raw Columns Removed (31 total):

EIN, NAME, ICO, STREET, CITY, STATE, ZIP, GROUP, SUBSECTION,
AFFILIATION, CLASSIFICATION, RULING, DEDUCTIBILITY, FOUNDATION,
ACTIVITY, ORGANIZATION, STATUS, TAX_PERIOD, ASSET_CD, INCOME_CD,
FILING_REQ_CD, PF_FILING_REQ_CD, ASSET_AMT, INCOME_AMT,
REVENUE_AMT, NTEE_CD, SORT_NAME, ACCT_PD, REGION, RYEAR, ID
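Dropping the raw columns amounts to a set difference on column names. The sketch below uses the 31 names listed above; drop_raw_columns() is a hypothetical helper, not the pipeline's own function.

```r
# The 31 raw IRS columns listed above
raw_bmf_columns <- c(
  "EIN", "NAME", "ICO", "STREET", "CITY", "STATE", "ZIP", "GROUP",
  "SUBSECTION", "AFFILIATION", "CLASSIFICATION", "RULING", "DEDUCTIBILITY",
  "FOUNDATION", "ACTIVITY", "ORGANIZATION", "STATUS", "TAX_PERIOD",
  "ASSET_CD", "INCOME_CD", "FILING_REQ_CD", "PF_FILING_REQ_CD",
  "ASSET_AMT", "INCOME_AMT", "REVENUE_AMT", "NTEE_CD", "SORT_NAME",
  "ACCT_PD", "REGION", "RYEAR", "ID"
)

# Hypothetical helper: keep every column that is not a raw IRS column
drop_raw_columns <- function(bmf, raw_cols = raw_bmf_columns) {
  bmf[, setdiff(names(bmf), raw_cols), drop = FALSE]
}

intermediate <- data.frame(EIN = "123456789", ein = "123456789",
                           NAME = "ALPHA ORG", org_name_display = "Alpha Org",
                           stringsAsFactors = FALSE)
processed <- drop_raw_columns(intermediate)
```

Because the raw names are upper case and the transformed names are lower case, the two sets never collide.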

What Gets Saved:

| File | Format | Contents |
|---|---|---|
| Processed BMF | CSV | ~63 transformed columns only |
| Data Dictionary | CSV | Column names, types, descriptions, sample values |
| Quality Report | JSON | Same report as Phase 10 |
| Quality Report | HTML | Rendered visualization for GitHub Pages |

Local Files:

  • data/processed/bmf_YYYY_MM_processed.csv
  • data/processed/bmf_YYYY_MM_data_dictionary.csv
  • docs/quality-reports/bmf_YYYY_MM_quality_report.html

S3 Upload (when ENABLE_S3_UPLOAD = TRUE):

Uses the upload_processed_bmf() function to upload to the processed folder:

  • s3://nccsdata/processed/bmf/YYYY_MM/bmf_YYYY_MM_processed.csv
  • s3://nccsdata/processed/bmf/YYYY_MM/bmf_YYYY_MM_data_dictionary.csv
  • s3://nccsdata/processed/bmf/YYYY_MM/bmf_YYYY_MM_quality_report.json

3.2.12 Output Comparison

flowchart LR
    classDef node fill:#d2d2d2,stroke:#333,color:#000

    subgraph Phase10[Phase 10: Intermediate]
        I1[(Parquet)]
        I2[(Quality JSON)]
    end

    subgraph Phase11[Phase 11: Processed]
        P1[(CSV)]
        P2[(Dictionary)]
        P3[(Quality JSON + HTML)]
    end

    subgraph S3Int[S3: intermediate/bmf/]
        S3I1["intermediate.parquet"]
        S3I2["quality_report.json"]
    end

    subgraph S3Proc[S3: processed/bmf/]
        S3P1["processed.csv"]
        S3P2["data_dictionary.csv"]
        S3P3["quality_report.json"]
    end

    I1 --> S3I1
    I2 --> S3I2
    P1 --> S3P1
    P2 --> S3P2
    P3 --> S3P3

    style Phase10 fill:#fee39b,stroke:#fdbf11
    style Phase11 fill:#fee39b,stroke:#fdbf11
    style S3Int fill:#fdbf11,stroke:#d19c0f
    style S3Proc fill:#fdbf11,stroke:#d19c0f

When to Use Each Output:

| Use Case | Recommended Output |
|---|---|
| Research and analysis | Processed CSV |
| Debugging transformations | Intermediate Parquet |
| Auditing data lineage | Intermediate Parquet |
| Building downstream applications | Processed CSV + Data Dictionary |
| Investigating quality issues | Quality Report JSON/HTML |

3.3 Checkpoint System

The pipeline saves a Parquet checkpoint after pre-validation and after each of the six transformation phases:

flowchart LR
    classDef node fill:#d2d2d2,stroke:#333,color:#000

    subgraph Checkpoints[Checkpoints]
        CP1[01_raw] --> CP2[02_identity]
        CP2 --> CP3[03_classification]
        CP3 --> CP4[04_activity]
        CP4 --> CP5[05_temporal]
        CP5 --> CP6[06_financial]
        CP6 --> CP7[07_filing]
    end

    style Checkpoints fill:#fee39b,stroke:#fdbf11

| Checkpoint | Location | After Phase |
|---|---|---|
| 01_raw | data/checkpoints/bmf_YYYY_MM_01_raw.parquet | Pre-Validation |
| 02_identity | data/checkpoints/bmf_YYYY_MM_02_identity.parquet | Identity |
| 03_classification | data/checkpoints/bmf_YYYY_MM_03_classification.parquet | Classification |
| 04_activity | data/checkpoints/bmf_YYYY_MM_04_activity.parquet | Activity |
| 05_temporal | data/checkpoints/bmf_YYYY_MM_05_temporal.parquet | Temporal |
| 06_financial | data/checkpoints/bmf_YYYY_MM_06_financial.parquet | Financial |
| 07_filing | data/checkpoints/bmf_YYYY_MM_07_filing.parquet | Filing |

Benefits:

  • Resume from failure without reprocessing
  • Debug specific phases
  • Compare intermediate states

To resume from a checkpoint:

bmf <- load_checkpoint("03_classification")
# Continue processing from Phase 4
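To decide which checkpoint to resume from, one option is to look for the furthest stage that already exists on disk. The sketch below assumes the file naming convention described in this section; latest_checkpoint() is a hypothetical helper and is not part of checkpoints.R.

```r
# Checkpoint stages in pipeline order
checkpoint_stages <- c("01_raw", "02_identity", "03_classification",
                       "04_activity", "05_temporal", "06_financial",
                       "07_filing")

# Hypothetical helper: name of the furthest checkpoint present, or NULL
latest_checkpoint <- function(dir, year, month) {
  files <- file.path(
    dir,
    sprintf("bmf_%04d_%02d_%s.parquet", year, month, checkpoint_stages)
  )
  found <- checkpoint_stages[file.exists(files)]
  if (length(found) == 0) NULL else found[length(found)]
}

# Demonstration with empty placeholder files in a temporary directory
demo_dir <- file.path(tempdir(), "bmf_checkpoint_demo")
dir.create(demo_dir, showWarnings = FALSE)
file.create(file.path(demo_dir, "bmf_2025_01_01_raw.parquet"))
file.create(file.path(demo_dir, "bmf_2025_01_02_identity.parquet"))
resume_from <- latest_checkpoint(demo_dir, 2025, 1)
```

The returned stage name can then be passed to load_checkpoint(), and processing continues with the phase that follows it.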

3.4 File Organization

R/
├── run_pipeline.R               # ORCHESTRATOR
├── config.R                     # S3 configuration and lookups
├── checkpoints.R                # Save/load pipeline checkpoints
├── input_validation.R           # Validation functions
│
├── utils/
│   ├── logging.R                # log_info(), log_phase_start()
│   └── transform_utils.R        # Shared transform utilities
│
├── quality/
│   ├── pre_checks.R             # validate_raw_bmf_structure()
│   ├── post_checks.R            # generate_quality_report()
│   └── quality_report_template.qmd  # HTML/PDF report template
│
├── ein.R                        # Phase 3
├── organization_name.R          # Phase 3
├── dba_name.R                   # Phase 3
├── ico_name.R                   # Phase 3
├── group_exemption_number.R     # Phase 3
├── ruling_date.R                # Phase 3
├── address.R                    # Phase 3 (17 output columns)
│
├── subsection_classification_codes.R  # Phase 4
├── affiliation_code.R           # Phase 4
├── deductibility_code.R         # Phase 4
├── foundation_code.R            # Phase 4
├── organization_code.R          # Phase 4
├── status_code.R                # Phase 4
│
├── activity_code.R              # Phase 5
├── transform_ntee_code.R        # Phase 5
│
├── transform_tax_period.R       # Phase 6
├── accounting_period.R          # Phase 6
│
├── financial_codes.R            # Phase 7
├── asset_amount.R               # Phase 7
│
└── filing_requirement_code.R    # Phase 8

3.5 Configuration

Key configuration in R/run_pipeline.R:

# Pipeline settings (set at top of run_pipeline.R)
ENABLE_CHECKPOINTS <- TRUE       # Save intermediate results
CHECKPOINT_DIR <- "data/checkpoints"
STRICT_QUALITY_GATES <- TRUE     # Stop on validation failure
ENABLE_S3_UPLOAD <- TRUE

# User input: set before sourcing to download a specific BMF file
BMF_YEAR <- NULL    # e.g., 2025 (NULL = most recent)
BMF_MONTH <- NULL   # e.g., 1 (NULL = most recent)

# Pipeline-derived: extracted from downloaded file, used for output naming
PROCESSING_YEAR <- NULL
PROCESSING_MONTH <- NULL

Year/Month Variables:

| Variable | Set By | Purpose |
|---|---|---|
| BMF_YEAR | User (optional) | Request specific BMF file to download. If NULL, downloads most recent. |
| BMF_MONTH | User (optional) | Request specific BMF file to download. If NULL, downloads most recent. |
| PROCESSING_YEAR | Pipeline | Extracted from downloaded file; used to name output files. |
| PROCESSING_MONTH | Pipeline | Extracted from downloaded file; used to name output files. |

To process a specific month, set BMF_YEAR and BMF_MONTH before sourcing:

BMF_YEAR <- 2025
BMF_MONTH <- 1
source("R/run_pipeline.R")
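For values set before sourcing to take effect, the script has to avoid overwriting them with its own defaults. How run_pipeline.R handles this is not shown here; one common approach, sketched under that assumption, is to assign a default only when the variable does not already exist. default_if_unset() is a hypothetical helper.

```r
# Hypothetical helper: assign a default only if the variable is not
# already defined in the target environment.
default_if_unset <- function(name, default, envir = globalenv()) {
  if (!exists(name, envir = envir, inherits = FALSE)) {
    assign(name, default, envir = envir)
  }
  invisible(get(name, envir = envir))
}

# Demonstration in a scratch environment instead of the global one
cfg <- new.env()
assign("BMF_YEAR", 2025, envir = cfg)     # user set this before sourcing
default_if_unset("BMF_YEAR", NULL, cfg)   # keeps 2025
default_if_unset("BMF_MONTH", NULL, cfg)  # falls back to NULL
```

With this pattern, leaving both variables unset still selects the most recent file, while a user-supplied year and month survive the call to source().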

S3 configuration in R/config.R:

# S3 bucket configuration
BMF_S3_BUCKET <- "nccsdata"
BMF_S3_PREFIX <- "raw/bmf/"
BMF_S3_INTERMEDIATE_PREFIX <- "intermediate/bmf/"
BMF_S3_PROCESSED_PREFIX <- "processed/bmf/"

# Download functions defined in this file (signatures, not calls)
# download_bmf_from_s3(bucket, prefix, year, month)
# list_available_bmf_files(bucket, prefix)