🖥️ Week 05 Lecture
Data Pipelines and API Design
Last Updated: 16 February 2026
No slides this week. I’ll live-code and use the whiteboard in the room. This page is the reference copy you can come back to afterwards.
📍 Session Details
- Date: Monday, 16 February 2026
- Time: 16:00 - 18:00
- Location: SAL.G.03
📋 Preparation
- Continue working on ✍️ Problem Set 1. By now you should have access to your partner’s repository and be thinking about Part B (the API).
- Make sure your food conda environment is active and up to date.
- Bring any questions or blockers you’re facing with the peer handoff or API design.
💭 Something to think about
Many of you came to this course having written Python mostly in Jupyter Notebooks, only occasionally running code in the terminal. For the kind of things we build in this course (data pipelines, data collection pipelines that can scale to millions of items, data products that can be deployed and used by others), running .py scripts from the terminal is the norm.
Section 1: Data layers
Your ✍️ Problem Set 1 project produces or repurposes data about the same thing several times over. We can probably identify three layers of data in the pipeline:
Raw collection
Your spider writes its output here (e.g. data/scraped/). If you run the spider again, you get fresh data; if you configured the spider correctly, it will only write products to file that have not been scraped yet.
Enrichment
A separate step reads the raw data, queries external sources, and writes the combined result (e.g. data/enriched/). Depending on how you configured your enrichment script, re-running can skip products that have already been matched, so you don’t waste API calls. (You could also do the enrichment directly while scraping, using Scrapy’s item pipelines in pipelines.py.)
Serving
The API loads the enriched data at startup and serves it to consumers. Depending on how the enriched data is stored, it might need to (re)convert it to JSON. The API could call external APIs during a request, but ideally it shouldn’t: that would make every request slow and wasteful.
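Concretely, the three layers might map onto a project layout like this (the folder names are suggestions consistent with the examples later on this page, not requirements):

```
project/
├── scraper/          # Scrapy project (raw collection)
├── scripts/
│   └── enrich.py     # enrichment step
├── api/
│   └── main.py       # FastAPI app (serving)
└── data/
    ├── scraped/      # layer 1: raw spider output
    └── enriched/     # layer 2: raw data + external matches
```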
One note on data governance
Keep large data files out of your repository. Add data/scraped/ and data/enriched/ to your .gitignore, along with any .env files containing API keys.1
What definitely belongs in the repo: your scripts, configuration files, and a small sample data file so other developers can test the API without running the full pipeline.2
Whatever you decide to do, make sure you have a good README.md file in your repository that explains how to run the pipeline, where the live data lives and how to test the API.
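As a sketch, a minimal .gitignore consistent with the advice above might look like this (adjust the paths to your own layout; a small sample file kept directly under data/ is not matched by these patterns, so it stays tracked):

```
# bulky generated data stays local
data/scraped/
data/enriched/

# secrets (API keys etc.)
.env
```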
(Optional) A note on SQLite
If you already know SQL, you might want to store your enriched data in a SQLite database instead of JSONL files. This gives you indexed queries and proper relational joins. We won’t talk about databases before your ✍️ Problem Set 1 deadline, but if you’re comfortable with it, these resources will help you get started:
Section 2: Pipeline orchestration with click
The tools we’ve been practising with are designed to be called from the terminal. When we need to run the scraper, we might run:
cd /files/jon-ps1/scraper
scrapy crawl waitrose

If you wrote your enrichment logic as a separate Python script, you might run it like this:

python scripts/enrich.py

(or by clicking the Run button in VS Code, which in turn runs the script above in the terminal)
Similarly, when we are ready to serve data in our bespoke APIs, we might need to run:
uvicorn api.main:app --reload

You need to remember all of these commands, or keep them somewhere handy to copy and paste. Imagine what would happen if our list of scripts (our pipeline) grew to 10 steps rather than just 3. What if we needed to run things in parallel? The data engineering concept that solves this problem is called orchestration.
In this section, we’ll look at how to orchestrate our pipeline in a simple way. There are far more sophisticated tools out there, though: see for example Airflow or Prefect (I’ve never used the latter myself).
👉 Our goal here is to create a primitive but useful and functional version of a pipeline orchestration in a single Python script we could use to run our pipeline.
That is, we want to be able to run all of our pipeline steps in a single command like this:
python run_pipeline.py scrape --category=bakery
python run_pipeline.py enrich --only-new
python run_pipeline.py serve --port=8000

Creating a pipeline script like this one is optional for PS1. It would certainly make your work look more professional, but it truly is not a core requirement (it will be for future assignments!). You can run your scraper and enrichment manually and get full marks.
A plain pipeline script
Before writing any pipeline code, you need to understand a bit about a Python module called subprocess.
The subprocess module lets you run, from within Python, external commands you would normally type in your terminal. The subprocess.run function launches a command, waits for it to finish, and gives you a return code.
import subprocess
# Same as typing "cd scraper && scrapy crawl waitrose" in your terminal
result = subprocess.run(["scrapy", "crawl", "waitrose"], cwd="scraper")
# result.returncode is 0 if it worked, non-zero if it failed
print(result.returncode)

See the subprocess documentation for the full API.
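A related option worth knowing: passing check=True to subprocess.run raises subprocess.CalledProcessError on a non-zero exit code, instead of leaving you to inspect result.returncode yourself. A short sketch (using sys.executable so it runs the same Python you are in):

```python
import subprocess
import sys

# check=True turns a non-zero exit code into an exception,
# so a failing step can't be silently ignored.
try:
    subprocess.run(
        [sys.executable, "-c", "import sys; sys.exit(3)"],  # a command that fails
        check=True,
    )
except subprocess.CalledProcessError as e:
    print(f"Step failed with exit code {e.returncode}")
```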
With that in mind, here’s the simplest pipeline script:
# run_pipeline.py (version 1: plain script | NOT A GOOD IDEA!)
import subprocess
import sys
print("=== Step 1: Scraping ===")
result = subprocess.run(
["scrapy", "crawl", "waitrose",
"-o", "../data/scraped/products.jsonl"],
cwd="scraper"
)
if result.returncode != 0:
print("Scraping failed.")
sys.exit(1)
print("=== Step 2: Enriching ===")
result = subprocess.run(["python", "scripts/enrich.py"])
if result.returncode != 0:
print("Enrichment failed.")
sys.exit(1)
print("=== Step 3: Starting API ===")
subprocess.run(["uvicorn", "api.main:app", "--reload"])

This works, but consider: what if you only want to run one step? What if you want to pass options (like which category to scrape)? There’s no help text, no way to skip steps, and no way to customise behaviour without editing the script.
The main guard pattern and argparse
Before we improve the script, there’s a Python pattern you need to know. When you run python run_pipeline.py, Python sets a special variable called __name__ to the string "__main__". When you import run_pipeline from another file, __name__ is set to "run_pipeline" instead. The guard at the bottom of a script:
if __name__ == "__main__":
# this code only runs when you execute the file directly
main()

ensures that the main() function only runs when you execute the file as a script, not when another module imports it. Without this guard, importing the file would trigger the whole pipeline. You’ll see this pattern in the click version below, and it’s good practice for any script that defines reusable functions.
Python’s standard library also includes argparse for building command-line interfaces. Here’s a sketch of what it looks like (this is not runnable code, just the structure):
# What argparse looks like (sketch only)
import argparse
import sys
def scrape(category):
...
def enrich(only_new):
...
def serve(port):
...
if __name__ == "__main__":
###################
# Parse arguments #
###################
parser = argparse.ArgumentParser(description="Pipeline for scraping, enriching, and serving Waitrose product data.")
subparsers = parser.add_subparsers(dest="command")
scrape_parser = subparsers.add_parser("scrape")
scrape_parser.add_argument("--category", default="groceries")
enrich_parser = subparsers.add_parser("enrich")
enrich_parser.add_argument("--only-new", action="store_true")
serve_parser = subparsers.add_parser("serve")
serve_parser.add_argument("--port", type=int, default=8000)
args = parser.parse_args()
#########################
# Handle each command #
#########################
if args.command == "scrape":
scrape(args.category)
elif args.command == "enrich":
enrich(args.only_new)
elif args.command == "serve":
serve(args.port)
else:
parser.print_help()
sys.exit(1)

This works, but a library called click does the same thing with less boilerplate.
The click version (stub for live demo)
This version does nothing real. It just prints what each command would do. You can run it right now to see how click structures a CLI, without needing a working scraper or API:
#!/usr/bin/env python
# run_pipeline.py (stub version for demonstration)
import click
@click.group()
def cli():
"""Pipeline for scraping, enriching, and serving Waitrose product data."""
pass
@cli.command()
@click.option("--category", default="groceries", help="Waitrose category to scrape")
def scrape(category: str):
"""Run the Scrapy spider and save output to data/scraped/."""
click.echo(f"[STUB] Would scrape category: {category}")
click.echo(f"[STUB] Would save to data/scraped/products.jsonl")
@cli.command()
@click.option("--only-new/--all", default=True, help="Skip already-enriched products")
def enrich(only_new: bool):
"""Match scraped products against OpenFoodFacts and save to data/enriched/."""
mode = "only new products" if only_new else "all products"
click.echo(f"[STUB] Would enrich {mode}")
click.echo(f"[STUB] Would save to data/enriched/products.jsonl")
@cli.command()
@click.option("--port", default=8000, help="Port for the API server")
def serve(port: int):
"""Start the FastAPI server."""
click.echo(f"[STUB] Would start uvicorn on port {port}")
if __name__ == "__main__":
cli()

Try it:
$ python run_pipeline.py --help
$ python run_pipeline.py scrape --category=bakery
$ python run_pipeline.py enrich --help
The click version (full implementation)
Here is the full run_pipeline.py with real subprocess calls and OpenFoodFacts enrichment:
#!/usr/bin/env python
# run_pipeline.py
import sys
import json
import click
import requests
import subprocess
import pandas as pd
from pathlib import Path
#############
# CONSTANTS #
#############
DATA_DIR = Path("data")
SCRAPED_DIR = DATA_DIR / "scraped"
ENRICHED_DIR = DATA_DIR / "enriched"
################
# CLI COMMANDS #
################
@click.group()
def cli():
"""Pipeline for scraping, enriching, and serving Waitrose product data."""
pass
@cli.command()
@click.option("--category", default="groceries", help="Waitrose category to scrape")
def scrape(category: str):
"""Run the Scrapy spider and save output to data/scraped/."""
SCRAPED_DIR.mkdir(parents=True, exist_ok=True)
output_file = SCRAPED_DIR / "products.jsonl"
click.echo(f"Scraping category: {category}")
result = subprocess.run(
["scrapy", "crawl", "waitrose",
"-a", f"category={category}",
"-o", f"../{output_file}"],
cwd="scraper"
)
if result.returncode != 0:
click.echo("Scraping failed.", err=True)
sys.exit(1)
click.echo(f"Saved to {output_file}")
@cli.command()
@click.option("--only-new/--all", default=True, help="Skip already-enriched products")
def enrich(only_new: bool):
"""Match scraped products against OpenFoodFacts and save to data/enriched/."""
ENRICHED_DIR.mkdir(parents=True, exist_ok=True)
scraped_file = SCRAPED_DIR / "products.jsonl"
enriched_file = ENRICHED_DIR / "products.jsonl"
scraped = pd.read_json(scraped_file, lines=True).to_dict(orient="records")
click.echo(f"Loaded {len(scraped)} scraped products")
already_done: set[str] = set()
if only_new and enriched_file.exists():
enriched_df = pd.read_json(enriched_file, lines=True)
for barcodes in enriched_df["barcode"]:
for bc in barcodes:
already_done.add(bc)
click.echo(f"Skipping {len(already_done)} already-enriched barcodes")
enriched_count = 0
with open(enriched_file, "a") as out:
for product in scraped:
barcodes = product.get("barcode", [])
if only_new and any(bc in already_done for bc in barcodes):
continue
nova_group = None
for bc in barcodes:
resp = requests.get(
f"https://world.openfoodfacts.org/api/v2/product/{bc}",
params={"fields": "nova_group,product_name"},
headers={"User-Agent": "DS205-LSE/1.0"}
)
if resp.status_code == 200:
off_data = resp.json().get("product", {})
nova_group = off_data.get("nova_group")
if nova_group is not None:
break
product["nova_group"] = nova_group
out.write(json.dumps(product) + "\n")
enriched_count += 1
click.echo(f"Enriched {enriched_count} new products")
@cli.command()
@click.option("--port", default=8000, help="Port for the API server")
def serve(port: int):
"""Start the FastAPI server."""
click.echo(f"Starting API on port {port}")
subprocess.run(
["uvicorn", "api.main:app", "--reload", "--port", str(port)]
)
if __name__ == "__main__":
cli()

Running python run_pipeline.py --help produces:
$ python run_pipeline.py --help
Usage: run_pipeline.py [OPTIONS] COMMAND [ARGS]...
Pipeline for scraping, enriching, and serving Waitrose product data.
Commands:
enrich Match scraped products against OpenFoodFacts and save to...
scrape Run the Scrapy spider and save output to data/scraped/.
serve   Start the FastAPI server.

Each subcommand has its own help text:
$ python run_pipeline.py enrich --help
Usage: run_pipeline.py enrich [OPTIONS]
Match scraped products against OpenFoodFacts and save to data/enriched/.
Options:
--only-new / --all Skip already-enriched products
--help              Show this message and exit.

Here’s what each click concept does:
- @click.group() makes cli a container for subcommands.
- @cli.command() registers a function as a subcommand. It is the same decorator pattern as @app.get() in FastAPI.
- @click.option() adds flags with types, defaults, and help text.
- click.echo() prints to the terminal. It works better than print() for CLI tools because it handles encoding issues across operating systems.
- --only-new/--all is a boolean flag pair. The enrichment script opens the output file in append mode ("a"), so re-running adds new results without overwriting.
See the click quickstart for more examples.
Remember to update the environment!
You need to add click to your environment.yml under dependencies:
# --- Pipeline orchestration ---
- click

Then update the environment:
conda env update -f environment.yml --prune
Making it executable (chmod +x)
The first line of the script, #!/usr/bin/env python, is called a shebang.3 It tells the operating system which programme to use when running the file directly.
On Linux or macOS (including Nuvolos), you can make the script executable:
chmod +x run_pipeline.py

chmod changes file permissions. +x adds execute permission. After this, you can run the script directly:
./run_pipeline.py scrape --category=bakery
./run_pipeline.py enrich --only-new
./run_pipeline.py serve

On Windows, you still need python run_pipeline.py.
Section 3: Improving the API schema
In the 💻 W04 Lab, you built a working API with a WaitroseProduct model and a /products endpoint. That’s enough to serve data, but think about who might use an API like this: a researcher querying for UPF data, a public health team, a journalist investigating supermarket nutrition. The Swagger UI docs page is the first thing they see. Better schemas make the API self-documenting.
This section makes incremental improvements to the W04 lab code. Each change is small and independent.
Increment 1: Add Field descriptions
Here’s the model from the W04 lab:
# models.py (W04 lab version)
from pydantic import BaseModel, Field
class WaitroseProduct(BaseModel):
name: str
category: str
url: str
barcode: list[str] = Field(min_length=1)
food_type: list[dict] | None = None

Now add description to each field:
# models.py (with descriptions)
from pydantic import BaseModel, Field
class WaitroseProduct(BaseModel):
name: str = Field(
description="Product name as displayed on Waitrose website"
)
category: str = Field(
description="Waitrose category slug, e.g. 'milk-butter-and-eggs'"
)
url: str = Field(
description="Full URL to the product page on waitrose.com"
)
barcode: list[str] = Field(
min_length=1,
description="One or more EAN barcodes for this product"
)
food_type: list[dict] | None = Field(
default=None,
description="Waitrose food type classifications, if available"
)

Field(description="...") adds help text that appears in Swagger UI next to each field. Small effort, big payoff for anyone reading your docs.
Increment 2: Add numeric constraints and enrichment fields
Add nova_group and price with validation constraints:
# Add these fields to WaitroseProduct:
nova_group: int | None = Field(
default=None,
ge=1,
le=4,
description="NOVA classification (1-4) from OpenFoodFacts, None if not matched"
)
price: float | None = Field(
default=None,
gt=0,
description="Price in GBP, None if not available"
)

ge=1, le=4 means greater-or-equal to 1, less-or-equal to 4. Pydantic rejects values outside this range with a validation error. gt=0 means strictly greater than zero (a price of 0 or negative makes no sense).
Increment 3: Add a complete example with json_schema_extra
Add model_config to give Swagger UI a realistic example:
class WaitroseProduct(BaseModel):
"""A product scraped from Waitrose, optionally enriched with NOVA classification."""
model_config = {
"json_schema_extra": {
"examples": [
{
"name": "Waitrose Essential British Semi Skimmed Milk 2L",
"category": "milk-butter-and-eggs",
"url": "https://www.waitrose.com/ecom/products/...",
"barcode": ["5000128931830"],
"nova_group": 1,
"price": 1.55,
}
]
}
}
# ... fields as above

json_schema_extra with examples adds a complete example object that Swagger UI uses to pre-fill the “Try it out” form. When someone opens the docs, they immediately see what a realistic product looks like.
See:
Increment 4: Improve the FastAPI app metadata
In the W04 lab, the app was created with no metadata:
app = FastAPI()

Add a title, description, and version:
app = FastAPI(
title="Waitrose Product API",
description="Waitrose products enriched with NOVA classifications from OpenFoodFacts.",
version="0.1.0",
)

These appear at the top of the Swagger UI page, so anyone visiting /docs immediately understands what the API does.
Also update the data loading to read from data/enriched/ (the output of your pipeline) instead of a flat file:
import pandas as pd
from pathlib import Path
DATA_FILE = Path("data/enriched/products.jsonl")
products: list[dict] = []
if DATA_FILE.exists():
products = pd.read_json(DATA_FILE, lines=True).to_dict(orient="records")
print(f"Loaded {len(products)} enriched products")
else:
print(f"Warning: {DATA_FILE} not found. API will serve empty results.")

Increment 5: Add Query descriptions and validation to endpoints
Here’s the /products endpoint from the W04 lab:
@app.get("/products")
def get_products(
category: str | None = None,
name_contains: str | None = None
) -> list[WaitroseProduct]:
...

Now add Query with descriptions, validation, a nova_group filter, and basic pagination:
from fastapi import Query
@app.get(
"/products",
summary="Browse and filter products",
)
def get_products(
category: str | None = Query(
default=None, description="Filter by Waitrose category slug"
),
name_contains: str | None = Query(
default=None, description="Case-insensitive substring search on product name"
),
nova_group: int | None = Query(
default=None, ge=1, le=4, description="Filter by NOVA group (1-4)"
),
limit: int = Query(
default=20, ge=1, le=100, description="Max results to return"
),
offset: int = Query(
default=0, ge=0, description="Number of results to skip"
),
) -> list[WaitroseProduct]:
results = products
if category:
results = [p for p in results if p.get("category") == category]
if name_contains:
term = name_contains.lower()
results = [p for p in results if term in p.get("name", "").lower()]
if nova_group:
results = [p for p in results if p.get("nova_group") == nova_group]
page = results[offset : offset + limit]
return [WaitroseProduct(**p) for p in page]

Query(description="...") does for query parameters what Field(description="...") does for model fields. Query(ge=1, le=4) means FastAPI rejects invalid values with a 422 error before your function even runs. limit and offset give basic pagination so you don’t dump thousands of products in one response.
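One nice property of the slice-based pagination above: Python slicing never raises an IndexError past the end of a list, it just returns fewer items. A standalone sketch with made-up data:

```python
# 50 fake products, standing in for the loaded dataset
products = [{"name": f"Product {i}"} for i in range(50)]

limit, offset = 20, 40
page = products[offset : offset + limit]
print(len(page))  # 10: only ten products remain after offset 40

empty = products[100 : 100 + limit]
print(len(empty))  # 0: an offset past the end yields an empty page, not an error
```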
Increment 6: Add a /stats endpoint
This endpoint answers the PS1 driving question directly: what proportion of products are ultra-processed?
@app.get(
"/stats",
summary="Summary statistics on NOVA classification coverage",
)
def get_stats() -> dict:
total = len(products)
nova_counts = {1: 0, 2: 0, 3: 0, 4: 0, "unknown": 0}
for p in products:
group = p.get("nova_group")
if group in nova_counts:
nova_counts[group] += 1
else:
nova_counts["unknown"] += 1
return {
"total_products": total,
"nova_counts": nova_counts,
"enrichment_rate": (
round((total - nova_counts["unknown"]) / total, 3) if total > 0 else 0
),
}

It returns counts by NOVA group and a coverage rate showing what fraction of your products have been successfully matched against OpenFoodFacts.
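The same counting logic can be written more compactly with collections.Counter; here is a standalone sketch with made-up nova_group values:

```python
from collections import Counter

nova_values = [1, 4, 4, None, 2, 4]  # hypothetical nova_group values

# Map missing classifications to the "unknown" bucket, then count
counts = Counter("unknown" if g is None else g for g in nova_values)
total = len(nova_values)
enrichment_rate = round((total - counts["unknown"]) / total, 3)

print(dict(counts))     # {1: 1, 4: 3, 'unknown': 1, 2: 1}
print(enrichment_rate)  # 0.833
```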
A brief note on testing
We are likely to use pytest properly in the second half of the course. For now, test your API by visiting /docs and trying each endpoint with different parameters.
What’s next
The W05 lab is dedicated working time for ✍️ Problem Set 1. Bring your code, your questions, and any blockers. Jon will circulate to help troubleshoot.
PS1 is due Thursday 26 February 2026, 8pm UK time. Use this week and Reading Week to finish Part B.
🎥 Session Recording
The lecture recording will be available on Moodle by the afternoon of the lecture.
Appendix | Reference Links
Pipeline tools
Pydantic schema customisation
FastAPI endpoints
SQLite (optional)
Problem Set 1
Footnotes
Read about the python-dotenv library to understand more about .env files and how to work with them in Python.↩︎

Git doesn’t handle large files well. You’ll get a warning when pushing files larger than 50 MB, so keep bulky data out of the repo.↩︎
env python finds whichever python is first on your PATH. After conda activate food, that resolves to the Python inside your conda environment. You can check this yourself: run which python on Linux/macOS (including Nuvolos) or where python on Windows. On Nuvolos you might see something like /opt/conda/envs/food/bin/python; on a Windows machine with Miniconda it would be something like C:\Users\Jon\miniconda3\envs\food\python.exe.↩︎