flowchart TB
classDef node fill:#d2d2d2,stroke:#333,color:#000
subgraph Acquisition[Data Acquisition]
P1["(1) Extraction"]
P2["(2) Pre-Validation"]
P1 --> P2
end
subgraph Transformation[Data Transformation]
P3["(3) Identity"]
P4["(4) Classification"]
P5["(5) Activity"]
P6["(6) Temporal"]
P7["(7) Financial"]
P8["(8) Filing"]
P3 --> P4 --> P5 --> P6 --> P7 --> P8
end
subgraph Output[Data Output]
P9["(9) Post-Validation"]
P10["(10) Intermediate"]
P11["(11) Processed"]
P9 --> P10 --> P11
end
Acquisition --> Transformation
Transformation --> Output
style Acquisition fill:#fff9e6,stroke:#fdbf11
style Transformation fill:#fee39b,stroke:#fdbf11
style Output fill:#fdbf11,stroke:#d19c0f
2 Pipeline Architecture
3 Pipeline Overview
The BMF processing pipeline is organized into 11 sequential phases, each with a specific responsibility. The pipeline is orchestrated by R/run_pipeline.R.
3.1 11-Phase Pipeline
3.2 Phase Details
3.2.1 Phase 1: Extraction
Downloads BMF data from S3 bucket nccsdata. A Lambda function ingests monthly BMF files from the IRS and deposits them at nccsdata/raw/bmf/YYYY-MM-BMF.csv.
The pipeline:
1. Connects to S3 using aws.s3 package
2. Downloads the most recent BMF file (or a specified year/month)
3. Extracts processing year and month from filename
4. Saves local copy to data/raw/bmf_YYYY_MM.csv
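Steps 3 and 4 above could be sketched roughly as follows. This is a minimal base-R illustration, not the pipeline's actual code: `parse_bmf_key()` and `local_bmf_path()` are hypothetical helper names, and the sketch assumes every S3 key ends in `YYYY-MM-BMF.csv`.

```r
# Hypothetical helper: pull year and month out of a key such as
# "raw/bmf/2025-01-BMF.csv". Assumes the YYYY-MM-BMF.csv naming shown above.
parse_bmf_key <- function(key) {
  file <- basename(key)
  m <- regmatches(file, regexec("^([0-9]{4})-([0-9]{2})-BMF\\.csv$", file))[[1]]
  if (length(m) == 0) {
    stop("Unexpected BMF filename: ", file)
  }
  list(year = as.integer(m[2]), month = as.integer(m[3]))
}

# Hypothetical helper: build the local path data/raw/bmf_YYYY_MM.csv.
local_bmf_path <- function(year, month, dir = "data/raw") {
  file.path(dir, sprintf("bmf_%04d_%02d.csv", year, month))
}

period <- parse_bmf_key("raw/bmf/2025-01-BMF.csv")
local_path <- local_bmf_path(period$year, period$month)
```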
# Download most recent BMF
bmf_raw <- download_bmf_from_s3()
# Download specific month
bmf_raw <- download_bmf_from_s3(year = 2025, month = 1)
# List available files
list_available_bmf_files()
# [1] "2025-01" "2024-12" "2024-11" ...
3.2.2 Phase 2: Pre-Transformation Validation
Validates the raw data before any transformations:
- Column Check: Ensures all 31 required columns exist
- Duplicate Detection: Counts duplicate EINs
- NULL Inventory: Reports NULL counts per column
If validation fails in strict mode, the pipeline stops.
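The three checks and the strict-mode stop could look something like the sketch below. It is an illustration only, not the project's `validate_raw_bmf_structure()`: `pre_validate()` is a hypothetical name, and treating a missing required column as the only failure condition is an assumption.

```r
# Hypothetical sketch of the pre-validation checks described above.
# Assumption: only missing required columns count as a validation failure.
pre_validate <- function(bmf, required_cols, strict = TRUE) {
  # Column check: which required columns are absent?
  missing_cols <- setdiff(required_cols, names(bmf))

  # Duplicate detection: count repeated EINs (NA if EIN itself is missing).
  n_duplicate_eins <- if ("EIN" %in% names(bmf)) {
    sum(duplicated(bmf$EIN))
  } else {
    NA_integer_
  }

  # NULL inventory: number of NA values per column.
  null_counts <- vapply(bmf, function(x) sum(is.na(x)), integer(1))

  passed <- length(missing_cols) == 0
  if (strict && !passed) {
    stop("Missing required columns: ", paste(missing_cols, collapse = ", "))
  }
  list(
    passed = passed,
    missing_cols = missing_cols,
    n_duplicate_eins = n_duplicate_eins,
    null_counts = null_counts
  )
}

# Toy data for illustration only.
bmf <- data.frame(
  EIN = c("010000001", "010000001", "020000002"),
  NAME = c("A", NA, "C"),
  stringsAsFactors = FALSE
)
res <- pre_validate(bmf, required_cols = c("EIN", "NAME"))
```

With `strict = FALSE` the function returns the same report with `passed = FALSE` instead of stopping, which mirrors the behaviour implied by a non-strict quality gate.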
3.2.3 Phase 3: Identity Transformations
Transforms organization identification fields:
| Transform | Input | Outputs |
|---|---|---|
| transform_ein() | EIN | ein, ein_raw |
| transform_organization_name() | NAME | org_name_raw, org_name_join, org_name_display, org_legal_suffix, org_parent_name |
| transform_dba_name() | SORT_NAME | dba_name, dba_name_raw |
| transform_bmf_ico_name() | ICO | in_care_of_name_raw, in_care_of_name_clean, in_care_of_name_provided |
| transform_bmf_group_exemption_number() | GROUP | group_exemption_number_raw, group_exemption_number, group_exemption_is_member |
| transform_bmf_ruling_date() | RULING | ruling_date_ym_str, ruling_date, ruling_date_is_missing |
| transform_address() | STREET, CITY, STATE, ZIP | 17 columns (see Transform Reference) |
3.2.4 Phase 4: Classification Transformations
Adds IRS classification codes with human-readable definitions. All lookup tables are sheets in bmf_code_lookup.xlsx:
| Transform | Lookup Sheet | Outputs |
|---|---|---|
| transform_bmf_subsection_classification_codes() | subsection_classification_code | subsection_code, classification_code, exempt_organization_type, all_classifications_string |
| transform_bmf_affiliation_code() | affiliation_code | affiliation_code, affiliation_code_definition |
| transform_bmf_deductibility_code() | deductibility_code | deductibility_code, deductibility_code_definition |
| transform_bmf_foundation_code() | foundation_code | foundation_code, foundation_code_definition |
| transform_bmf_organization_code() | organization_code | organization_code, organization_code_definition |
| transform_bmf_status_code() | status_code | status_code, status_code_definition |
3.2.5 Phase 5: Activity Transformations
Transforms multi-valued activity fields. Internally creates dimension tables for joins, since each activity code references multiple activities.
| Transform | Pattern | Outputs |
|---|---|---|
| transform_bmf_activity_code() | Dimension table + Aggregate | activity_code, activity_code_definitions, activity_code_categories |
| transform_ntee_code() | Complex | ntee_code_raw, ntee_code_clean, ntee_code_definition, ntee_code_major_group, ntee_common_code, ntee_common_code_definition, naics_code, nteev2, nteev2_code, nteev2_subsector, nteev2_org_type |
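The "Dimension table + Aggregate" pattern can be illustrated with a small base-R sketch. Everything here is an assumption made for illustration: it treats ACTIVITY as a 9-character string holding three 3-digit codes with `000` as filler, and the lookup rows and definitions are placeholders, not values from bmf_code_lookup.xlsx.

```r
# Placeholder lookup (hypothetical codes and definitions).
activity_lookup <- data.frame(
  activity = c("001", "030", "059"),
  definition = c("Example activity A", "Example activity B", "Example activity C"),
  stringsAsFactors = FALSE
)

# Toy input; assumes ACTIVITY packs three 3-digit codes into one string.
bmf <- data.frame(
  EIN = c("010000001", "020000002"),
  ACTIVITY = c("001030000", "059000000"),
  stringsAsFactors = FALSE
)

# 1. Dimension table: one row per (EIN, single activity code).
split_codes <- function(x) substring(x, c(1, 4, 7), c(3, 6, 9))
dim_rows <- lapply(seq_len(nrow(bmf)), function(i) {
  data.frame(
    EIN = bmf$EIN[i],
    activity = split_codes(bmf$ACTIVITY[i]),
    stringsAsFactors = FALSE
  )
})
activity_dim <- do.call(rbind, dim_rows)
activity_dim <- activity_dim[activity_dim$activity != "000", ]

# 2. Join each single code to its definition.
activity_dim <- merge(activity_dim, activity_lookup, by = "activity", all.x = TRUE)

# 3. Aggregate back to one row per EIN and attach to the main table.
agg <- aggregate(
  definition ~ EIN,
  data = activity_dim,
  FUN = function(d) paste(d, collapse = "; ")
)
names(agg)[names(agg) == "definition"] <- "activity_code_definitions"
result <- merge(bmf, agg, by = "EIN", all.x = TRUE)
```

Keeping the long-format dimension table separate means the main table never gains rows, which is what the Phase 9 row preservation check relies on.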
3.2.6 Phase 6: Temporal Transformations
Parses date and period fields:
| Transform | Input | Outputs |
|---|---|---|
| transform_tax_period() | TAX_PERIOD | tax_period_ym_str, tax_period_ymd, tax_period_is_missing |
| transform_accounting_period() | ACCT_PD | accounting_period |
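As a rough illustration of how a period field might be parsed into the three tax period outputs, consider the sketch below. It is not the actual `transform_tax_period()`: it assumes TAX_PERIOD arrives as a `YYYYMM` string, anchors the date to the first day of the month, and flags anything unparseable as missing. All three choices are assumptions.

```r
# Hypothetical parser; assumes a YYYYMM input format.
parse_tax_period <- function(tax_period) {
  x <- trimws(as.character(tax_period))
  # Valid means four digits of year followed by a month between 01 and 12.
  valid <- !is.na(x) & grepl("^[0-9]{4}(0[1-9]|1[0-2])$", x)

  ym_str <- ifelse(valid, paste0(substr(x, 1, 4), "-", substr(x, 5, 6)), NA_character_)
  # Assumption: represent the period by the first day of its month.
  ymd <- as.Date(paste0(ym_str, "-01"), format = "%Y-%m-%d")

  data.frame(
    tax_period_ym_str = ym_str,
    tax_period_ymd = ymd,
    tax_period_is_missing = !valid,
    stringsAsFactors = FALSE
  )
}

parsed <- parse_tax_period(c("202312", "000000", NA, "2023"))
```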
3.2.7 Phase 7: Financial Transformations
Processes financial codes and amounts:
| Transform | Input | Outputs |
|---|---|---|
| transform_bmf_asset_code() | ASSET_CD | asset_code, asset_code_definition |
| transform_bmf_income_code() | INCOME_CD | income_code, income_code_definition |
| transform_asset_amount() | ASSET_AMT | asset_amount |
| transform_income_amount() | INCOME_AMT | income_amount |
| transform_revenue_amount() | REVENUE_AMT | revenue_amount |
3.2.8 Phase 8: Filing Requirement Transformations
Adds filing requirement classifications:
| Transform | Input | Outputs |
|---|---|---|
| transform_bmf_filing_requirement_code() | FILING_REQ_CD | filing_requirement_code, filing_requirement_code_definition |
| transform_bmf_pf_filing_requirement_code() | PF_FILING_REQ_CD | pf_filing_requirement_code, pf_filing_requirement_code_definition |
3.2.9 Phase 9: Post-Transformation Validation
Generates a comprehensive quality report:
- Row preservation check (input rows = output rows)
- Completeness rates for all columns
- Critical field validation (EIN)
- Summary statistics
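A minimal sketch of the first three checks is shown below. `post_validate()` is a hypothetical name and the real `generate_quality_report()` may compute these differently; the sketch assumes "critical field validation" means the `ein` column is fully populated.

```r
# Hypothetical sketch of the post-transformation checks listed above.
post_validate <- function(raw, transformed, critical = "ein") {
  # Completeness rate: share of non-NA values in each column.
  completeness <- vapply(transformed, function(x) mean(!is.na(x)), numeric(1))
  list(
    # Row preservation: transforms must not add or drop rows.
    rows_preserved = nrow(raw) == nrow(transformed),
    completeness = completeness,
    # Assumption: a critical field passes only when it is 100% complete.
    critical_complete = all(completeness[critical] == 1)
  )
}

# Toy data for illustration only.
raw <- data.frame(EIN = c("1", "2", "3"), stringsAsFactors = FALSE)
transformed <- data.frame(
  ein = c("1", "2", "3"),
  org_name_display = c("A", NA, "C"),
  stringsAsFactors = FALSE
)
report <- post_validate(raw, transformed)
```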
3.2.10 Phase 10: Intermediate Output
Saves the complete BMF with all columns (both raw IRS columns and transformed columns) to local storage and S3. This preserves the full data lineage for debugging and auditing.
What Gets Saved:
| File | Format | Contents |
|---|---|---|
| Intermediate BMF | Parquet | All ~107 columns (31 raw + ~76 transformed) |
| Quality Report | JSON | Validation metrics and summary statistics |
Local Files:
data/intermediate/bmf_YYYY_MM_intermediate.parquet
data/quality/bmf_YYYY_MM_quality_report.json
S3 Upload (when ENABLE_S3_UPLOAD = TRUE):
Uses the upload_bmf_results() function to upload to the intermediate folder:
s3://nccsdata/intermediate/bmf/YYYY_MM/bmf_YYYY_MM_intermediate.parquet
s3://nccsdata/intermediate/bmf/YYYY_MM/bmf_YYYY_MM_quality_report.json
3.2.11 Phase 11: Processed Output
Creates a clean version of the BMF by removing the original IRS columns, keeping only transformed columns. Also generates a data dictionary describing each column.
Raw Columns Removed (31 total):
EIN, NAME, ICO, STREET, CITY, STATE, ZIP, GROUP, SUBSECTION,
AFFILIATION, CLASSIFICATION, RULING, DEDUCTIBILITY, FOUNDATION,
ACTIVITY, ORGANIZATION, STATUS, TAX_PERIOD, ASSET_CD, INCOME_CD,
FILING_REQ_CD, PF_FILING_REQ_CD, ASSET_AMT, INCOME_AMT,
REVENUE_AMT, NTEE_CD, SORT_NAME, ACCT_PD, REGION, RYEAR, ID
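Dropping the raw columns amounts to a name-based filter. The sketch below shows one way to do it in base R; `drop_raw_columns()` is a hypothetical helper and the toy data frame stands in for the intermediate BMF.

```r
# The 31 raw IRS column names listed above.
raw_cols <- c(
  "EIN", "NAME", "ICO", "STREET", "CITY", "STATE", "ZIP", "GROUP", "SUBSECTION",
  "AFFILIATION", "CLASSIFICATION", "RULING", "DEDUCTIBILITY", "FOUNDATION",
  "ACTIVITY", "ORGANIZATION", "STATUS", "TAX_PERIOD", "ASSET_CD", "INCOME_CD",
  "FILING_REQ_CD", "PF_FILING_REQ_CD", "ASSET_AMT", "INCOME_AMT",
  "REVENUE_AMT", "NTEE_CD", "SORT_NAME", "ACCT_PD", "REGION", "RYEAR", "ID"
)

# Hypothetical helper: keep every column whose name is not a raw IRS name.
# Matching is case-sensitive, so "ein" survives while "EIN" is removed.
drop_raw_columns <- function(bmf, raw_cols) {
  bmf[, setdiff(names(bmf), raw_cols), drop = FALSE]
}

# Toy intermediate table for illustration only.
intermediate <- data.frame(
  EIN = "010000001", NAME = "EXAMPLE ORG",
  ein = "010000001", org_name_display = "Example Org",
  stringsAsFactors = FALSE
)
processed <- drop_raw_columns(intermediate, raw_cols)
```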
What Gets Saved:
| File | Format | Contents |
|---|---|---|
| Processed BMF | CSV | ~63 transformed columns only |
| Data Dictionary | CSV | Column names, types, descriptions, sample values |
| Quality Report | JSON | Same report as Phase 10 |
| Quality Report | HTML | Rendered visualization for GitHub Pages |
Local Files:
data/processed/bmf_YYYY_MM_processed.csv
data/processed/bmf_YYYY_MM_data_dictionary.csv
docs/quality-reports/bmf_YYYY_MM_quality_report.html
S3 Upload (when ENABLE_S3_UPLOAD = TRUE):
Uses the upload_processed_bmf() function to upload to the processed folder:
s3://nccsdata/processed/bmf/YYYY_MM/bmf_YYYY_MM_processed.csv
s3://nccsdata/processed/bmf/YYYY_MM/bmf_YYYY_MM_data_dictionary.csv
s3://nccsdata/processed/bmf/YYYY_MM/bmf_YYYY_MM_quality_report.json
3.2.12 Output Comparison
flowchart LR
classDef node fill:#d2d2d2,stroke:#333,color:#000
subgraph Phase10[Phase 10: Intermediate]
I1[(Parquet)]
I2[(Quality JSON)]
end
subgraph Phase11[Phase 11: Processed]
P1[(CSV)]
P2[(Dictionary)]
P3[(Quality JSON + HTML)]
end
subgraph S3Int[S3: intermediate/bmf/]
S3I1["intermediate.parquet"]
S3I2["quality_report.json"]
end
subgraph S3Proc[S3: processed/bmf/]
S3P1["processed.csv"]
S3P2["data_dictionary.csv"]
S3P3["quality_report.json"]
end
I1 --> S3I1
I2 --> S3I2
P1 --> S3P1
P2 --> S3P2
P3 --> S3P3
style Phase10 fill:#fee39b,stroke:#fdbf11
style Phase11 fill:#fee39b,stroke:#fdbf11
style S3Int fill:#fdbf11,stroke:#d19c0f
style S3Proc fill:#fdbf11,stroke:#d19c0f
When to Use Each Output:
| Use Case | Recommended Output |
|---|---|
| Research and analysis | Processed CSV |
| Debugging transformations | Intermediate Parquet |
| Auditing data lineage | Intermediate Parquet |
| Building downstream applications | Processed CSV + Data Dictionary |
| Investigating quality issues | Quality Report JSON/HTML |
3.3 Checkpoint System
The pipeline saves intermediate results as Parquet files after each major phase:
flowchart LR
classDef node fill:#d2d2d2,stroke:#333,color:#000
subgraph Checkpoints[Checkpoints]
CP1[01_raw] --> CP2[02_identity]
CP2 --> CP3[03_classification]
CP3 --> CP4[04_activity]
CP4 --> CP5[05_temporal]
CP5 --> CP6[06_financial]
CP6 --> CP7[07_filing]
end
style Checkpoints fill:#fee39b,stroke:#fdbf11
| Checkpoint | Location | After Phase |
|---|---|---|
| 01_raw | data/checkpoints/bmf_YYYY_MM_01_raw.parquet | Pre-Validation |
| 02_identity | data/checkpoints/bmf_YYYY_MM_02_identity.parquet | Identity |
| 03_classification | data/checkpoints/bmf_YYYY_MM_03_classification.parquet | Classification |
| 04_activity | data/checkpoints/bmf_YYYY_MM_04_activity.parquet | Activity |
| 05_temporal | data/checkpoints/bmf_YYYY_MM_05_temporal.parquet | Temporal |
| 06_financial | data/checkpoints/bmf_YYYY_MM_06_financial.parquet | Financial |
| 07_filing | data/checkpoints/bmf_YYYY_MM_07_filing.parquet | Filing |
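The naming scheme in the table can be captured in a small path helper, and the same helper makes it easy to find the furthest checkpoint already on disk. The sketch below is illustrative only: `checkpoint_path()` and `latest_checkpoint()` are hypothetical names, not functions from R/checkpoints.R, and the sketch only inspects file names without reading Parquet.

```r
# Checkpoint names in pipeline order, as listed in the table above.
CHECKPOINT_NAMES <- c(
  "01_raw", "02_identity", "03_classification", "04_activity",
  "05_temporal", "06_financial", "07_filing"
)

# Hypothetical helper: build data/checkpoints/bmf_YYYY_MM_<name>.parquet.
checkpoint_path <- function(name, year, month, dir = "data/checkpoints") {
  stopifnot(name %in% CHECKPOINT_NAMES)
  file.path(dir, sprintf("bmf_%04d_%02d_%s.parquet", year, month, name))
}

# Hypothetical helper: name of the last checkpoint file that exists,
# or NULL when no checkpoint has been written yet.
latest_checkpoint <- function(year, month, dir = "data/checkpoints") {
  paths <- vapply(
    CHECKPOINT_NAMES, checkpoint_path, character(1),
    year = year, month = month, dir = dir
  )
  done <- CHECKPOINT_NAMES[file.exists(paths)]
  if (length(done) == 0) NULL else done[length(done)]
}

example_path <- checkpoint_path("03_classification", 2025, 1)
```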
Benefits:
- Resume from failure without reprocessing
- Debug specific phases
- Compare intermediate states
To resume from a checkpoint:
bmf <- load_checkpoint("03_classification")
# Continue processing from Phase 5 (Activity)
3.4 File Organization
R/
├── run_pipeline.R # ORCHESTRATOR
├── config.R # S3 configuration and lookups
├── checkpoints.R # Save/load pipeline checkpoints
├── input_validation.R # Validation functions
│
├── utils/
│ ├── logging.R # log_info(), log_phase_start()
│ └── transform_utils.R # Shared transform utilities
│
├── quality/
│ ├── pre_checks.R # validate_raw_bmf_structure()
│ ├── post_checks.R # generate_quality_report()
│ └── quality_report_template.qmd # HTML/PDF report template
│
├── ein.R # Phase 3
├── organization_name.R # Phase 3
├── dba_name.R # Phase 3
├── ico_name.R # Phase 3
├── group_exemption_number.R # Phase 3
├── ruling_date.R # Phase 3
├── address.R # Phase 3 (17 output columns)
│
├── subsection_classification_codes.R # Phase 4
├── affiliation_code.R # Phase 4
├── deductibility_code.R # Phase 4
├── foundation_code.R # Phase 4
├── organization_code.R # Phase 4
├── status_code.R # Phase 4
│
├── activity_code.R # Phase 5
├── transform_ntee_code.R # Phase 5
│
├── transform_tax_period.R # Phase 6
├── accounting_period.R # Phase 6
│
├── financial_codes.R # Phase 7
├── asset_amount.R # Phase 7
│
└── filing_requirement_code.R # Phase 8
3.5 Configuration
Key configuration in R/run_pipeline.R:
# Pipeline settings (set at top of run_pipeline.R)
ENABLE_CHECKPOINTS <- TRUE # Save intermediate results
CHECKPOINT_DIR <- "data/checkpoints"
STRICT_QUALITY_GATES <- TRUE # Stop on validation failure
ENABLE_S3_UPLOAD <- TRUE
# User input: set before sourcing to download a specific BMF file
BMF_YEAR <- NULL # e.g., 2025 (NULL = most recent)
BMF_MONTH <- NULL # e.g., 1 (NULL = most recent)
# Pipeline-derived: extracted from downloaded file, used for output naming
PROCESSING_YEAR <- NULL
PROCESSING_MONTH <- NULL
Year/Month Variables:
| Variable | Set By | Purpose |
|---|---|---|
| BMF_YEAR | User (optional) | Request specific BMF file to download. If NULL, downloads most recent. |
| BMF_MONTH | User (optional) | Request specific BMF file to download. If NULL, downloads most recent. |
| PROCESSING_YEAR | Pipeline | Extracted from downloaded file; used to name output files. |
| PROCESSING_MONTH | Pipeline | Extracted from downloaded file; used to name output files. |
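The "NULL means most recent" rule can be sketched as a small resolver over the `"YYYY-MM"` strings that list_available_bmf_files() is shown returning. `resolve_bmf_period()` is a hypothetical name, and falling back to the most recent file when either value is NULL is an assumption about the pipeline's behaviour.

```r
# Hypothetical resolver: pick which available period to download.
resolve_bmf_period <- function(available, year = NULL, month = NULL) {
  if (is.null(year) || is.null(month)) {
    # Zero-padded "YYYY-MM" strings sort chronologically, so max() is the latest.
    return(max(available))
  }
  wanted <- sprintf("%04d-%02d", year, month)
  if (!wanted %in% available) {
    stop("No BMF file available for ", wanted)
  }
  wanted
}

available <- c("2025-01", "2024-12", "2024-11")
most_recent <- resolve_bmf_period(available)
specific <- resolve_bmf_period(available, year = 2024, month = 12)
```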
To process a specific month, set BMF_YEAR and BMF_MONTH before sourcing:
BMF_YEAR <- 2025
BMF_MONTH <- 1
source("R/run_pipeline.R")
S3 configuration in R/config.R:
# S3 bucket configuration
BMF_S3_BUCKET <- "nccsdata"
BMF_S3_PREFIX <- "raw/bmf/"
BMF_S3_INTERMEDIATE_PREFIX <- "intermediate/bmf/"
BMF_S3_PROCESSED_PREFIX <- "processed/bmf/"
# Download functions
download_bmf_from_s3(bucket, prefix, year, month)
list_available_bmf_files(bucket, prefix)
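To show how these prefixes combine with the processing year and month into the upload locations listed under Phases 10 and 11, here is a small sketch. `s3_output_uri()` is a hypothetical helper, not part of R/config.R; it only builds strings and does not call S3.

```r
# Values copied from the configuration above.
BMF_S3_BUCKET <- "nccsdata"
BMF_S3_INTERMEDIATE_PREFIX <- "intermediate/bmf/"
BMF_S3_PROCESSED_PREFIX <- "processed/bmf/"

# Hypothetical helper: s3://<bucket>/<prefix>YYYY_MM/bmf_YYYY_MM_<suffix>
s3_output_uri <- function(prefix, year, month, suffix, bucket = BMF_S3_BUCKET) {
  period <- sprintf("%04d_%02d", year, month)
  sprintf("s3://%s/%s%s/bmf_%s_%s", bucket, prefix, period, period, suffix)
}

intermediate_uri <- s3_output_uri(BMF_S3_INTERMEDIATE_PREFIX, 2025, 1, "intermediate.parquet")
processed_uri <- s3_output_uri(BMF_S3_PROCESSED_PREFIX, 2025, 1, "processed.csv")
```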