💻 Week 09 Lab

Building a Data Pipeline with PostgreSQL and Vector Search

Published: 18 March 2025

🥅 Learning Goals
The goal of this lab is to help you make progress on Problem Set 2, focusing on implementing the PostgreSQL database, integrating SQLAlchemy, and exploring vector embeddings storage for the similarity search component.

Last Updated: 18 March 2025, 9:30

📋 Preparation

To make the most of this lab session, make sure you have done the following:

  1. Set up your repository for the ✍️ Problem Set 2 assignment by accepting the GitHub Classroom invitation and cloning it locally.

  2. Attended the 🗣️ Week 09 Lecture on “Building a Data Pipeline + PostgreSQL Database with SQLAlchemy” or watched the recording.

    • You will still need to adapt the code from the lecture to your own project and fill in the missing pieces.
  3. Installed PostgreSQL on your machine and set up a local database such that you can connect to it from your Python code like this:

DATABASE_URL=postgresql://climate:climate@localhost:5432/climate

That is:

  • The database server is localhost (this machine)
  • The username is climate
  • The password is climate
  • The database (where the tables will be hosted) is also called climate
  • The port the database server uses to listen to requests is 5432
  4. Attempted to run the starter code and identified any issues or questions you have.
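If you still need to create the local database from step 3, commands along these lines should work on a default PostgreSQL installation (a sketch; adjust the superuser name if your installation's authentication is set up differently):

```shell
# Connect as the default postgres superuser and create
# the role and database used in the DATABASE_URL above
psql -U postgres -c "CREATE USER climate WITH PASSWORD 'climate';"
psql -U postgres -c "CREATE DATABASE climate OWNER climate;"

# Verify you can connect with the new credentials
psql "postgresql://climate:climate@localhost:5432/climate" -c "SELECT 1;"
```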

🛣️ Lab Roadmap (90-min)

Note to class teachers: This session is primarily a support lab where students can ask questions and get help with their Problem Set 2 assignment. Feel free to structure the time based on student needs, whether that’s answering individual questions or demonstrating key concepts to the whole group.

🎯 ACTION POINTS:

  1. Please reserve a bit of time during this lab to fill out the course evaluation surveys if you haven’t done so yet.

  2. Make progress on your ✍️ Problem Set 2, particularly focusing on implementing the PostgreSQL database component we covered in Monday’s lecture.

This lab is a 🦸🏻 SUPER TECH SUPPORT session.

Use this time to make progress on your ✍️ Problem Set 2, particularly focusing on implementing the PostgreSQL database component we covered in Monday’s lecture. Your class teacher will be available to answer questions and help troubleshoot any issues.

Why am I being punished with having to work with PostgreSQL and pgVector? 😭

The whole reason why we’re using PostgreSQL is because of the pgVector extension.

This extension lets us store and query vector embeddings without loading all the data into memory, and it comes with built-in vector similarity search that is more memory-efficient than what we did last week with numpy arrays.
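For contrast, last week’s numpy approach kept every embedding in a single in-memory array and computed similarities by hand, roughly like this (a sketch with made-up data, not the exact lecture code):

```python
import numpy as np

# Pretend corpus: 1,000 chunks with 768-dimensional embeddings,
# all of which must sit in memory at the same time
rng = np.random.default_rng(42)
embeddings = rng.random((1000, 768))
query = rng.random(768)

# Cosine similarity between the query and every chunk
norms = np.linalg.norm(embeddings, axis=1) * np.linalg.norm(query)
similarities = embeddings @ query / norms

# Indices of the 5 most similar chunks
top5 = np.argsort(similarities)[::-1][:5]
```

This works fine for a small corpus, but the whole embedding matrix has to fit in RAM; pgVector pushes that work down into the database instead.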

What if I can’t get pgVector to work at all?

Use what you learned in the previous week and submit your code with a numpy-array version of the vector embeddings and vector search. You can also consider the faiss library as an alternative to pgVector. Submissions that get pgVector working will, of course, be rewarded more highly.

Key Focus Areas

  1. Setting up PostgreSQL and SQLAlchemy: Get help connecting your Python code to a PostgreSQL database.

  2. Database Schema Design: Guidance on creating the right tables and relationships for your data. You might want to use the documents and document_chunks tables from the Week 09 Lecture code as a starting point.

  3. Implementing Vector Storage: Explore how to store and query embeddings using pgVector.

  4. Troubleshooting: Assistance with any issues you’re facing with the starter code or your implementation.

📢 Important Note: We’re building parts of this solution together. Apart from the more ‘creative’ bits of this assignment, which involve how you’ll use the data to answer the overall question, we are happy to help you get past the initial hurdles of this data pipeline.

Depending on how everyone is doing, Jon, Alex or Barry may decide to add more code to the starter code if needed.

💻 Code You Might Want to Use

Below are cookbook-style code snippets to help you implement database functionality with SQLAlchemy and pgVector for your ✍️ Problem Set 2 assignment. These snippets focus on specific operations rather than providing complete implementations.

⚠️ Note: These snippets are meant to be adapted to your specific needs. Make sure to replace placeholder database URLs, table names, and column definitions with ones that match your project’s requirements.

1. SQLAlchemy Basics

Defining Tables with SQLAlchemy

Similar to how we’ve used Pydantic’s BaseModel and Field to define our data models, and also similar to scrapy’s Item class, SQLAlchemy uses a Base class to define the structure of the tables in our database.

When you specify a class that inherits from Base, SQLAlchemy will automatically create the table in the database with the columns and constraints that you define. When you build an object of this class and add it to the database, SQLAlchemy will automatically insert it as a row in the table.

Take a look at the SQLAlchemy documentation on models to understand more about how to define tables with SQLAlchemy.

from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, Text, Boolean, DateTime
from sqlalchemy.orm import declarative_base  # in SQLAlchemy 2.0 this moved here from sqlalchemy.ext.declarative
from sqlalchemy.orm import relationship
import datetime

# Create a base class for our models
Base = declarative_base()

# Define a table as a Python class
class Document(Base):
    __tablename__ = 'documents'  # Table name in the database
    
    # Define columns with their types and constraints
    id = Column(Integer, primary_key=True)
    country = Column(String(100), nullable=False)
    title = Column(String(500), nullable=False)
    url = Column(String(1000), nullable=False)
    downloaded = Column(Boolean, default=False)
    
    # Define relationships with other tables
    # This creates a link to the DocumentChunk table
    chunks = relationship("DocumentChunk", back_populates="document")

Connecting to PostgreSQL

You won’t be able to store the type of data above without connecting to a database first.

If you want to test it as a standalone script, you can do the following:

import os

from dotenv import load_dotenv

load_dotenv()

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# The best practice is to store your database URL in the .env file
DATABASE_URL = os.getenv("DATABASE_URL")

# Create engine
# 'engine' is an odd name by convention, but it simply means
# that we're creating a connection to the database
engine = create_engine(DATABASE_URL)

# Create a session factory
Session = sessionmaker(bind=engine)

# Create a session for database operations
session = Session()

Creating Tables in the Database

With a connection to the database, you can now let SQLAlchemy create the tables for you. All you need to do is import your Base class from your models file and call the create_all method.

# Remember to import Base from your models file
# wherever it is that you've defined your models
from .models import Base

# Create all tables defined in your models
Base.metadata.create_all(engine)

# Drop and recreate tables (use with caution!)
Base.metadata.drop_all(engine)  # Deletes all tables
Base.metadata.create_all(engine)  # Recreates them
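If you want to sanity-check the model / create_all / session pattern before your PostgreSQL connection works, the same flow runs against an in-memory SQLite database. This is a self-contained sketch (SQLite doesn’t support pgVector, so it only helps for testing the plain tables):

```python
from sqlalchemy import create_engine, Column, Integer, String, Boolean
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Document(Base):
    __tablename__ = 'documents'
    id = Column(Integer, primary_key=True)
    country = Column(String(100), nullable=False)
    title = Column(String(500), nullable=False)
    downloaded = Column(Boolean, default=False)

# In-memory SQLite: nothing to install, nothing persisted
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

Session = sessionmaker(bind=engine)
session = Session()
session.add(Document(country="Germany", title="Test doc"))
session.commit()

print(session.query(Document).count())  # → 1
```

Once this round-trip works, switching the URL to your PostgreSQL DATABASE_URL should be the only change needed for the non-vector parts.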

2. Data Operations with SQLAlchemy

Adding Data

You’ve specified the structure of the table in your models file and created a connection to the database. Now you can add data to the database.

First you need to create an object of the class that you’ve defined in your models file. It’s like we’ve been doing with Pydantic’s BaseModel and scrapy’s Item:

# Create a new record
new_document = Document(
    country="Germany",
    title="Germany's NDC 2023",
    url="https://example.com/germany-ndc",
    downloaded=False
)

Now you can add it to the database:

# Add it to the session
session.add(new_document)

# If you have multiple records at once:
session.add_all([doc1, doc2, doc3])

# Commit the transaction
# Just like in git
session.commit()

Querying Data

What if you need to check if your table has a particular record? You can do that by using the query method.

Take a look at the very rich SQLAlchemy documentation on querying to understand more about how to query data with SQLAlchemy.

Here are several examples of how to do that:

# Get all records from a table
all_docs = session.query(Document).all()

# Filter by a condition
# Returns ALL records that match the condition
# kind of like in pandas or a SQL query
german_docs = (
    session.query(Document)
    .filter_by(country="Germany")
    .all()
)

# If you are sure there's only one record:
germany_doc = (
    session.query(Document)
    .filter_by(id=1)  # the model defines `id`, not `doc_id`
    .first()  # returns one object (or None) instead of a list
)

# More complex filtering
docs = session.query(Document).filter(
    (Document.downloaded == False) & 
    (Document.country.in_(["Germany", "France", "UK"]))
).all()

# Order results
docs = session.query(Document).order_by(Document.country).all()

# Limit results
docs = session.query(Document).limit(10).all()

Updating Data

What if you need to update some columns of a particular record?

You can do that by first querying the record and then updating the columns you need to and committing the transaction:

# Update a single record
doc = session.query(Document).filter_by(id=1).first()
doc.downloaded = True
# assumes your model also defines a `downloaded_at` DateTime column
doc.downloaded_at = datetime.datetime.now()
session.commit()

# Bulk update
(
    session.query(Document)
    .filter(Document.country == "Germany")
    .update({"downloaded": True})
)
session.commit()

Deleting Data

If you need to delete a record, you can do that by first querying the record and then deleting it and committing the transaction:

⚠️ Warning: For the purposes of this course, avoid deleting data ‘manually’ from the database. Otherwise, your data pipeline and analysis will not be reproducible.

# Delete a single record
doc = session.query(Document).filter_by(id=1).first()
session.delete(doc)
session.commit()

# Bulk delete
(
    session.query(Document)
    .filter(Document.downloaded == False)
    .delete()
)
session.commit()

Joining Tables

# Join Document and DocumentChunk tables
results = (
    session.query(Document, DocumentChunk)
    .join(DocumentChunk, 
          Document.id == DocumentChunk.document_id)
    .all()
)

# Filter joined results
results = (
    session.query(Document.country, DocumentChunk.text)
    .join(DocumentChunk)
    .filter(Document.country == "Germany")
).all()

3. Working with pgVector

The whole reason why we’re using PostgreSQL is because of the pgVector extension which allows us to store and query vector embeddings. We could have simply used the numpy arrays (like we did last week) but we’re likely to encounter memory issues with that approach.

pgVector is a PostgreSQL extension that introduces a new data type called vector, which allows us to store and query vector embeddings of a fixed dimensionality.

Setting Up pgVector Extension

This assumes pgVector is already installed on your database. This whole process should already be configured for you in the starter code but in case you need to do it manually, you can do the following:

from sqlalchemy import text

# Create the pgVector extension in your database
with engine.connect() as conn:
    conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector;"))
    conn.commit()

Defining a Model with Vector Column

Code not tested but should work. It relies on the pgvector Python package (pip install pgvector), which provides a Vector column type for SQLAlchemy; a plain ARRAY of floats would not support pgVector's similarity operators:

from pgvector.sqlalchemy import Vector

class DocumentChunk(Base):
    __tablename__ = 'document_chunks'
    
    id = Column(Integer, primary_key=True)
    document_id = Column(Integer, ForeignKey('documents.id'))
    text = Column(Text, nullable=False)
    
    # Store vector embeddings in a pgVector column
    # (768 must match your embedding model's output dimensionality)
    embedding = Column(Vector(768))
    
    # Relationship back to the Document
    document = relationship("Document", back_populates="chunks")

If the code above is not working, you can create the table with raw SQL instead.

To be honest, I’ve only ever used the vector type with a SQL query:

CREATE TABLE document_chunks (
    id SERIAL PRIMARY KEY,
    document_id INTEGER REFERENCES documents(id),
    text TEXT NOT NULL,
    embedding vector(768)
);

This means that to create the table using the SQL code above while using SQLAlchemy, you need to do the following:

session.execute(text("""
    CREATE TABLE document_chunks (
        id SERIAL PRIMARY KEY,
        document_id INTEGER REFERENCES documents(id),
        text TEXT NOT NULL,
        embedding vector(768)
    );
"""))
session.commit()

Storing Embeddings

import numpy as np

# Assuming you have a numpy array embedding from a model
embedding_array = np.random.rand(768) 

# Create a new chunk with the embedding
chunk = DocumentChunk(
    document_id=1,
    text="This is a text chunk",
    embedding=embedding_array.tolist()  
)

session.add(chunk)
session.commit()

Use pure SQL if the code above is not working:

insert_query = text("""
    INSERT INTO document_chunks (document_id, text, embedding)
    VALUES (:document_id, :text, :embedding)
""")

params = {"document_id": 1, 
          "text": "This is a text chunk", 
          # pgVector expects the vector as a string literal like '[0.1, 0.2, ...]'
          "embedding": str(embedding_array.tolist())}

session.execute(insert_query, params)
session.commit()

Creating a Vector Similarity Index

from sqlalchemy import text

# Create an index for faster similarity search
# This should be done after your table has been created
with engine.connect() as conn:
    conn.execute(text("""
        CREATE INDEX IF NOT EXISTS chunks_embedding_idx 
        ON document_chunks 
        USING ivfflat (embedding vector_cosine_ops);
    """))
    conn.commit()

While it’s not necessary that you understand the details of the code above, it’s good to know that pgVector supports two types of indexes: ivfflat (official pgVector docs) and hnsw (official pgVector docs).
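Once embeddings are stored, a similarity search is just a SQL query using pgVector’s <=> cosine-distance operator. Here is a sketch: the to_vector_literal helper (a name of our own, not a library function) serialises an embedding into the '[x,y,z]' text format pgVector expects, and the commented-out execute call assumes the document_chunks table and session from earlier:

```python
import numpy as np
from sqlalchemy import text

def to_vector_literal(embedding):
    """Serialise a sequence of floats into pgVector's '[x,y,z]' text format."""
    return "[" + ",".join(str(float(x)) for x in embedding) + "]"

# Hypothetical query embedding coming out of your model
query_embedding = np.random.rand(768)

# `<=>` is pgVector's cosine distance operator: smaller = more similar
similarity_query = text("""
    SELECT id, text, embedding <=> (:query_vec)::vector AS distance
    FROM document_chunks
    ORDER BY distance
    LIMIT 5
""")

# Uncomment once you have a live session and populated table:
# results = session.execute(
#     similarity_query,
#     {"query_vec": to_vector_literal(query_embedding)}
# ).fetchall()
```

pgVector also provides <-> (Euclidean distance) and <#> (negative inner product) if cosine distance isn’t the right fit for your embeddings.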

🔗 Additional Resources

If you want to dive deeper into the topics we covered:

  1. SQLAlchemy Documentation
  2. pgVector GitHub Repository
  3. Click Documentation
  4. The W09 Lecture code

💡 Reminder: The deadline for Problem Set 2 is Friday, 28 March 2025, 8pm UK time. Make sure to plan your time accordingly, especially for the more complex parts like vector search implementation.

💬 Drop-in Session

If there’s enough interest, I may hold a drop-in session on Friday to answer additional questions or address specific challenges you’re facing. Please let me know if this would be helpful for you.