DS205 2025-2026 Winter Term

🖥️ Week 05 Lecture

Data Pipelines and API Design

Author

Dr Jon Cardoso-Silva

Published

06 March 2026

Last Updated: 16 February 2026

No slides this week. I’ll live-code and use the whiteboard in the room. This page is the reference copy you can come back to afterwards.

📍 Session Details

  • Date: Monday, 16 February 2026
  • Time: 16:00 - 18:00
  • Location: SAL.G.03

📋 Preparation

  • Continue working on ✍️ Problem Set 1. By now you should have access to your partner’s repository and be thinking about Part B (the API).
  • Make sure your food conda environment is active and up to date.
  • Bring any questions or blockers you’re facing with the peer handoff or API design.

💭 Something to think about

Many of you came to this course with Python experience built mostly in Jupyter Notebooks, only occasionally running code in the terminal. For the kinds of things we build in this course (data pipelines, data collection jobs that can scale to millions of items, data products that can be deployed and used by others), running .py scripts in the terminal is more the norm.

Section 1: Data layers

Your ✍️ Problem Set 1 project produces or repurposes data about the same thing several times over. We can probably identify three layers of data in the pipeline:

Raw collection

Your spider writes its output here (e.g. data/scraped/). If you run the spider again, you get fresh data; if you configured the spider correctly, it writes a product to file only if that product has not been scraped before.
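The "only write new products" behaviour can be sketched in plain Python. This is a minimal illustration, not Scrapy code: it assumes each record carries a unique url field and that the output is a JSONL file, and the function names load_seen_urls and append_new_products are made up for this example. In a real spider, this logic would more likely live in an item pipeline.

```python
import json
from pathlib import Path


def load_seen_urls(path: Path) -> set[str]:
    """Collect the url of every product already stored in a JSONL file."""
    if not path.exists():
        return set()
    with path.open(encoding="utf-8") as f:
        return {json.loads(line)["url"] for line in f if line.strip()}


def append_new_products(path: Path, products: list[dict]) -> int:
    """Append only products whose url is not in the file yet; return how many were written."""
    seen = load_seen_urls(path)
    written = 0
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("a", encoding="utf-8") as out:
        for product in products:
            if product["url"] in seen:
                continue  # already scraped on a previous run
            out.write(json.dumps(product) + "\n")
            seen.add(product["url"])
            written += 1
    return written
```

Calling append_new_products twice with the same batch writes the products once; the second call returns 0.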

Enrichment

A separate step reads the raw data, queries external sources, and writes the combined result (e.g. data/enriched/). Depending on how you configured your enrichment script, re-running it can skip products that have already been matched, so you don’t waste API calls. (You could also do the enrichment while scraping, using Scrapy’s item pipelines, i.e. the pipelines.py file.)

Serving

The API loads the enriched data at startup and serves it to consumers. Depending on how the enriched data is stored, the API might need to convert it (back) to JSON. The API could call external APIs during a request, but ideally it shouldn’t: every request would then have to wait on a third-party service.

One note on data governance

Keep large data files out of your repository. Add data/scraped/ and data/enriched/ to your .gitignore, along with any .env files containing API keys.1

What definitely belongs in the repo: your scripts, configuration files, and a small sample data file so other developers can test the API without running the full pipeline.2
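One way to produce that small sample file is to copy the first few lines of your enriched JSONL output. A minimal sketch, assuming JSONL data; the function name write_sample and the data/sample/ folder in the usage note are hypothetical choices, not a course requirement.

```python
from itertools import islice
from pathlib import Path


def write_sample(source: Path, target: Path, n: int = 20) -> int:
    """Copy the first n lines of a JSONL file into a small sample file."""
    target.parent.mkdir(parents=True, exist_ok=True)
    with source.open(encoding="utf-8") as src:
        lines = list(islice(src, n))  # reads at most n lines, not the whole file
    with target.open("w", encoding="utf-8") as dst:
        dst.writelines(lines)
    return len(lines)
```

For example, write_sample(Path("data/enriched/products.jsonl"), Path("data/sample/products.jsonl")) would give you a file small enough to commit, as long as data/sample/ is not in your .gitignore.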

Whatever you decide to do, make sure you have a good README.md file in your repository that explains how to run the pipeline, where the live data lives and how to test the API.

(Optional) A note on SQLite

If you already know SQL, you might want to store your enriched data in a SQLite database instead of JSONL files. This gives you indexed queries and proper relational joins. We won’t talk about databases before your ✍️ Problem Set 1 deadline, but if you’re comfortable with it, these resources will help you get started:
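As a rough illustration of what the SQLite route could look like, here is a minimal sketch using Python's built-in sqlite3 module. The table layout, the index name, and the helper functions save_products and count_by_nova are assumptions made for this example; they are not part of the PS1 specification.

```python
import sqlite3

# Hypothetical schema: one row per product, keyed by its URL.
SCHEMA = """
CREATE TABLE IF NOT EXISTS products (
    url        TEXT PRIMARY KEY,
    name       TEXT NOT NULL,
    category   TEXT,
    nova_group INTEGER
);
CREATE INDEX IF NOT EXISTS idx_products_nova ON products (nova_group);
"""


def save_products(conn: sqlite3.Connection, products: list[dict]) -> None:
    """Insert products, replacing any existing row with the same url."""
    conn.executescript(SCHEMA)
    conn.executemany(
        "INSERT OR REPLACE INTO products (url, name, category, nova_group) "
        "VALUES (:url, :name, :category, :nova_group)",
        products,
    )
    conn.commit()


def count_by_nova(conn: sqlite3.Connection) -> dict:
    """Return {nova_group: number_of_products}; the None key counts unmatched products."""
    rows = conn.execute(
        "SELECT nova_group, COUNT(*) FROM products GROUP BY nova_group"
    )
    return dict(rows.fetchall())
```

Because url is the primary key, saving the same product twice leaves a single row, which gives you the "skip what is already there" behaviour without extra bookkeeping.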

Section 2: Pipeline orchestration with click

The tools we’ve been practising with are designed to be called from the terminal. When we need to run the scraper, we might run:

cd /files/jon-ps1/scraper
scrapy crawl waitrose

If you wrote your enrichment logic as a separate Python script, you might run it like this:

python scripts/enrich.py

(or by clicking the Run button in VS Code, which in turn runs the same command in the terminal)

Similarly, when we are ready to serve data in our bespoke APIs, we might need to run:

uvicorn api.main:app --reload

You need to remember all of these commands or keep them ready to be copied and pasted. Imagine what would happen if our list of scripts (our pipeline) grew to 10 steps rather than just 3. What if we needed to run things in parallel? A concept in data engineering that resolves this problem is called orchestration.

In this section, we’ll be looking at how to orchestrate our pipeline in a simple way. There are far more sophisticated tools out there, though! See, for example, Airflow or Prefect (I’ve never used this one, though).

👉 Our goal here is to create a primitive but useful and functional version of pipeline orchestration: a single Python script we can use to run our pipeline.

That is, we want to be able to run all of our pipeline steps in a single command like this:

python run_pipeline.py scrape --category=bakery
python run_pipeline.py enrich --only-new
python run_pipeline.py serve --port=8000

Creating a pipeline script like this one is optional for PS1. It would certainly make your work look more professional, but it truly is not a core requirement (it will be for future assignments!).

You can run your scraper and enrichment manually and get full marks.

A plain pipeline script

Before writing any pipeline code, you need to understand a bit about a Python module called subprocess.

subprocess lets you run, from within Python, the external commands you would normally type in your terminal. The subprocess.run function launches an external command, waits for it to finish, and returns an object that holds its return code.

import subprocess

# Same as typing "cd scraper && scrapy crawl waitrose" in your terminal
result = subprocess.run(["scrapy", "crawl", "waitrose"], cwd="scraper")

# result.returncode is 0 if it worked, non-zero if it failed
print(result.returncode)

See the subprocess documentation for the full API.
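Two options of subprocess.run are worth knowing early on: check=True raises subprocess.CalledProcessError when the command exits with a non-zero code, and capture_output=True (with text=True) hands you the command's output as a string. The sketch below uses sys.executable to run tiny Python one-liners so that it works without Scrapy installed; the helper name run_step is made up for this example.

```python
import subprocess
import sys


def run_step(command: list[str]) -> str:
    """Run a command, return its stdout, and raise if it exits with a non-zero code."""
    result = subprocess.run(
        command,
        capture_output=True,  # collect stdout/stderr instead of printing them
        text=True,            # decode bytes to str
        check=True,           # raise CalledProcessError on failure
    )
    return result.stdout


# sys.executable is the Python interpreter running this script
ok_output = run_step([sys.executable, "-c", "print('step finished')"])

try:
    run_step([sys.executable, "-c", "import sys; sys.exit(3)"])
    failed_code = 0
except subprocess.CalledProcessError as err:
    failed_code = err.returncode  # the exit code of the failed command

print(ok_output.strip(), failed_code)
```

Whether you check returncode yourself or let check=True raise is a matter of taste; the exception route means a failed step cannot be ignored by accident.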

With that in mind, here’s the simplest pipeline script:

# run_pipeline.py (version 1: plain script | NOT A GOOD IDEA!)
import subprocess
import sys

print("=== Step 1: Scraping ===")
result = subprocess.run(
    ["scrapy", "crawl", "waitrose",
     "-o", "../data/scraped/products.jsonl"],
    cwd="scraper"
)
if result.returncode != 0:
    print("Scraping failed.")
    sys.exit(1)

print("=== Step 2: Enriching ===")
result = subprocess.run(["python", "scripts/enrich.py"])
if result.returncode != 0:
    print("Enrichment failed.")
    sys.exit(1)

print("=== Step 3: Starting API ===")
subprocess.run(["uvicorn", "api.main:app", "--reload"])

This works, but consider: what if you only want to run one step? What if you want to pass options (like which category to scrape)? There’s no help text, no way to skip steps, and no way to customise behaviour without editing the script.

The main guard pattern and argparse

Before we improve the script, there’s a Python pattern you need to know. When you run python run_pipeline.py, Python sets a special variable called __name__ to the string "__main__". When you import run_pipeline from another file, __name__ is set to "run_pipeline" instead. The guard at the bottom of a script:

if __name__ == "__main__":
    # this code only runs when you execute the file directly
    main()

ensures that the main() function only runs when you execute the file as a script, not when another module imports it. Without this guard, importing the file would trigger the whole pipeline. You’ll see this pattern in the click version below, and it’s good practice for any script that defines reusable functions.
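If you want to see the guard in action, the sketch below writes a throwaway module to a temporary folder, then runs it once as a script and once via import. The module name demo_pipeline and the helper python_output are hypothetical and exist only for this demonstration.

```python
import subprocess
import sys
import tempfile
import textwrap
from pathlib import Path

# A throwaway module (hypothetical name: demo_pipeline.py) with a main guard
MODULE_SOURCE = textwrap.dedent(
    """
    def main():
        print("pipeline running")

    print(f"__name__ is {__name__!r}")

    if __name__ == "__main__":
        main()
    """
)


def python_output(args: list[str], cwd: str) -> str:
    """Run the current Python interpreter with the given arguments and return stdout."""
    result = subprocess.run(
        [sys.executable, *args], cwd=cwd, capture_output=True, text=True
    )
    return result.stdout


with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "demo_pipeline.py").write_text(MODULE_SOURCE)
    # Equivalent to: python demo_pipeline.py
    as_script = python_output(["demo_pipeline.py"], cwd=tmp)
    # Equivalent to another file doing: import demo_pipeline
    as_import = python_output(["-c", "import demo_pipeline"], cwd=tmp)

print(as_script)  # reports '__main__', then "pipeline running"
print(as_import)  # reports 'demo_pipeline' only; main() never runs
```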

Python’s standard library also includes argparse for building command-line interfaces. Here’s a sketch of what it looks like (the function bodies are left empty; the point is the structure):

# What argparse looks like (sketch only)
import argparse
import sys


def scrape(category):
    ...


def enrich(only_new):
    ...


def serve(port):
    ...


if __name__ == "__main__":

    ###################
    # Parse arguments #
    ###################

    parser = argparse.ArgumentParser(description="Pipeline for scraping, enriching, and serving Waitrose product data.")
    subparsers = parser.add_subparsers(dest="command")

    scrape_parser = subparsers.add_parser("scrape")
    scrape_parser.add_argument("--category", default="groceries")

    enrich_parser = subparsers.add_parser("enrich")
    enrich_parser.add_argument("--only-new", action="store_true")

    # Each subcommand needs its own parser, including "serve"
    serve_parser = subparsers.add_parser("serve")
    serve_parser.add_argument("--port", type=int, default=8000)

    args = parser.parse_args()

    #######################
    # Handle each command #
    #######################

    if args.command == "scrape":
        scrape(args.category)
    elif args.command == "enrich":
        enrich(args.only_new)
    elif args.command == "serve":
        serve(args.port)
    else:
        # No subcommand given: show the help text and exit with an error code
        parser.print_help()
        sys.exit(1)

This works, but a library called click does the same thing with less boilerplate.

The click version (stub for live demo)

This version does nothing real. It just prints what each command would do. You can run it right now to see how click structures a CLI, without needing a working scraper or API:

#!/usr/bin/env python
# run_pipeline.py (stub version for demonstration)

import click


@click.group()
def cli():
    """Pipeline for scraping, enriching, and serving Waitrose product data."""
    pass


@cli.command()
@click.option("--category", default="groceries", help="Waitrose category to scrape")
def scrape(category: str):
    """Run the Scrapy spider and save output to data/scraped/."""
    click.echo(f"[STUB] Would scrape category: {category}")
    click.echo("[STUB] Would save to data/scraped/products.jsonl")


@cli.command()
@click.option("--only-new/--all", default=True, help="Skip already-enriched products")
def enrich(only_new: bool):
    """Match scraped products against OpenFoodFacts and save to data/enriched/."""
    mode = "only new products" if only_new else "all products"
    click.echo(f"[STUB] Would enrich {mode}")
    click.echo("[STUB] Would save to data/enriched/products.jsonl")


@cli.command()
@click.option("--port", default=8000, help="Port for the API server")
def serve(port: int):
    """Start the FastAPI server."""
    click.echo(f"[STUB] Would start uvicorn on port {port}")


if __name__ == "__main__":
    cli()

Try it:

$ python run_pipeline.py --help
$ python run_pipeline.py scrape --category=bakery
$ python run_pipeline.py enrich --help
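You can also exercise a click CLI from Python rather than from the terminal, using the CliRunner helper that ships with click. The sketch below defines a tiny stand-in CLI with a single scrape command (a cut-down copy of the stub, so that the example is self-contained) and invokes it three times.

```python
import click
from click.testing import CliRunner


@click.group()
def cli():
    """Tiny stand-in for the stub pipeline CLI."""


@cli.command()
@click.option("--category", default="groceries", help="Waitrose category to scrape")
def scrape(category: str):
    """Pretend to scrape a category."""
    click.echo(f"[STUB] Would scrape category: {category}")


runner = CliRunner()
default_run = runner.invoke(cli, ["scrape"])                      # uses the default
bakery_run = runner.invoke(cli, ["scrape", "--category=bakery"])  # overrides it
bad_run = runner.invoke(cli, ["scrape", "--colour=red"])          # unknown option

print(default_run.output)
print(bakery_run.output)
print(bad_run.exit_code)  # click exits with a non-zero code on usage errors
```

Each invoke call returns a result object with exit_code and output attributes, which is handy once you start writing automated tests for the pipeline script.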

The click version (full implementation)

Here is the full run_pipeline.py with real subprocess calls and OpenFoodFacts enrichment:

#!/usr/bin/env python
# run_pipeline.py

import sys
import json
import click
import requests
import subprocess

import pandas as pd

from pathlib import Path

#############
# CONSTANTS #
#############

DATA_DIR = Path("data")
SCRAPED_DIR = DATA_DIR / "scraped"
ENRICHED_DIR = DATA_DIR / "enriched"

################
# CLI COMMANDS #
################


@click.group()
def cli():
    """Pipeline for scraping, enriching, and serving Waitrose product data."""
    pass


@cli.command()
@click.option("--category", default="groceries", help="Waitrose category to scrape")
def scrape(category: str):
    """Run the Scrapy spider and save output to data/scraped/."""

    SCRAPED_DIR.mkdir(parents=True, exist_ok=True)
    output_file = SCRAPED_DIR / "products.jsonl"

    click.echo(f"Scraping category: {category}")
    result = subprocess.run(
        ["scrapy", "crawl", "waitrose",
         "-a", f"category={category}",
         "-o", f"../{output_file}"],
        cwd="scraper"
    )

    if result.returncode != 0:
        click.echo("Scraping failed.", err=True)
        sys.exit(1)

    click.echo(f"Saved to {output_file}")


@cli.command()
@click.option("--only-new/--all", default=True, help="Skip already-enriched products")
def enrich(only_new: bool):
    """Match scraped products against OpenFoodFacts and save to data/enriched/."""

    ENRICHED_DIR.mkdir(parents=True, exist_ok=True)

    scraped_file = SCRAPED_DIR / "products.jsonl"
    enriched_file = ENRICHED_DIR / "products.jsonl"

    scraped = pd.read_json(scraped_file, lines=True).to_dict(orient="records")
    click.echo(f"Loaded {len(scraped)} scraped products")

    already_done: set[str] = set()
    if only_new and enriched_file.exists():
        enriched_df = pd.read_json(enriched_file, lines=True)
        for barcodes in enriched_df["barcode"]:
            for bc in barcodes:
                already_done.add(bc)
        click.echo(f"Skipping {len(already_done)} already-enriched barcodes")

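    enriched_count = 0
    # Append when skipping known products; with --all, start the file afresh
    # so that re-enriching everything does not duplicate rows.
    with open(enriched_file, "a" if only_new else "w") as out: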
        for product in scraped:
            barcodes = product.get("barcode", [])

            if only_new and any(bc in already_done for bc in barcodes):
                continue

            nova_group = None
            for bc in barcodes:
                try:
                    resp = requests.get(
                        f"https://world.openfoodfacts.org/api/v2/product/{bc}",
                        params={"fields": "nova_group,product_name"},
                        headers={"User-Agent": "DS205-LSE/1.0"},
                        timeout=10,  # give up on a stalled connection
                    )
                except requests.RequestException:
                    continue  # network error: try the next barcode
                if resp.status_code == 200:
                    off_data = resp.json().get("product", {})
                    nova_group = off_data.get("nova_group")
                    if nova_group is not None:
                        break

            product["nova_group"] = nova_group
            out.write(json.dumps(product) + "\n")
            enriched_count += 1

    click.echo(f"Enriched {enriched_count} new products")


@cli.command()
@click.option("--port", default=8000, help="Port for the API server")
def serve(port: int):
    """Start the FastAPI server."""

    click.echo(f"Starting API on port {port}")
    subprocess.run(
        ["uvicorn", "api.main:app", "--reload", "--port", str(port)]
    )


if __name__ == "__main__":
    cli()

Running python run_pipeline.py --help produces:

$ python run_pipeline.py --help
Usage: run_pipeline.py [OPTIONS] COMMAND [ARGS]...

  Pipeline for scraping, enriching, and serving Waitrose product data.

Options:
  --help  Show this message and exit.

Commands:
  enrich  Match scraped products against OpenFoodFacts and save to...
  scrape  Run the Scrapy spider and save output to data/scraped/.
  serve   Start the FastAPI server.

Each subcommand has its own help text:

$ python run_pipeline.py enrich --help
Usage: run_pipeline.py enrich [OPTIONS]

  Match scraped products against OpenFoodFacts and save to data/enriched/.

Options:
  --only-new / --all  Skip already-enriched products
  --help              Show this message and exit.

Here’s what each click concept does:

  • @click.group() makes cli a container for subcommands.
  • @cli.command() registers a function as a subcommand. Same decorator pattern as @app.get() in FastAPI.
  • @click.option() adds flags with types, defaults, and help text.
  • click.echo() prints to the terminal. It works better than print() for CLI tools because it handles encoding issues across operating systems.
  • --only-new/--all is a boolean flag pair: --only-new sets only_new to True (the default) and --all sets it to False. In the default mode, the enrichment command opens the output file in append mode ("a"), so re-running adds new results without overwriting.

See the click quickstart for more examples.

Remember to update the environment!

You need to add click to your environment.yml under dependencies:

  # --- Pipeline orchestration ---
  - click

Then update the environment:

conda env update -f environment.yml --prune

Making it executable (chmod +x)

The first line of the script, #!/usr/bin/env python, is called a shebang.3 It tells the operating system which programme to use when running the file directly.

On Linux or macOS (including Nuvolos), you can make the script executable:

chmod +x run_pipeline.py

chmod changes file permissions. +x adds execute permission. After this, you can run the script directly:

./run_pipeline.py scrape --category=bakery
./run_pipeline.py enrich --only-new
./run_pipeline.py serve

On Windows, you still need python run_pipeline.py.

Section 3: Improving the API schema

In the 💻 W04 Lab, you built a working API with a WaitroseProduct model and a /products endpoint. That’s enough to serve data, but think about who might use an API like this: a researcher querying for UPF data, a public health team, a journalist investigating supermarket nutrition. The Swagger UI docs page is the first thing they see. Better schemas make the API self-documenting.

This section makes incremental improvements to the W04 lab code. Each change is small and independent.

Increment 1: Add Field descriptions

Here’s the model from the W04 lab:

# models.py (W04 lab version)
from pydantic import BaseModel, Field


class WaitroseProduct(BaseModel):
    name: str
    category: str
    url: str
    barcode: list[str] = Field(min_length=1)
    food_type: list[dict] | None = None

Now add description to each field:

# models.py (with descriptions)
from pydantic import BaseModel, Field


class WaitroseProduct(BaseModel):
    name: str = Field(
        description="Product name as displayed on Waitrose website"
    )
    category: str = Field(
        description="Waitrose category slug, e.g. 'milk-butter-and-eggs'"
    )
    url: str = Field(
        description="Full URL to the product page on waitrose.com"
    )
    barcode: list[str] = Field(
        min_length=1,
        description="One or more EAN barcodes for this product"
    )
    food_type: list[dict] | None = Field(
        default=None,
        description="Waitrose food type classifications, if available"
    )

Field(description="...") adds help text that appears in Swagger UI next to each field. Small effort, big payoff for anyone reading your docs.

Increment 2: Add numeric constraints and enrichment fields

Add nova_group and price with validation constraints:

    # Add these fields to WaitroseProduct:
    nova_group: int | None = Field(
        default=None,
        ge=1,
        le=4,
        description="NOVA classification (1-4) from OpenFoodFacts, None if not matched"
    )
    price: float | None = Field(
        default=None,
        gt=0,
        description="Price in GBP, None if not available"
    )

ge=1, le=4 means greater-or-equal to 1, less-or-equal to 4. Pydantic rejects values outside this range with a validation error. gt=0 means strictly greater than zero (a price of 0 or negative makes no sense).

Increment 3: Add a complete example with json_schema_extra

Add model_config to give Swagger UI a realistic example:

class WaitroseProduct(BaseModel):
    """A product scraped from Waitrose, optionally enriched with NOVA classification."""

    model_config = {
        "json_schema_extra": {
            "examples": [
                {
                    "name": "Waitrose Essential British Semi Skimmed Milk 2L",
                    "category": "milk-butter-and-eggs",
                    "url": "https://www.waitrose.com/ecom/products/...",
                    "barcode": ["5000128931830"],
                    "nova_group": 1,
                    "price": 1.55,
                }
            ]
        }
    }

    # ... fields as above

json_schema_extra with examples adds a complete example object to the model’s JSON Schema, which Swagger UI can display alongside the schema (and use to pre-fill the “Try it out” form when the model is a request body). When someone opens the docs, they immediately see what a realistic product looks like.

Increment 4: Improve the FastAPI app metadata

In the W04 lab, the app was created with no metadata:

app = FastAPI()

Add a title, description, and version:

app = FastAPI(
    title="Waitrose Product API",
    description="Waitrose products enriched with NOVA classifications from OpenFoodFacts.",
    version="0.1.0",
)

These appear at the top of the Swagger UI page, so anyone visiting /docs immediately understands what the API does.

Also update the data loading to read from data/enriched/ (the output of your pipeline) instead of a flat file:

import pandas as pd
from pathlib import Path

DATA_FILE = Path("data/enriched/products.jsonl")
products: list[dict] = []

if DATA_FILE.exists():
    products = pd.read_json(DATA_FILE, lines=True).to_dict(orient="records")
    print(f"Loaded {len(products)} enriched products")
else:
    print(f"Warning: {DATA_FILE} not found. API will serve empty results.")
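One thing to check with the pandas loader: pandas usually represents missing values as NaN (and turns an integer column with gaps into floats), so a product without a nova_group may arrive as nova_group=nan rather than None, which a field typed int | None would likely reject. If you run into this, a loader built on the standard library keeps JSON null as None. This is a sketch of an alternative, and load_jsonl is a made-up helper name.

```python
import json
from pathlib import Path


def load_jsonl(path: Path) -> list[dict]:
    """Read one JSON object per line; JSON null stays None and integers stay int."""
    if not path.exists():
        return []  # same fallback as above: serve empty results
    with path.open(encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]
```

With this helper, the loading step could become products = load_jsonl(DATA_FILE).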

Increment 5: Add Query descriptions and validation to endpoints

Here’s the /products endpoint from the W04 lab:

@app.get("/products")
def get_products(
    category: str | None = None,
    name_contains: str | None = None
) -> list[WaitroseProduct]:
    ...

Now add Query with descriptions, validation, a nova_group filter, and basic pagination:

from fastapi import Query

@app.get(
    "/products",
    summary="Browse and filter products",
)
def get_products(
    category: str | None = Query(
        default=None, description="Filter by Waitrose category slug"
    ),
    name_contains: str | None = Query(
        default=None, description="Case-insensitive substring search on product name"
    ),
    nova_group: int | None = Query(
        default=None, ge=1, le=4, description="Filter by NOVA group (1-4)"
    ),
    limit: int = Query(
        default=20, ge=1, le=100, description="Max results to return"
    ),
    offset: int = Query(
        default=0, ge=0, description="Number of results to skip"
    ),
) -> list[WaitroseProduct]:
    results = products

    if category:
        results = [p for p in results if p.get("category") == category]

    if name_contains:
        term = name_contains.lower()
        results = [p for p in results if term in p.get("name", "").lower()]

    if nova_group is not None:
        results = [p for p in results if p.get("nova_group") == nova_group]

    page = results[offset : offset + limit]
    return [WaitroseProduct(**p) for p in page]

Query(description="...") does for query parameters what Field(description="...") does for model fields. Query(ge=1, le=4) means FastAPI rejects invalid values with a 422 error before your function even runs. limit and offset give basic pagination so you don’t dump thousands of products in one response.
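The pagination itself is plain list slicing. The sketch below isolates that one line in a hypothetical paginate helper and runs it on a made-up catalogue of 45 items, so you can see what a full page, a short last page, and an out-of-range offset look like.

```python
def paginate(items: list, limit: int = 20, offset: int = 0) -> list:
    """Return at most `limit` items, skipping the first `offset`."""
    return items[offset : offset + limit]


catalogue = [f"product-{i}" for i in range(45)]  # made-up data

page_1 = paginate(catalogue, limit=20, offset=0)    # items 0-19
page_3 = paginate(catalogue, limit=20, offset=40)   # items 40-44, a short last page
beyond = paginate(catalogue, limit=20, offset=100)  # past the end: empty list, no error

print(len(page_1), page_3, beyond)
```

Because slicing never raises on out-of-range indices, a client that asks for a page past the end simply gets an empty list back.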

Increment 6: Add a /stats endpoint

This endpoint gives you the numbers behind the PS1 driving question: what proportion of products are ultra-processed (NOVA group 4)?

@app.get(
    "/stats",
    summary="Summary statistics on NOVA classification coverage",
)
def get_stats() -> dict:
    total = len(products)
    nova_counts = {1: 0, 2: 0, 3: 0, 4: 0, "unknown": 0}

    for p in products:
        group = p.get("nova_group")
        if group in nova_counts:
            nova_counts[group] += 1
        else:
            nova_counts["unknown"] += 1

    return {
        "total_products": total,
        "nova_counts": nova_counts,
        "enrichment_rate": (
            round((total - nova_counts["unknown"]) / total, 3) if total > 0 else 0
        ),
    }

It returns counts by NOVA group and a coverage rate showing what fraction of your products have been successfully matched against OpenFoodFacts.

A brief note on testing

We are likely to use pytest properly in the second half of the course. For now, test your API by visiting /docs and trying each endpoint with different parameters.

What’s next

The W05 lab is dedicated working time for ✍️ Problem Set 1. Bring your code, your questions, and any blockers. Jon will circulate to help troubleshoot.

PS1 is due Thursday 26 February 2026, 8pm UK time. Use this week and Reading Week to finish Part B.

🎥 Session Recording

The lecture recording will be available on Moodle by the afternoon of the lecture.

Footnotes

  1. Read about the python-dotenv library to understand more about .env files and how to work with them in Python.

  2. Git doesn’t handle large files well. GitHub warns you when you push a file larger than 50 MB and rejects files over 100 MB, so keep bulky data out of the repo.

  3. env python finds whichever python is first on your PATH. After conda activate food, that resolves to the Python inside your conda environment. You can check this yourself: run which python on Linux/macOS (including Nuvolos) or where python on Windows. On Nuvolos you might see something like /opt/conda/envs/food/bin/python; on a Windows machine with miniconda it would be something like C:\Users\Jon\miniconda3\envs\food\python.exe.