
Multi-Model Pipelines

Bridge Town projects can contain multiple Python models that compose a multi-step financial pipeline, such as revenue → expenses → summary, where each model reads the outputs of the model before it. This page explains how to wire them together using the PIPELINE list in run.py, how the branch-scoped /upstream mount carries intermediate outputs between models, and where Google Sheets fit (spoiler: they are external I/O, not intra-run transport).

| Path | What it carries | Scope | Access |
| --- | --- | --- | --- |
| /data/ | CSV/Excel/Google Sheet snapshots | Project-wide, all runs | Read-only |
| /upstream/ | Intermediate outputs from earlier pipeline steps | This run, this branch | Read-only after the upstream model writes |

/data/ is for external inputs. It holds immutable Parquet snapshots produced by upload_data or connect_google_sheet. Every model run — whether single-model or project-wide — can read from /data/. The data inside does not change during a run; freshness is controlled by when you call connect_google_sheet or upload a new file.

/upstream/ is the pipeline bus. It exists only during a run(scope='project') call and only when PIPELINE is defined in run.py. As each model in the pipeline completes, its runtime output dict (a module-level result dict, or a legacy outputs dict) is serialised to /upstream/<model_name>/outputs.json so the next model in the list can read it. Models that write directly to /outputs/ without defining a runtime output dict execute normally, but they are not materialised to /upstream/ and cannot be consumed by downstream models via this path. When the run finishes, /upstream/ is discarded: it is not persisted to S3 and is not visible between runs or branches.
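The serialisation step can be sketched roughly as follows. This is an illustration only: the `materialise` helper and its `root` parameter are hypothetical names, not part of the scaffold API.

```python
import json
import pathlib


def materialise(model_name: str, runtime_outputs: dict,
                root: str = "/upstream") -> pathlib.Path:
    """Write a model's runtime output dict where the next stage can read it."""
    dest = pathlib.Path(root) / model_name / "outputs.json"
    dest.parent.mkdir(parents=True, exist_ok=True)  # /upstream/<model_name>/
    dest.write_text(json.dumps(runtime_outputs))
    return dest
```

A downstream model then reads /upstream/<model_name>/outputs.json back with json.loads, as the examples below show.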

In run.py, define a top-level PIPELINE list naming your models in execution order:

run.py
PIPELINE = ["revenue", "expenses", "summary"]

When run(scope='project') executes run.py, it detects PIPELINE and:

  1. Runs model/revenue.py; writes its result dict (or legacy outputs dict) to /upstream/revenue/outputs.json.
  2. Runs model/expenses.py; writes its result dict (or legacy outputs dict) to /upstream/expenses/outputs.json.
  3. Runs model/summary.py.

Each /upstream/ write only occurs when the model defines a runtime output dict — either a module-level result dict (preferred, pairs with the outputs = [...] contract metadata convention) or a legacy module-level outputs dict. A model that writes directly to /outputs/ without defining either runs and its files appear in run(scope='project') results, but it is not materialised to /upstream/ for downstream models.
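The precedence between the two conventions can be sketched as a small helper. This is hypothetical code, assuming only the behaviour described above: a module-level result dict wins, a legacy outputs dict is accepted, and an outputs list (contract metadata) does not count as a runtime output dict.

```python
def runtime_output_dict(namespace: dict):
    """Pick the dict that would be materialised to /upstream/, or None."""
    result = namespace.get("result")
    if isinstance(result, dict):
        return result  # preferred convention
    outputs = namespace.get("outputs")
    if isinstance(outputs, dict):
        return outputs  # legacy convention; a metadata list is ignored
    return None  # model runs, but nothing reaches /upstream/
```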

Models not in PIPELINE are skipped. If a model listed in PIPELINE does not exist, it is skipped with a warning on stderr and execution continues.

Without PIPELINE, the scaffold auto-discovers all model/*.py files and runs them in alphabetical order (original behavior; no upstream materialisation).
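The ordering and skip rules above can be sketched as follows. The helper name is hypothetical; the real logic lives inside the run.py scaffold.

```python
import sys


def resolve_run_order(pipeline, discovered):
    """Return model names in execution order, per the rules described above."""
    if pipeline:
        # Explicit PIPELINE: keep its order, skip missing models with a warning.
        order = []
        for name in pipeline:
            if name in discovered:
                order.append(name)
            else:
                print(f"warning: model '{name}' in PIPELINE not found; skipping",
                      file=sys.stderr)
        return order
    # No PIPELINE: auto-discover and run alphabetically.
    return sorted(discovered)
```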

The /upstream first, /data fallback pattern


Downstream models should prefer /upstream/ when available and fall back to a /data/ snapshot when running outside a pipeline context (for example, during a single-model run(scope='model') call or early development before all pipeline stages exist):

model/summary.py
"""Combine revenue and expense outputs into an executive summary."""
import json
import pathlib
import pandas as pd
MONTHS = 12
# --- revenue inputs ---
_upstream_rev = pathlib.Path("/upstream/revenue/outputs.json")
_data_rev = pathlib.Path("/data/revenue_actuals") # Parquet snapshot fallback
if _upstream_rev.exists():
rev_data = json.loads(_upstream_rev.read_text())
total_revenue = sum(rev_data.get("monthly_revenue", [0] * MONTHS))
else:
# Standalone run: read the Google Sheet snapshot instead.
df = pd.read_parquet(str(_data_rev))
total_revenue = float(df["revenue"].sum())
# --- expense inputs ---
_upstream_exp = pathlib.Path("/upstream/expenses/outputs.json")
_data_exp = pathlib.Path("/data/expense_actuals")
if _upstream_exp.exists():
exp_data = json.loads(_upstream_exp.read_text())
total_expenses = sum(exp_data.get("monthly_expenses", [0] * MONTHS))
else:
df = pd.read_parquet(str(_data_exp))
total_expenses = float(df["expenses"].sum())
# --- summary ---
inputs = ["monthly_revenue", "monthly_expenses"]
outputs = ["total_revenue", "total_expenses", "net_income"]
dependencies = ["revenue", "expenses"]
result = {
"total_revenue": round(total_revenue, 2),
"total_expenses": round(total_expenses, 2),
"net_income": round(total_revenue - total_expenses, 2),
}

The fallback branch makes summary.py runnable as a standalone model (useful during development), while the primary branch composes the full pipeline result when run via run(scope='project').

Here is a minimal but complete project showing the revenue → expenses → summary chain.

run.py (scaffold with PIPELINE defined):

"""Bridge Town model entry point — auto-discovery scaffold.
# BT-SCAFFOLD-SENTINEL
"""
PIPELINE = ["revenue", "expenses", "summary"]
# (The rest of this file is the standard Bridge Town scaffold.)

model/revenue.py:

"""12-month SaaS revenue projection."""
MONTHS = 12
BASE_ARR = 1_200_000
GROWTH_RATE = 0.07
monthly = []
arr = BASE_ARR
for _ in range(MONTHS):
monthly.append(round(arr / 12, 2))
arr *= 1 + GROWTH_RATE
inputs = ["base_arr", "growth_rate"]
outputs = ["monthly_revenue", "arr_eoy"]
dependencies = []
# Legacy runtime output pattern (dict) — still supported for execution.
result = {"monthly_revenue": monthly, "arr_eoy": round(arr, 2)}

model/expenses.py:

"""Monthly expense model — headcount + opex."""
import json, pathlib
_upstream = pathlib.Path("/upstream/revenue/outputs.json")
if _upstream.exists():
rev = json.loads(_upstream.read_text())
# Size opex as a percentage of revenue.
monthly_revenue = rev.get("monthly_revenue", [100_000] * 12)
else:
monthly_revenue = [100_000] * 12
HEADCOUNT_COST = 80_000 # per month
OPEX_PCT_OF_REVENUE = 0.18
monthly = [
round(HEADCOUNT_COST + r * OPEX_PCT_OF_REVENUE, 2)
for r in monthly_revenue
]
inputs = ["monthly_revenue"]
outputs = ["monthly_expenses"]
dependencies = ["revenue"]
result = {"monthly_expenses": monthly}

model/summary.py: (see /upstream first, /data fallback example above)

Running run(scope='project') on this project produces combined output keyed by model name, which compare_branches can later diff branch against branch for scenario analysis.

When you run scenario analysis (run_scenario_analysis), use project runs — not Google Sheets — as the comparison mechanism:

  1. Create a scenario branch: create_branch with branch_name="scenario/upside".
  2. Edit the assumption in the relevant model on that branch: patch_file.
  3. Run the full pipeline: run(scope='project') with branch="scenario/upside".
  4. Compare outputs: compare_branches with base_branch="main" and scenario_branch="scenario/upside".
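The four steps can be sketched as a call sequence against a hypothetical tool client. The client object and every method signature here are illustrative, not a real SDK; only the tool names and the parameters shown in the list above come from this page.

```python
def run_upside_scenario(client):
    """Illustrative scenario workflow: branch, edit, run, compare."""
    # 1. Create a scenario branch.
    client.create_branch(branch_name="scenario/upside")
    # 2. Edit an assumption on that branch (patch_file arguments are assumed).
    client.patch_file(
        path="model/revenue.py",
        find="GROWTH_RATE = 0.07",
        replace="GROWTH_RATE = 0.10",
        branch="scenario/upside",
    )
    # 3. Run the full pipeline on the scenario branch.
    client.run(scope="project", branch="scenario/upside")
    # 4. Compare against main.
    return client.compare_branches(
        base_branch="main", scenario_branch="scenario/upside"
    )
```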

Because /upstream/ materialises all intermediate model outputs during each run, compare_branches can diff the full pipeline — not just the model you edited. Changes propagate automatically through the dependency chain.

Do not export intermediate results to Google Sheets and import them back on a different branch to compare scenarios. This breaks the branch-scoped isolation that compare_branches depends on.

When writing a downstream model that reads from /upstream/:

  • Use pathlib.Path("/upstream/<model_name>/outputs.json").exists() to guard the read — never assume the file is there.
  • Provide a /data/ or constant fallback so the model is runnable standalone.
  • Keep the upstream read near the top of the file, before any computation.
  • Do not write back to /upstream/ — it is managed by run.py.
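The first two bullets combine naturally into one guarded loader. This is a sketch; the helper name and the `root` parameter are not part of any scaffold API.

```python
import json
import pathlib


def load_upstream(model_name: str, root: str = "/upstream"):
    """Return the upstream model's output dict, or None when running standalone."""
    path = pathlib.Path(root) / model_name / "outputs.json"
    if path.exists():
        return json.loads(path.read_text())
    return None  # caller falls back to a /data/ snapshot or constants
```

A downstream model checks the return value: if it is None, read the /data/ snapshot or use constant defaults instead.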