---
title: "Historic College Enrollment"
subtitle: Fetching College Enrollment Data from NCES Integrated Postsecondary Education Data System (IPEDS)
description: This notebook fetches Total Enrollment and Full Time Equivalent for all Post Secondary Institutions within Wasatch Front Travel Demand Model region using Integrated Postsecondary Education Data System (IPEDS) from National Center for Education Statistics.
author:
- name: Pukar Bhandari
email: pukar.bhandari@wfrc.utah.gov
affiliation:
- name: Wasatch Front Regional Council
url: "https://wfrc.utah.gov/"
date: "2025-12-31"
---
# Environment Setup
## Import Standard Libraries
```{python}
# For Analysis
import numpy as np
import pandas as pd
import geopandas as gpd
# For Visualization
import matplotlib.pyplot as plt
import seaborn as sns
# misc
import os
import requests
from pathlib import Path
import importlib.util
# Census data query libraries & modules
from pygris import blocks, block_groups
from pygris.helpers import validate_state, validate_county
from pygris.data import get_census, get_lodes
from dotenv import load_dotenv
load_dotenv()
```
## Environment Variables
```{python}
# Working projected CRS for all spatial operations in this notebook:
# EPSG:26912 = NAD83 / UTM zone 12N (meters), which covers northern Utah.
CRS_UTM = "EPSG:26912"
```
## Region Definition
```{python}
# --- Configuration ---
# Study area: the counties covering the Wasatch Front Travel Demand Model
# region (see the notebook description in the frontmatter above).
target_state = "UT"
target_counties = ["BOX ELDER", "WEBER", "DAVIS", "SALT LAKE", "UTAH"]
```
```{python}
# --- 1. Validate FIPS Codes ---
# Validate State: resolve the state postal code to its FIPS code via pygris
state_fips = validate_state(target_state)
print(f"State FIPS for {target_state}: {state_fips}")
# Validate Counties and get their FIPS codes, one per name in target_counties
# (presumably 3-digit county codes, since the next cell prepends the state
# code -- TODO confirm against pygris' validate_county output)
county_fips = [validate_county(state_fips, county) for county in target_counties]
print(f"Target County FIPS: {county_fips}")
```
```{python}
# Assemble full 5-digit county FIPS codes: 2-digit state code + county code.
fips_codes = ["{}{}".format(state_fips, county) for county in county_fips]
fips_codes
```
## Helper Functions
```{python}
# Create function to read ArcGIS FeatureLayer or Table
def arc_read(url, where="1=1", outFields="*", outSR=4326, *, timeout=60, **kwargs):
    """
    Read an ArcGIS FeatureLayer or Table to a GeoDataFrame.

    Parameters:
        url (str): The ArcGIS REST service URL (e.g., ending in /FeatureServer/0)
        where (str): SQL WHERE clause for filtering. Default: "1=1"
        outFields (str): Comma-separated field names. Default: "*"
        outSR (int | str): Output spatial reference. Either a numeric
            EPSG/WKID code (e.g. 4326) or an "EPSG:<code>" string such as
            the CRS constants used elsewhere in this notebook. Default: 4326
        timeout (float): Seconds to wait for the HTTP response. Default: 60
        **kwargs: Additional query parameters passed to the ArcGIS REST API

    Returns:
        geopandas.GeoDataFrame: Spatial data from the service

    Raises:
        requests.HTTPError: If the service responds with an error status.
    """
    import io

    # Ensure URL ends with /query
    if not url.endswith('/query'):
        url = url.rstrip('/') + '/query'
    # The REST API expects a numeric WKID for outSR; callers in this notebook
    # pass "EPSG:26912"-style strings, so normalize that form here.
    if isinstance(outSR, str) and outSR.upper().startswith("EPSG:"):
        outSR = int(outSR.split(":", 1)[1])
    # Build query parameters
    params = {
        'where': where,
        'outFields': outFields,
        'returnGeometry': 'true',
        'outSR': outSR,
        'f': 'geojson'
    }
    # Add any additional parameters (may override the defaults above)
    params.update(kwargs)
    # Make request; the timeout keeps the notebook from hanging on a dead service
    response = requests.get(url, params=params, timeout=timeout)
    response.raise_for_status()  # surface HTTP errors early
    # io.BytesIO lets read_file consume the response bytes as a file object
    return gpd.read_file(io.BytesIO(response.content), engine="pyogrio")
```
# Load Data
## Regional Boundary
```{python}
# --- Regional boundary (WFRC + MAG planning areas) ---
service_url = "https://services1.arcgis.com/taguadKoI1XFwivx/ArcGIS/rest/services/RegionalBoundaryComponents/FeatureServer/0"
output_dir = Path("data")
gdb_path = output_dir / "RegionalBoundary.gpkg"
layer_name = "RegionalBoundaryComponents"

# Download once and cache as a local GeoPackage; reuse the cache on re-runs.
if gdb_path.exists():
    print(f"File found locally at {gdb_path}. Skipping download.")
else:
    print(f"File not found at {gdb_path}. Downloading Regional Boundaries...")
    output_dir.mkdir(parents=True, exist_ok=True)
    try:
        # Fetch the full layer via the custom arc_read helper
        downloaded = arc_read(service_url)
        print(f"Saving to {gdb_path}...")
        downloaded.to_file(gdb_path, layer=layer_name, driver="GPKG")
        print("Download and export complete.")
    except Exception as e:
        print(f"Error during download: {e}")

# Read the cached layer, keep only the two MPO components, merge them into a
# single polygon, and project to the working CRS.
gdf_region = (
    gpd.read_file(
        gdb_path, layer=layer_name, where="PlanOrg IN ('MAG MPO', 'WFRC MPO')"
    )
    .dissolve()
    .to_crs(CRS_UTM)
)
# Close slivers and holes left by the dissolve via a small buffer out-and-back
gdf_region["geometry"] = gdf_region.buffer(1).buffer(-1)
gdf_region.explore()
```
## Post-Secondary Institutions
```{python}
# 1. Setup Configuration
service_url = "https://nces.ed.gov/opengis/rest/services/Postsecondary_School_Locations/EDGE_GEOCODE_POSTSECONDARYSCH_2324/MapServer/0"
output_dir = Path("data/IPEDS")
gdb_path = output_dir / "Postsecondary_School_Locations.gpkg"
layer_name = "EDGE_GEOCODE_POSTSECONDARYSCH_2324"

# 2. Download-and-cache: only hit the service when there is no local copy
if gdb_path.exists():
    print(f"File found locally at {gdb_path}. Skipping download.")
else:
    print(f"File not found at {gdb_path}. Downloading from ArcGIS Feature Service...")
    output_dir.mkdir(parents=True, exist_ok=True)
    try:
        # Pull only this state's schools, requested in the working CRS
        downloaded = arc_read(
            service_url, where=f"STATE = '{target_state}'", outFields="*", outSR=CRS_UTM
        )
        print(f"Saving to {gdb_path}...")
        downloaded.to_file(gdb_path, layer=layer_name, driver="GPKG")
        print("Download and export complete.")
    except Exception as e:
        print(f"Error during download or export: {e}")

# 3. Load the data back from the local cache, restricted to the target
# counties and clipped by the regional boundary mask.
county_filter = "CNTY IN ({})".format(", ".join(f"'{fip}'" for fip in fips_codes))
gdf_college_locations = gpd.read_file(
    gdb_path,
    layer=layer_name,
    where=county_filter,
    mask=gdf_region,
).to_crs(CRS_UTM)
gdf_college_locations.explore()
```
## Fetch Data
### API Configuration
```{python}
# --- Configuration & Setup ---
import requests
import zipfile
import io
import pandas as pd
from pathlib import Path
import warnings

# Suppress warnings for cleaner output
warnings.filterwarnings("ignore")

# 1. Define Target Institutions: IPEDS UNITID -> short institution label
id_map = {
    230038: "BYU",
    230418: "ENSIGN",
    230746: "SLCC",
    230764: "UofU",
    230737: "UVU",
    230782: "WSU",
    230807: "WESTMIN",
}
# Column order for consistent output -- dicts preserve insertion order, so
# id_map's values already list the institutions in the desired order.
target_cols = list(id_map.values())

# 2. Setup Data Directory for cached raw IPEDS downloads
raw_data_dir = Path("data/IPEDS_Raw")
raw_data_dir.mkdir(parents=True, exist_ok=True)
print(f"Configuration loaded. Raw data will be saved to: {raw_data_dir.absolute()}")
```
See https://educationdata.urban.org/documentation/colleges.html (the Urban Institute Education Data API) for background documentation on IPEDS surveys and variables. Note that this notebook downloads the raw IPEDS data files directly from NCES rather than querying that API.
### Fall Enrollment
The raw IPEDS Fall Enrollment (`EF<year>A`) files contain many disaggregated rows per institution (by sex, race, attendance status, and class level). Taking the maximum enrollment value per institution (`UNITID`) captures the grand-total row without filtering each dimension explicitly.
```{python}
# --- Fetch & Pivot Fall Enrollment (Direct IPEDS Download) ---
# Downloads the EF<year>A (Fall Enrollment) file for each survey year,
# caches the raw CSV under raw_data_dir, and pivots to one headcount
# column per institution in id_map.
enrollment_frames = []
base_url = "https://nces.ed.gov/ipeds/datacenter/data"
print("Fetching Fall Enrollment Data...")
# Loop from 1997 to 2024
for year in range(1997, 2025):
    file_tag = f"EF{year}A"
    zip_url = f"{base_url}/{file_tag}.zip"
    try:
        # 1. Download ZIP
        response = requests.get(zip_url, timeout=30)
        response.raise_for_status()
        # 2. Extract CSV (kept on disk as the raw-data cache)
        with zipfile.ZipFile(io.BytesIO(response.content)) as z:
            csv_filename = [f for f in z.namelist() if f.lower().endswith(".csv")][0]
            z.extract(csv_filename, path=raw_data_dir)
            local_csv_path = raw_data_dir / csv_filename
        # 3. Read & Process (column-name case varies across years)
        df_year = pd.read_csv(local_csv_path, encoding="latin1", low_memory=False)
        df_year.columns = df_year.columns.str.upper()
        # 4. Determine Variable for Total Enrollment. IPEDS renamed the
        # grand-total variable over the decades:
        #   EFTOTLT             -- grand total (recent files)
        #   EFRACE15 + EFRACE16 -- total men + total women (older files)
        #   EFRACE24            -- grand total (transition-era files)
        if "EFTOTLT" in df_year.columns:
            df_year["Total_Enrollment"] = df_year["EFTOTLT"]
        elif "EFRACE15" in df_year.columns and "EFRACE16" in df_year.columns:
            df_year["Total_Enrollment"] = df_year["EFRACE15"] + df_year["EFRACE16"]
        elif "EFRACE24" in df_year.columns:
            df_year["Total_Enrollment"] = df_year["EFRACE24"]
        else:
            # No recognizable total column -- skip this survey year
            continue
        # 5. Filter and Aggregation (Max per UNITID to capture Grand Totals,
        # since the raw files contain disaggregated rows per institution)
        mask = df_year["UNITID"].isin(id_map.keys())
        df_filtered = (
            df_year.loc[mask, ["UNITID", "Total_Enrollment"]]
            .groupby("UNITID")["Total_Enrollment"]
            .max()
            .reset_index()
        )
        df_filtered["Year"] = year
        enrollment_frames.append(df_filtered)
        print(f"{year}", end=" ")
    except Exception as e:
        # Report skipped years instead of silently swallowing errors:
        # missing files for some years are expected, but network or
        # parsing failures should be visible.
        print(f"\n{file_tag} skipped: {e}")
# 6. Pivot and Output
print("\nProcessing Complete.")
if enrollment_frames:
    df_fall_enrollment = (
        pd.concat(enrollment_frames)
        .assign(Inst=lambda x: x["UNITID"].map(id_map))
        .pivot_table(
            index="Year", columns="Inst", values="Total_Enrollment", aggfunc="sum"
        )
        .reindex(columns=target_cols, fill_value=0)
        .astype(int)
    )
    print("\n--- Fall Enrollment (Headcount) ---")
    print(df_fall_enrollment)
```
```{python}
# --- Fetch & Pivot Fall Enrollment (Campus / Non-Online) ---
# Strategy: Download EF<Year>D (Distance Education) which contains both
# "Grand Total" and "Exclusively Distance Education" in the same file for 2012+.
# For pre-2012, assume 100% Campus.
enrollment_frames = []
base_url = "https://nces.ed.gov/ipeds/datacenter/data"
print("Fetching Fall Enrollment (Campus Only)...")
# NOTE(review): this loop stops at 2023, while the other fetch loops in this
# notebook run through 2024 -- confirm whether 2024 is intentionally excluded.
for year in range(1997, 2024):
    try:
        # --- Pre-2012: No Distance Ed Data (Assume All Campus) ---
        if year < 2012:
            file_tag = f"EF{year}A"
            zip_url = f"{base_url}/{file_tag}.zip"
            response = requests.get(zip_url, timeout=30)
            response.raise_for_status()
            # Read the CSV straight out of the zip (no on-disk cache here,
            # unlike the headcount loop above)
            with zipfile.ZipFile(io.BytesIO(response.content)) as z:
                csv_filename = [f for f in z.namelist() if f.lower().endswith(".csv")][
                    0
                ]
                with z.open(csv_filename) as f:
                    df = pd.read_csv(f, encoding="latin1", low_memory=False)
            df.columns = df.columns.str.upper()
            # Determine Total (variable names shifted across IPEDS vintages)
            if "EFTOTLT" in df.columns:
                df["Campus_Total"] = df["EFTOTLT"]
            elif "EFRACE24" in df.columns:
                df["Campus_Total"] = df["EFRACE24"]
            elif "EFRACE15" in df.columns:
                # NOTE(review): EFRACE16 is used without checking it exists
                # (the headcount loop checks both) -- a file with EFRACE15 but
                # no EFRACE16 would raise KeyError, caught by the outer except.
                df["Campus_Total"] = df["EFRACE15"] + df["EFRACE16"]
            else:
                continue
        # --- 2012+: Use EF<Year>D (Distance Education) File ---
        else:
            # The 'A' file often splits data. The 'D' file (EF<Year>D) focuses
            # specifically on Distance Education counts and usually has the cleanest totals.
            # However, sometimes it's EF<Year>A_DIST. We try the standard A file first
            # but look for the specific Distance Ed variables.
            # Note: In recent IPEDS, DE data is inside the main EF<Year>A zip
            # but sometimes in a separate CSV inside that zip, or a separate download.
            # We will try the specific _DIST download first for safety.
            # Attempt 1: EF<Year>A_DIST (Common for 2012-2019)
            file_tag = f"EF{year}A_DIST"
            # In later years (2020+), it might just be inside EF<Year>A.
            # Let's try the specific DIST file first.
            zip_url = f"{base_url}/{file_tag}.zip"
            try:
                response = requests.get(zip_url, timeout=30)
                if response.status_code != 200:
                    # Fallback to standard EF<Year>A if _DIST doesn't exist
                    zip_url = f"{base_url}/EF{year}A.zip"
                    response = requests.get(zip_url, timeout=30)
                response.raise_for_status()
                with zipfile.ZipFile(io.BytesIO(response.content)) as z:
                    # Look for csv
                    csv_names = [f for f in z.namelist() if f.lower().endswith(".csv")]
                    # Prioritize one with "dist" in name if multiple exist
                    dist_csv = next(
                        (f for f in csv_names if "dist" in f.lower()), csv_names[0]
                    )
                    with z.open(dist_csv) as f:
                        df = pd.read_csv(f, encoding="latin1", low_memory=False)
                df.columns = df.columns.str.upper()
                # Variable Mapping for 2012+
                # EFDEEX = Exclusively Distance Education (All Students)
                # EFTEUG = Total Undergraduate (if split)
                # EFTOTLT = Grand Total (if available)
                # We need "Grand Total" and "Exclusive DE Total"
                # 1. Find Grand Total Variable in this file
                if "EFTOTLT" in df.columns:
                    total_col = "EFTOTLT"
                elif "EFDETOT" in df.columns:  # Total enrollment in DE file
                    total_col = "EFDETOT"
                else:
                    # If total isn't here, we might need to merge.
                    # But usually EF...DIST files have the total col for calc purposes.
                    print(f"Skipping {year}: No total col in DIST file")
                    continue
                # 2. Find Exclusive DE Variable
                if "EFDEEX" in df.columns:
                    de_col = "EFDEEX"  # Total Exclusive
                elif "EFDEEX1" in df.columns and "EFDEEX2" in df.columns:
                    # Sum Undergrad (1) + Grad (2)
                    df["EFDEEX"] = df["EFDEEX1"].fillna(0) + df["EFDEEX2"].fillna(0)
                    de_col = "EFDEEX"
                else:
                    de_col = None
                if de_col:
                    # Calculate Campus = grand total minus exclusively-online
                    df["Campus_Total"] = df[total_col] - df[de_col]
                else:
                    # Fallback: Assume 0 DE if variable missing (unlikely for _DIST file)
                    df["Campus_Total"] = df[total_col]
            except Exception as e:
                print(f"DL Error {year}: {e}")
                continue
        # --- Filter & Aggregate (common to both branches) ---
        mask = df["UNITID"].isin(id_map.keys())
        df_filtered = (
            df.loc[mask, ["UNITID", "Campus_Total"]]
            .groupby("UNITID")["Campus_Total"]
            .max()  # Max helps if there are duplicate rows (e.g. by race)
            .reset_index()
        )
        # Safety Check: Ensure no negative numbers
        # If Exclusive DE > Total (data error), clip to 0
        df_filtered.loc[df_filtered["Campus_Total"] < 0, "Campus_Total"] = 0
        df_filtered["Year"] = year
        enrollment_frames.append(df_filtered)
        print(f"{year}", end=" ")
    except Exception as e:
        # NOTE(review): any remaining failure (including an entirely missing
        # year) is silently dropped here; 'e' is captured but never used.
        pass
# --- Pivot and Output ---
print("\nProcessing Complete.")
if enrollment_frames:
    df_campus_only = (
        pd.concat(enrollment_frames)
        .assign(Inst=lambda x: x["UNITID"].map(id_map))
        .pivot_table(index="Year", columns="Inst", values="Campus_Total", aggfunc="sum")
        .reindex(columns=target_cols, fill_value=0)
        .astype(int)
    )
    print("\n--- Fall Enrollment (Campus / Non-Online Only) ---")
    print(df_campus_only)
```
### Full-Time Equivalent (FTE)
FTE is derived from the `EFIA<year>` (Instructional Activity) files: the pre-calculated 12-month FTE where available, otherwise the sum of undergraduate, graduate, and doctor's-professional-practice components, or — for the oldest files — an estimate from credit hours (divided by 30) and contact hours (divided by 900).
```{python}
# --- Fetch & Pivot Full-Time Equivalent (Direct IPEDS Download) ---
# Downloads the EFIA<year> (Instructional Activity) file for each year,
# caches the raw CSV, and derives one FTE column per institution.
fte_frames = []
base_url = "https://nces.ed.gov/ipeds/datacenter/data"
print("Fetching FTE Data...")
# Loop from 1997 to 2024
for year in range(1997, 2025):
    file_tag = f"EFIA{year}"
    zip_url = f"{base_url}/{file_tag}.zip"
    try:
        # 1. Download
        response = requests.get(zip_url, timeout=30)
        response.raise_for_status()
        # 2. Extract (kept on disk as the raw-data cache)
        with zipfile.ZipFile(io.BytesIO(response.content)) as z:
            csv_filename = [f for f in z.namelist() if f.lower().endswith(".csv")][0]
            z.extract(csv_filename, path=raw_data_dir)
            local_csv_path = raw_data_dir / csv_filename
        # 3. Read (column-name case varies across years)
        df_year = pd.read_csv(local_csv_path, encoding="latin1", low_memory=False)
        df_year.columns = df_year.columns.str.upper()
        # 4. Calculate FTE. Every branch assigns FTE_Calc (or skips the
        # year), so no default initialization is needed.
        # Method A: Pre-calculated 12-month Total
        if "FTE12MN" in df_year.columns:
            df_year["FTE_Calc"] = df_year["FTE12MN"].fillna(0)
        # Method B: Sum of Components (undergrad + grad + doctor's-professional)
        elif "FTEUG" in df_year.columns:
            ug = df_year["FTEUG"].fillna(0)
            gd = df_year["FTEGD"].fillna(0) if "FTEGD" in df_year.columns else 0
            dpp = df_year["FTEDPP"].fillna(0) if "FTEDPP" in df_year.columns else 0
            df_year["FTE_Calc"] = ug + gd + dpp
        # Method C: Instructional Activity (Older Files)
        elif "CDACTUA" in df_year.columns or "CNACTUA" in df_year.columns:
            # Credit Hours (Academic) / 30
            c_ug = df_year["CDACTUA"].fillna(0) if "CDACTUA" in df_year.columns else 0
            c_gd = df_year["CDACTGA"].fillna(0) if "CDACTGA" in df_year.columns else 0
            # Contact Hours (Vocational) / 900
            contact = (
                df_year["CNACTUA"].fillna(0) if "CNACTUA" in df_year.columns else 0
            )
            df_year["FTE_Calc"] = ((c_ug + c_gd) / 30) + (contact / 900)
        else:
            # No recognizable FTE layout -- skip this survey year
            continue
        # 5. Filter and Aggregation (max per UNITID captures the total row)
        mask = df_year["UNITID"].isin(id_map.keys())
        df_filtered = (
            df_year.loc[mask, ["UNITID", "FTE_Calc"]]
            .groupby("UNITID")["FTE_Calc"]
            .max()
            .reset_index()
        )
        df_filtered["Year"] = year
        fte_frames.append(df_filtered)
        print(f"{year}", end=" ")
    except Exception as e:
        # Report skipped years rather than silently swallowing errors
        print(f"\n{file_tag} skipped: {e}")
# 6. Pivot and Output
print("\nProcessing Complete.")
if fte_frames:
    df_fte = (
        pd.concat(fte_frames)
        .assign(Inst=lambda x: x["UNITID"].map(id_map))
        .pivot_table(index="Year", columns="Inst", values="FTE_Calc", aggfunc="sum")
        .reindex(columns=target_cols, fill_value=0)
        .astype(int)
    )
    print("\n--- Full-Time Equivalent (FTE) ---")
    print(df_fte)
```
### 12-Month Unduplicated Headcount (E12)
```{python}
# --- Fetch & Pivot 12-Month Unduplicated Headcount (EFFY) ---
# Downloads the EFFY<year> file for each year, caches the raw CSV, and
# builds one 12-month-headcount column per institution.
effy_frames = []
base_url = "https://nces.ed.gov/ipeds/datacenter/data"
print("Fetching 12-Month Unduplicated Headcount...")
# Loop from 1997 to 2024
# Note: EFFY files usually report for the *previous* academic year,
# but IPEDS labels the file by the collection year.
for year in range(1997, 2025):
    file_tag = f"EFFY{year}"
    zip_url = f"{base_url}/{file_tag}.zip"
    try:
        # 1. Download
        response = requests.get(zip_url, timeout=30)
        response.raise_for_status()
        # 2. Extract (kept on disk as the raw-data cache)
        with zipfile.ZipFile(io.BytesIO(response.content)) as z:
            csv_filename = [f for f in z.namelist() if f.lower().endswith(".csv")][0]
            z.extract(csv_filename, path=raw_data_dir)
            local_csv_path = raw_data_dir / csv_filename
        # 3. Read (column-name case varies across years)
        df_year = pd.read_csv(local_csv_path, encoding="latin1", low_memory=False)
        df_year.columns = df_year.columns.str.upper()
        # 4. Determine Variable (EFYTOTLT = Grand Total 12-Month Enrollment)
        # Variable names have shifted slightly over decades
        if "EFYTOTLT" in df_year.columns:
            df_year["Headcount_12M"] = df_year["EFYTOTLT"]
        elif "EFYTOTL" in df_year.columns:
            df_year["Headcount_12M"] = df_year["EFYTOTL"]
        # Fallback for older files: Sum of Level of Study (Undergrad + Grad)
        elif "EFYUNDER" in df_year.columns and "EFYGRAD" in df_year.columns:
            # fillna(0) is critical here as some schools may lack grad programs
            df_year["Headcount_12M"] = (
                df_year["EFYUNDER"].fillna(0) + df_year["EFYGRAD"].fillna(0)
            )
        else:
            # If we can't find a total, skip this year
            continue
        # 5. Filter and Aggregation
        mask = df_year["UNITID"].isin(id_map.keys())
        df_filtered = (
            df_year.loc[mask, ["UNITID", "Headcount_12M"]]
            .groupby("UNITID")["Headcount_12M"]
            .max()  # Max handles rows disaggregated by race/gender
            .reset_index()
        )
        df_filtered["Year"] = year
        effy_frames.append(df_filtered)
        print(f"{year}", end=" ")
    except Exception as e:
        # Report skipped years rather than silently swallowing errors
        print(f"\n{file_tag} skipped: {e}")
# 6. Pivot and Output
print("\nProcessing Complete.")
if effy_frames:
    df_effy = (
        pd.concat(effy_frames)
        .assign(Inst=lambda x: x["UNITID"].map(id_map))
        .pivot_table(
            index="Year", columns="Inst", values="Headcount_12M", aggfunc="sum"
        )
        .reindex(columns=target_cols, fill_value=0)
        .astype(int)
    )
    print("\n--- 12-Month Unduplicated Headcount (E12) ---")
    print(df_effy)
```
# Export
```{python}
# --- Export Data to CSV ---
from pathlib import Path

# 1. Define and Create Output Directory
output_dir = Path("intermediate")
output_dir.mkdir(parents=True, exist_ok=True)

# NOTE(review): the fetch loops above download data through 2024, but these
# file names say 1997_2023 -- confirm the intended year range before renaming
# anything, since downstream tooling may reference these exact paths.

# 2. Export Fall Enrollment (Headcount)
if "df_fall_enrollment" in locals():
    enrollment_path = output_dir / "Fall_Enrollment_1997_2023.csv"
    df_fall_enrollment.to_csv(enrollment_path)
    print(f"Saved Fall Enrollment to: {enrollment_path.absolute()}")
else:
    print("df_fall_enrollment not found. Skipping export.")

# 3. Export Non-Online Fall Enrollment
if "df_campus_only" in locals():
    # Naming it "Campus_Enrollment" to distinguish from Total
    enrollment_path = output_dir / "Fall_Enrollment_CampusOnly_1997_2023.csv"
    df_campus_only.to_csv(enrollment_path)
    print(f"Saved Non-Online Enrollment to: {enrollment_path.absolute()}")
else:
    # Consistent with the other exports: report when the frame is missing
    print("df_campus_only not found. Skipping export.")

# 4. Export Full-Time Equivalent (FTE)
if "df_fte" in locals():
    fte_path = output_dir / "FTE_Enrollment_1997_2023.csv"
    df_fte.to_csv(fte_path)
    print(f"Saved FTE to: {fte_path.absolute()}")
else:
    print("df_fte not found. Skipping export.")

# 5. Export 12-Month Unduplicated Headcount
if "df_effy" in locals():
    effy_path = output_dir / "Headcount_12Month_1997_2023.csv"
    df_effy.to_csv(effy_path)
    print(f"Saved 12-Month Headcount to: {effy_path.absolute()}")
else:
    print("df_effy not found. Skipping export.")
```