Traffic Assignment Validation

Validating Observed vs. Modeled Traffic Volumes

This notebook validates the traffic volumes assigned by the Travel Demand Model against observed traffic counts captured by Continuous Count Stations (CCS).
Author: Pukar Bhandari

Published: December 24, 2025

1 Overview

This document validates the Travel Demand Model’s traffic assignment by comparing modeled volumes against observed counts from Continuous Count Stations (CCS).

The process is divided into modular steps to support a “Minimal Documentation” workflow:

1. Bridge Creation: Mapping CCS stations to network segments (static).
2. Observed Data: Aggregating raw telemetry data (static).
3. Model Data: Loading specific scenario results (dynamic).
4. Validation: Merging the three sources for visualization.

2 Environment Setup

Import Standard Libraries

Show the code
# For Analysis
import numpy as np
import pandas as pd
import geopandas as gpd

# For Visualization
import matplotlib.pyplot as plt
import seaborn as sns
import mapclassify

# misc
import os
import requests
from pathlib import Path
import importlib.util

from dotenv import load_dotenv

load_dotenv()
True

Setup Directories & Variables

Show the code
CRS_UTM = "EPSG:26912"

# Define Output Paths
intermediate_dir = Path("_intermediate")
output_dir = Path("_output")
raw_dir = Path("_data/raw")

# Create directories if they don't exist
intermediate_dir.mkdir(parents=True, exist_ok=True)
output_dir.mkdir(parents=True, exist_ok=True)
raw_dir.mkdir(parents=True, exist_ok=True)

print("Directories validated.")
Directories validated.

3 Helper Functions

Show the code
def fetch_github(
    url: str,
    mode: str = "private",
    token_env_var: str = "GITHUB_TOKEN"
) -> requests.Response:
    """
    Fetch content from GitHub repositories.

    Args:
        url: GitHub raw URL (e.g., https://raw.githubusercontent.com/...)
        mode: "public" for public repos, "private" for private repos requiring authentication
        token_env_var: Name of environment variable containing GitHub token (default: GITHUB_TOKEN)

    Returns:
        requests.Response object

    Raises:
        ValueError: If token is missing for private mode or invalid mode
        requests.HTTPError: If request fails
    """
    # Validate mode
    if mode not in ["public", "private"]:
        raise ValueError(f"mode must be 'public' or 'private', got '{mode}'")

    if mode == "public":
        response = requests.get(url, timeout=30)
    else:
        token = os.getenv(token_env_var)
        if not token:
            raise ValueError(
                f"GitHub token not found in environment variable '{token_env_var}'. "
                f"Check your .env file has: {token_env_var}=your_token_here"
            )

        headers = {
            'Authorization': f'token {token}',
            'Accept': 'application/vnd.github.v3.raw'
        }
        response = requests.get(url, headers=headers, timeout=30)

    response.raise_for_status()
    return response
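As a usage sketch, this is how the private-mode branch assembles its request headers (the token value below is illustrative only; in practice `load_dotenv()` supplies it from the `.env` file):

```python
import os

# Hypothetical token for illustration only -- never hard-code a real token
os.environ["GITHUB_TOKEN"] = "ghp_example"

token = os.getenv("GITHUB_TOKEN")
headers = {
    "Authorization": f"token {token}",
    "Accept": "application/vnd.github.v3.raw",  # ask GitHub for raw file content
}
print(headers["Authorization"])
```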
Show the code
# Create function to read ArcGIS FeatureLayer or Table
def arc_read(url, where="1=1", outFields="*", outSR=4326, **kwargs):
    """
    Read an ArcGIS FeatureLayer or Table to a GeoDataFrame.

    Parameters:
    url (str): The ArcGIS REST service URL (e.g., ending in /FeatureServer/0)
    where (str): SQL WHERE clause for filtering. Default: "1=1"
    outFields (str): Comma-separated field names. Default: "*"
    outSR (int): Output spatial reference EPSG code. Default: 4326
    **kwargs: Additional query parameters passed to the ArcGIS REST API

    Returns:
    geopandas.GeoDataFrame: Spatial data from the service
    """
    # Ensure URL ends with /query
    if not url.endswith("/query"):
        url = url.rstrip("/") + "/query"

    # Build query parameters
    params = {
        "where": where,
        "outFields": outFields,
        "returnGeometry": "true",
        "outSR": outSR,
        "f": "geojson",
    }

    # Add any additional parameters
    params.update(kwargs)

    # Make request
    response = requests.get(url, params=params)
    response.raise_for_status()  # Good practice to check for HTTP errors

    # Read as GeoDataFrame
    # We use io.BytesIO to handle the response content safely for read_file
    import io

    return gpd.read_file(io.BytesIO(response.content), engine="pyogrio")
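A minimal sketch of the URL normalization and query parameters `arc_read` builds before issuing the request (the service URL here is illustrative, not a real endpoint):

```python
# Illustrative service URL; shows the /query suffix normalization
url = "https://example.com/arcgis/rest/services/Demo/FeatureServer/0/"

if not url.endswith("/query"):
    url = url.rstrip("/") + "/query"

params = {
    "where": "1=1",           # no attribute filter
    "outFields": "*",         # return all attributes
    "returnGeometry": "true",
    "outSR": 4326,            # WGS84 output
    "f": "geojson",           # request GeoJSON so geopandas can read it directly
}
print(url)
```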

4 Step 1: Create Station-to-Network Bridge

Objective: Create a static reference file that links Station IDs to Network SEGIDs and attaches static network attributes (County, Functional Class, Truck %).

Load Network (Input Shapefile)

We load the raw input shapefile. We do not filter by model results here, ensuring the bridge is independent of any specific model run.

Show the code
# Load Segments (Input)
# Source: https://github.com/WFRCAnalytics/WF-TDM-Documentation/tree/v10-calib/v10x/v1000/validation/data/5-assignhwy/seg_shp
gdf_segments = gpd.read_file(
    raw_dir / "Segments/WFv1000_Segments.shp",
    engine="pyogrio",
).to_crs(CRS_UTM)

# Ensure SEGID is string for consistent matching
gdf_segments["SEGID"] = gdf_segments["SEGID"].astype(str)

# Parse Route and Milepost from SEGID directly (Independent of Model Results)
temp_route = gdf_segments["SEGID"].str.split("_").str[0]
gdf_segments["MODEL_ROUTE"] = pd.to_numeric(temp_route, errors="coerce")
gdf_segments["MODEL_MP"] = gdf_segments["SEGID"].str.split("_").str[1].astype(float)

# Filter to valid network links for matching
gdf_network_clean = gdf_segments.dropna(subset=["MODEL_ROUTE", "MODEL_MP"]).copy()
gdf_network_clean["MODEL_ROUTE"] = gdf_network_clean["MODEL_ROUTE"].astype(int)

print(f"Loaded Network with {len(gdf_network_clean)} valid segments.")
Loaded Network with 8078 valid segments.
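The parsing above relies on SEGIDs following a `ROUTE_MILEPOST` naming convention. A toy sketch with made-up IDs (the third value deliberately lacks the pattern, so it coerces to NaN and is dropped by the `dropna` filter):

```python
import pandas as pd

# Made-up SEGIDs; "CENTERLINE" has no route/milepost components
seg = pd.Series(["0015_3.2", "0089_12.5", "CENTERLINE"])
route = pd.to_numeric(seg.str.split("_").str[0], errors="coerce")
mp = pd.to_numeric(seg.str.split("_").str[1], errors="coerce")
print(route.tolist())  # [15.0, 89.0, nan]
```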

WF-TDM Region

Show the code
# Configuration
service_url = "https://services1.arcgis.com/taguadKoI1XFwivx/ArcGIS/rest/services/RegionalBoundaryComponents/FeatureServer/0"
gdb_path = raw_dir / "RegionalBoundary.gpkg"

# Download/Load logic
if not gdb_path.exists():
    print("Downloading Regional Boundaries...")
    gdf_download = arc_read(service_url)
    gdf_download.to_file(gdb_path, layer="RegionalBoundaryComponents", driver="GPKG")
else:
    print("Loading Regional Boundaries from cache...")

# Load and Project
gdf_region = (
    gpd.read_file(
        gdb_path,
        layer="RegionalBoundaryComponents",
        where="PlanOrg IN ('MAG MPO', 'WFRC MPO')",
    )
    .dissolve()
    .to_crs(CRS_UTM)
)

# Fix the holes and slivers in the regional boundary
gdf_region["geometry"] = gdf_region.buffer(1).buffer(-1)
Loading Regional Boundaries from cache...
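The `buffer(1).buffer(-1)` trick above is a morphological closing: dilating by 1 m fuses sub-meter gaps between dissolved components, then eroding restores the outline. A toy shapely sketch with two squares separated by a 0.1-unit sliver:

```python
from shapely.geometry import Polygon

# Two squares with a 0.1-unit sliver between them (toy coordinates)
a = Polygon([(0, 0), (10, 0), (10, 10), (0, 10)])
b = Polygon([(10.1, 0), (20, 0), (20, 10), (10.1, 10)])
merged = a.union(b)                   # the sliver keeps the parts separate

closed = merged.buffer(1).buffer(-1)  # dilate, then erode: the gap disappears
print(merged.geom_type, "->", closed.geom_type)
```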

Load CCS Stations

Show the code
# Configuration
service_url = "https://services.arcgis.com/pA2nEVnB6tquxgOW/ArcGIS/rest/services/CCS_Data_v2/FeatureServer/1"
gdb_path = raw_dir / "CCS_Data_v2.gpkg"

# Download/Load logic
if not gdb_path.exists():
    print("Downloading CCS Stations...")
    gdf_download = arc_read(service_url)
    if "SITEID" in gdf_download.columns:
        gdf_download["SITEID"] = gdf_download["SITEID"].astype("int32")
    gdf_download.to_file(gdb_path, layer="CCS_Data_v2", driver="GPKG")
else:
    print("Loading CCS Stations from cache...")

# Load and Project
gdf_ccs_locations = gpd.read_file(
    gdb_path,
    layer="CCS_Data_v2",
    # where="COUNTY IN ('Box Elder', 'Weber', 'Davis', 'Salt Lake', 'Utah')",
    mask=gdf_region,
).to_crs(CRS_UTM)

print(f"Total Stations Loaded (Region): {len(gdf_ccs_locations)}")
Loading CCS Stations from cache...
Total Stations Loaded (Region): 126

Run Spatial Matching Logic

Show the code
# 1. Define FC Mapping (UDOT Station FC -> Model FUNCGROUP)
# Extend or refine these mappings as additional FC values appear in the station data
fc_map = {
    # Freeways
    "Urban Principal Arterial - Interstate": "Freeway",
    "Rural Principal Arterial - Interstate": "Freeway",
    "Urban Principal Arterial - Other Freeways": "Freeway",  # Default mapping
    # Arterials
    "Urban Principal Arterial - Other": "Arterial",
    "Urban Minor Arterial": "Arterial",
    "Rural Principal Arterial - Other": "Arterial",
    "Rural Minor Arterial": "Arterial",
    # Collector/Distributor
    "Urban Major Collector": "CD Road",
    "Urban Minor Collector": "CD Road",
    "Urban Collector": "CD Road",
    "Rural Major Collector": "CD Road",
    "Urban Local System": "CD Road",  # Fallback
}
Show the code
match_results = []
used_segids = set()

print("Starting Route & Group Constrained Spatial Matching...")

for idx, ccs_row in gdf_ccs_locations.iterrows():
    station_id = ccs_row["STATION"]
    target_route = ccs_row["ROUTE"]

    # Identify the Target Group based on Station FC
    # Default to "Total" if FC is missing or unknown
    station_fc = ccs_row.get("FC", None)
    target_group = fc_map.get(station_fc, "Total")

    # --- PASS 1: Route-Constrained Spatial Match ---
    route_segments = gdf_network_clean[gdf_network_clean["MODEL_ROUTE"] == target_route]

    matched_segid = None
    match_type = "No Match"

    if not route_segments.empty:
        # Calculate distance to all segments on this route
        distances = route_segments.distance(ccs_row["geometry"])

        # Pick the absolute closest segment on this route
        best_idx = distances.idxmin()
        best_match = route_segments.loc[best_idx]

        matched_segid = best_match["SEGID"]
        match_type = "Route Match (Spatial)"

    # Add valid match to used set
    if matched_segid:
        used_segids.add(matched_segid)

    match_results.append(
        {
            "STATION": station_id,
            "MATCHED_SEGID": matched_segid,
            "MATCHED_GROUP": target_group,  # <--- NEW: Store the Target Group
            "MATCH_TYPE": match_type,
            "geometry": ccs_row["geometry"],
        }
    )

# --- PASS 2: Global Spatial Fallback (Unrestricted) ---
# Stations left unmatched by Pass 1 are matched to the nearest unused
# segment anywhere in the network; fallback matches always use the "Total" group.
for i, result in enumerate(match_results):
    if result["MATCHED_SEGID"] is None:
        station_geom = result["geometry"]
        available_segments = gdf_network_clean[
            ~gdf_network_clean["SEGID"].isin(used_segids)
        ]

        if not available_segments.empty:
            distances = available_segments.distance(station_geom)
            min_dist_idx = distances.idxmin()
            best_match = available_segments.loc[min_dist_idx]

            match_results[i]["MATCHED_SEGID"] = best_match["SEGID"]
            match_results[i]["MATCHED_GROUP"] = "Total"  # Fallback is always Total
            match_results[i]["MATCH_TYPE"] = "Global Fallback (Nearest)"
            used_segids.add(best_match["SEGID"])

# Create DataFrame
df_matches = pd.DataFrame(match_results).drop(columns=["geometry"])
print("Matching Complete. Summary:")
print(df_matches["MATCH_TYPE"].value_counts())
Starting Route & Group Constrained Spatial Matching...
Matching Complete. Summary:
MATCH_TYPE
Route Match (Spatial)        120
Global Fallback (Nearest)      6
Name: count, dtype: int64
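The Pass 1 pattern (compute the distance to every candidate segment on the route, then keep the `idxmin`) in miniature, with a toy network whose coordinates are entirely made up:

```python
import geopandas as gpd
from shapely.geometry import LineString, Point

# Toy candidates: two segments on the same route (made-up geometry)
segs = gpd.GeoDataFrame(
    {"SEGID": ["0015_1.0", "0015_2.0"]},
    geometry=[LineString([(0, 0), (1, 0)]), LineString([(0, 5), (1, 5)])],
)
station = Point(0.5, 4.0)

distances = segs.distance(station)   # distance from the station to each segment
best = segs.loc[distances.idxmin()]  # keep the closest candidate
print(best["SEGID"])  # 0015_2.0
```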

Enrich Bridge & Export

We attach the static network attributes here.

Show the code
# 1. Define Static Attributes from Network
static_cols = ["SEGID", "CO_FIPS", "SUTRUCKS", "CUTRUCKS"]

# 2. Map County Names
fips_map = {3: "Box Elder", 11: "Davis", 35: "Salt Lake", 49: "Utah", 57: "Weber"}

# Create a non-spatial attribute table
df_network_attrs = pd.DataFrame(gdf_network_clean[static_cols]).copy()
df_network_attrs["COUNTY_NAME"] = (
    df_network_attrs["CO_FIPS"].map(fips_map).fillna("Other")
)

# 3. Rename Truck Columns for standardization
df_network_attrs = df_network_attrs.rename(
    columns={"SUTRUCKS": "PCT_SUT", "CUTRUCKS": "PCT_CUT"}
)

# 4. Merge Matches with Attributes
df_bridge = df_matches.merge(
    df_network_attrs[["SEGID", "COUNTY_NAME", "PCT_SUT", "PCT_CUT"]],
    left_on="MATCHED_SEGID",
    right_on="SEGID",
    how="left",
)

# 5. Export to Intermediate
bridge_path = intermediate_dir / "station_segid_bridge.csv"
df_bridge.to_csv(bridge_path, index=False)
print(f"Step 1 Complete: Bridge exported to {bridge_path}")
Step 1 Complete: Bridge exported to _intermediate\station_segid_bridge.csv

5 Step 2: Load Observed Counts

Objective: Fetch raw count data and aggregate it. This creates the “Fact Table A”.

Setup BigQuery

Show the code
# Setup BigQuery
response = fetch_github(
    'https://raw.githubusercontent.com/WFRCAnalytics/Resources/refs/heads/master/2-Python/global-functions/BigQuery.py',
    mode="public"
)
BigQuery = importlib.util.module_from_spec(importlib.util.spec_from_loader('BigQuery', loader=None))
exec(response.text, BigQuery.__dict__)
client = BigQuery.getBigQueryClient_Confidential2023UtahHTS()
pukar.bhandari
C:/Users/Pukar.Bhandari/.private/confidential-2023-utah-hts-5fd7ddd219a7.json

Query Continuous Count Data

Show the code
# Query
query = f"""
    SELECT
        STATION,
        CAST(ROUND(AVG(AM_SUM)) AS INT64) as AVG_AM_VOL,
        CAST(ROUND(AVG(MD_SUM)) AS INT64) as AVG_MD_VOL,
        CAST(ROUND(AVG(PM_SUM)) AS INT64) as AVG_PM_VOL,
        CAST(ROUND(AVG(EV_SUM)) AS INT64) as AVG_EV_VOL,
        CAST(ROUND(AVG(DAILY_SUM)) AS INT64) as AVG_DY_VOL
    FROM (
        SELECT
            STATION,
            DATE_ONLY,
            SUM(CASE WHEN HOUR BETWEEN 6 AND 8 THEN VOLUME ELSE 0 END) as AM_SUM,
            SUM(CASE WHEN HOUR BETWEEN 9 AND 14 THEN VOLUME ELSE 0 END) as MD_SUM,
            SUM(CASE WHEN HOUR BETWEEN 15 AND 17 THEN VOLUME ELSE 0 END) as PM_SUM,
            SUM(CASE WHEN HOUR >= 18 OR HOUR <= 5 THEN VOLUME ELSE 0 END) as EV_SUM,
            SUM(VOLUME) as DAILY_SUM
        FROM `wfrc-modeling-data.src_udot_continuous_count_2023.data`
        WHERE
            DATE_ONLY BETWEEN '2023-09-01' AND '2023-11-15'
            AND EXTRACT(DAYOFWEEK FROM DATE_ONLY) IN (3, 4, 5)
        GROUP BY STATION, DATE_ONLY
    )
    GROUP BY STATION
    ORDER BY STATION
"""

df_observed_counts = client.query(query).to_dataframe()
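The query's two-stage aggregation (daily period sums, then a cross-day average; BigQuery's `DAYOFWEEK` values 3-5 correspond to Tuesday through Thursday) can be mirrored in pandas. A sketch on synthetic hourly rows with invented volumes:

```python
import pandas as pd

# Synthetic hourly counts for one station over two weekdays (invented values)
df = pd.DataFrame({
    "STATION": [101, 101, 101, 101],
    "DATE_ONLY": ["2023-09-05", "2023-09-05", "2023-09-06", "2023-09-06"],
    "HOUR": [7, 16, 7, 16],
    "VOLUME": [100, 300, 120, 280],
})

def period(h):
    # Same period breaks as the SQL CASE expressions above
    if 6 <= h <= 8:
        return "AM"
    if 9 <= h <= 14:
        return "MD"
    if 15 <= h <= 17:
        return "PM"
    return "EV"

df["PERIOD"] = df["HOUR"].map(period)

# Stage 1: period totals per station-day; Stage 2: average across days
daily = df.groupby(["STATION", "DATE_ONLY", "PERIOD"])["VOLUME"].sum()
avg = daily.groupby(["STATION", "PERIOD"]).mean().round().astype(int)
print(avg.to_dict())
```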

Export Intermediate Data

Show the code
# Export
obs_path = intermediate_dir / "observed_counts_raw.csv"
df_observed_counts.to_csv(obs_path, index=False)
print(f"Step 2 Complete: Observed data exported to {obs_path}")
Step 2 Complete: Observed data exported to _intermediate\observed_counts_raw.csv

6 Step 3: Load Model Results (Dynamic)

Objective: Load the specific scenario results (“Fact Table B”).

Show the code
# Source: https://github.com/WFRCAnalytics/WF-TDM-Documentation/tree/v10-calib/v10x/v1000/validation/data/5-assignhwy
df_segid_detail = pd.read_csv(raw_dir / "WFv1000_BY_2023_Summary_SEGID_Detailed.csv")

# Filter for bi-directional records only.
# Note: ALL FUNCGROUP values (Freeway, Arterial, CD Road, Total) are kept here;
# group-specific filtering happens during the Step 4 join.
df_model_clean = df_segid_detail[(df_segid_detail["DIRECTION"] == "Both")].copy()

# Ensure SEGID match format
df_model_clean["SEGID"] = df_model_clean["SEGID"].astype(str)

print(f"Step 3 Complete: Loaded {len(df_model_clean)} model records.")
Step 3 Complete: Loaded 8924 model records.

7 Step 4: Process & Validation (Integration)

Objective: Combine Bridge + Observed + Modeled into the final Dashboard CSV.

This step is designed to run independently if the CSVs exist.

Export Observed Data

Show the code
# Bridge + Observed (Inner Join = Only Stations with Data)
df_valid_obs = df_bridge.merge(df_observed_counts, on="STATION", how="inner")

# Master Observed File Generated for validation
df_valid_obs.to_csv(output_dir / "observed_traffic_with_crosswalk.csv", index=False)

Merge Data

Show the code
# --- 4.1 Load Intermediates (Resiliency Check) ---
# If this notebook is running top-to-bottom, we have the dfs.
# If running as a standalone script later, we load from CSV.

if "df_valid_obs" not in locals():
    print("Loading Bridge from CSV...")
    df_valid_obs = pd.read_csv(output_dir / "observed_traffic_with_crosswalk.csv")
Show the code
# --- 4.2 Merge Data ---

# B. Add Modeled Volumes AND Context Attributes
# We pull FTCLASS and ATYPENAME from the Model Results now
model_cols_needed = [
    "SEGID",
    "FUNCGROUP",
    "AM_Vol",
    "MD_Vol",
    "PM_Vol",
    "EV_Vol",
    "DY_Vol",
    "FTCLASS",
    "ATYPENAME",
]

# Safety check: ensure these columns actually exist in the model output
available_cols = [c for c in model_cols_needed if c in df_model_clean.columns]
df_model_subset = df_model_clean[available_cols].copy()

# Join 1: Attempt to match exact FUNCGROUP (e.g., Station Freeway -> Model Freeway)
df_join_specific = df_valid_obs.merge(
    df_model_subset,
    left_on=["MATCHED_SEGID", "MATCHED_GROUP"],
    right_on=["SEGID", "FUNCGROUP"],
    how="inner",
)

# Identify which Stations were successfully matched
matched_stations = set(df_join_specific["STATION"])

# Filter remaining unmatched stations
df_remaining = df_valid_obs[~df_valid_obs["STATION"].isin(matched_stations)].copy()

# Join 2: Fallback to "Total" for remaining stations
# (e.g., Station is Arterial, but Model only has a 'Total' row for this simple segment)
df_remaining["FALLBACK_GROUP"] = "Total"
df_join_fallback = df_remaining.merge(
    df_model_subset,
    left_on=["MATCHED_SEGID", "FALLBACK_GROUP"],
    right_on=["SEGID", "FUNCGROUP"],
    how="inner",
)

# C. Combine Results
df_master = pd.concat([df_join_specific, df_join_fallback], ignore_index=True)

# Fill missing model volumes with 0 (assuming no flow if missing from summary)
vol_cols = ["AM_Vol", "MD_Vol", "PM_Vol", "EV_Vol", "DY_Vol"]
df_master[vol_cols] = df_master[vol_cols].fillna(0)
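The specific-then-fallback join logic above in miniature, on toy data (all IDs and volumes invented): the first station finds a group-specific model row, the second only has a "Total" row and is picked up by the fallback pass.

```python
import pandas as pd

# Toy observed bridge and model summary (invented values)
obs = pd.DataFrame({"STATION": [1, 2],
                    "MATCHED_SEGID": ["A", "B"],
                    "MATCHED_GROUP": ["Freeway", "Arterial"]})
mod = pd.DataFrame({"SEGID": ["A", "B"],
                    "FUNCGROUP": ["Freeway", "Total"],
                    "DY_Vol": [50000, 12000]})

# Join 1: exact FUNCGROUP match
j1 = obs.merge(mod, left_on=["MATCHED_SEGID", "MATCHED_GROUP"],
               right_on=["SEGID", "FUNCGROUP"], how="inner")

# Join 2: unmatched stations fall back to the segment's "Total" row
rest = obs[~obs["STATION"].isin(j1["STATION"])].assign(FALLBACK_GROUP="Total")
j2 = rest.merge(mod, left_on=["MATCHED_SEGID", "FALLBACK_GROUP"],
                right_on=["SEGID", "FUNCGROUP"], how="inner")

combined = pd.concat([j1, j2], ignore_index=True)
print(sorted(combined["STATION"].tolist()))  # [1, 2]
```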

Pivot & Calculate Vehicles

Show the code
# --- 4.3 Pivot & Calculate Vehicles ---

# Define ID variables (Dimensions)
id_vars = [
    "STATION",
    "MATCHED_SEGID",
    "FTCLASS",
    "ATYPENAME",
    "COUNTY_NAME",
    "PCT_SUT",
    "PCT_CUT",
]

# A. Melt Observed (Wide to Long)
df_obs_long = df_master.melt(
    id_vars=id_vars,
    value_vars=["AVG_AM_VOL", "AVG_MD_VOL", "AVG_PM_VOL", "AVG_EV_VOL"],
    var_name="PERIOD_KEY",
    value_name="OBSERVED_TOTAL",
)
df_obs_long["PERIOD"] = df_obs_long["PERIOD_KEY"].str.split("_").str[1]

# B. Melt Modeled (Wide to Long)
df_mod_long = df_master.melt(
    id_vars=id_vars,
    value_vars=["AM_Vol", "MD_Vol", "PM_Vol", "EV_Vol"],
    var_name="MOD_KEY",
    value_name="MODELED_TOTAL",
)
df_mod_long["PERIOD"] = df_mod_long["MOD_KEY"].str.split("_").str[0]

# C. Merge Long Forms
df_period = pd.merge(
    df_obs_long.drop(columns=["PERIOD_KEY"]),
    df_mod_long.drop(columns=["MOD_KEY"]),
    on=id_vars + ["PERIOD"],
    how="left",
)

# D. Expand Vehicle Types (Auto vs Truck)
# Calculate Auto %
df_period["PCT_AUTO"] = 1.0 - (
    df_period["PCT_SUT"].fillna(0) + df_period["PCT_CUT"].fillna(0)
)

# Melt Types
df_long = df_period.melt(
    id_vars=[
        "STATION",
        "MATCHED_SEGID",
        "FTCLASS",
        "ATYPENAME",
        "COUNTY_NAME",
        "PERIOD",
        "OBSERVED_TOTAL",
        "MODELED_TOTAL",
    ],
    value_vars=["PCT_AUTO", "PCT_SUT", "PCT_CUT"],
    var_name="TYPE_KEY",
    value_name="SHARE",
)

# Map Names
type_map = {"PCT_AUTO": "Auto", "PCT_SUT": "SUT", "PCT_CUT": "CUT"}
df_long["VEHICLE_TYPE"] = df_long["TYPE_KEY"].map(type_map)

# Calculate Final Volumes
df_long["OBSERVED"] = df_long["OBSERVED_TOTAL"] * df_long["SHARE"]
df_long["MODELED"] = df_long["MODELED_TOTAL"] * df_long["SHARE"]

Final Cleanup & Export

Show the code
# --- 4.4 Final Cleanup & Export ---
cols_final = [
    "STATION", "MATCHED_SEGID", "FTCLASS", "ATYPENAME", "COUNTY_NAME",
    "PERIOD", "VEHICLE_TYPE", "OBSERVED", "MODELED"
]
df_final = df_long[cols_final].copy()

# Add diff stats for CSV readability
df_final["DIFF"] = df_final["MODELED"] - df_final["OBSERVED"]
# Guard against division by zero where a station observed no volume
df_final["PCT_DIFF"] = (
    df_final["DIFF"] / df_final["OBSERVED"].replace(0, np.nan)
).round(3)

8 Step 5: Export Final Output

Show the code
# --- 5.1 Export Final Data for OJS Plots ---
final_path = output_dir / "dashboard_data.csv"
df_final.to_csv(final_path, index=False)
print(f"Step 5 Complete: Dashboard data ready at {final_path}")
Step 5 Complete: Dashboard data ready at _output\dashboard_data.csv
Show the code
# --- 5.2 Export Geometry for OJS Map ---

# 1. Identify Valid IDs from the Final Dashboard Data
valid_stations = df_final["STATION"].unique()
valid_segments = df_final["MATCHED_SEGID"].unique()

# 2. Filter Geometry AND Project to Lat/Lon (EPSG:4326)
# Leaflet requires EPSG:4326. Without this .to_crs(), points will not show up.
gdf_out_stations = gdf_ccs_locations[
    gdf_ccs_locations["STATION"].isin(valid_stations)
].to_crs("EPSG:4326")
gdf_out_segments = gdf_network_clean[
    gdf_network_clean["SEGID"].isin(valid_segments)
].to_crs("EPSG:4326")

# 3. Export to GeoJSON
gdf_out_stations.to_file(output_dir / "stations.geojson", driver="GeoJSON")
gdf_out_segments.to_file(output_dir / "segments.geojson", driver="GeoJSON")

print(f"Step 4 Complete: Map Geometry Exported.")
print(f"  - Stations: {len(gdf_out_stations)}")
print(f"  - Segments: {len(gdf_out_segments)}")
Step 4 Complete: Map Geometry Exported.
  - Stations: 83
  - Segments: 81

9 Interactive Dashboard