💻 Week 09 Lab
Building a Data Pipeline with PostgreSQL and Vector Search

Last Updated: 18 March 2025, 9:30
📋 Preparation
To make the most of this lab session, make sure you have done the following:
Set up your repository for the ✍️ Problem Set 2 assignment by accepting the GitHub Classroom invitation and cloning it locally.
Attended the 🗣️ Week 09 Lecture on “Building a Data Pipeline + PostgreSQL Database with SQLAlchemy” or watched the recording.
- You will still need to adapt the code from the lecture to your own project and fill in the missing pieces.
Installed PostgreSQL on your machine and set up a local database such that you can connect to it from your Python code like this:
DATABASE_URL=postgresql://climate:climate@localhost:5432/climate
That is:
- The database server is `localhost` (this machine)
- The username is `climate`
- The password is `climate`
- The database (where the tables will be hosted) is also called `climate`
- The port the database server listens on is `5432`
- Attempted to run the starter code and identified any issues or questions you have.
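If you want a quick sanity check that your local database is reachable before the lab, a minimal script like the following should print `1` (this assumes you've stored the `DATABASE_URL` above in a `.env` file; adapt it to however you manage your configuration):

```python
import os
from dotenv import load_dotenv
from sqlalchemy import create_engine, text

load_dotenv()  # reads DATABASE_URL from your .env file

engine = create_engine(os.getenv("DATABASE_URL"))
with engine.connect() as conn:
    # If this prints 1, your connection settings are correct
    print(conn.execute(text("SELECT 1")).scalar())
```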
🛣️ Lab Roadmap (90-min)
Note to class teachers: This session is primarily a support lab where students can ask questions and get help with their Problem Set 2 assignment. Feel free to structure the time based on student needs, whether that’s answering individual questions or demonstrating key concepts to the whole group.
🎯 ACTION POINTS:
[Please reserve a bit of time during this lab to fill out the course evaluation surveys if you haven’t done so yet.]
Make progress on your ✍️ Problem Set 2, particularly focusing on implementing the PostgreSQL database component we covered in Monday’s lecture.
This lab is a 🦸🏻 SUPER TECH SUPPORT session.
Use this time to work through the assignment. Your class teacher will be available to answer questions and help troubleshoot any issues.
Why am I being punished with having to work with PostgreSQL and pgVector? 😭
The whole reason we’re using PostgreSQL is the pgVector extension. It allows us to store and query vector embeddings without loading all the data into memory, and it comes with built-in vector similarity search that is more memory-efficient than what we did last week with numpy arrays.
What if I can’t get pgVector to work at all?
Use what you’ve learned in the previous week and submit your code with a numpy array version of the vector embeddings and vector search. You can consider the faiss library as an alternative to pgVector. People who manage to get pgVector to work will be rewarded more highly, of course.
Key Focus Areas
- Setting up PostgreSQL and SQLAlchemy: Get help connecting your Python code to a PostgreSQL database.
- Database Schema Design: Guidance on creating the right tables and relationships for your data. You might want to use the `documents` and `document_chunks` tables from the Week 09 Lecture code as a starting point.
- Implementing Vector Storage: Explore how to store and query embeddings using pgVector.
- Troubleshooting: Assistance with any issues you’re facing with the starter code or your implementation.
📢 Important Note: We’re building parts of this solution together. Apart from the more ‘creative’ bits of this assignment, which involve how you’ll use the data to answer the overall question, we are happy to help you get past the initial hurdles of this data pipeline.
Depending on how everyone is doing, Jon, Alex or Barry can decide to add more code to the starter code if needed.
💻 Code You Might Want to Use
Below are cookbook-style code snippets to help you implement database functionality with SQLAlchemy and pgVector for your ✍️ Problem Set 2 assignment. These snippets focus on specific operations rather than providing complete implementations.
⚠️ Note: These snippets are meant to be adapted to your specific needs. Make sure to replace placeholder database URLs, table names, and column definitions with ones that match your project’s requirements.
1. SQLAlchemy Basics
Defining Tables with SQLAlchemy
Similar to how we’ve used Pydantic’s BaseModel
and Field
to define our data models, and also similar to scrapy’s Item
class, SQLAlchemy uses a Base
class to define the structure of the tables in our database.
When you specify a class that inherits from Base
, SQLAlchemy will automatically create the table in the database with the columns and constraints that you define. When you build an object of this class and add it to the database, SQLAlchemy will automatically insert it as a row in the table.
Take a look at the SQLAlchemy documentation on models to understand more about how to define tables with SQLAlchemy.
```python
from sqlalchemy import create_engine, Column, Integer, String, ForeignKey, Text, Boolean, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
import datetime

# Create a base class for our models
Base = declarative_base()

# Define a table as a Python class
class Document(Base):
    __tablename__ = 'documents'  # Table name in the database

    # Define columns with their types and constraints
    id = Column(Integer, primary_key=True)
    country = Column(String(100), nullable=False)
    title = Column(String(500), nullable=False)
    url = Column(String(1000), nullable=False)
    downloaded = Column(Boolean, default=False)
    downloaded_at = Column(DateTime, nullable=True)  # Set once the document is downloaded

    # Define relationships with other tables
    # This creates a link to the DocumentChunk table
    chunks = relationship("DocumentChunk", back_populates="document")
```
Connecting to PostgreSQL
You won’t be able to store the type of data above without connecting to a database first.
If you want to test it as a standalone script, you can do the following:
```python
import os
from dotenv import load_dotenv

load_dotenv()

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

# The best practice is to store your database URL in the .env file
DATABASE_URL = os.getenv("DATABASE_URL")

# Create engine
# it's a weird convention of a name but it simply means
# that we're creating a connection to the database
engine = create_engine(DATABASE_URL)

# Create a session factory
Session = sessionmaker(bind=engine)

# Create a session for database operations
session = Session()
```
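One thing worth knowing (an optional pattern, not something the starter code requires): sessions hold a database connection, so it’s good practice to close them when you’re done, either explicitly or with a context manager:

```python
# Option 1: close explicitly when you're done
session = Session()
try:
    # ... your database operations ...
    session.commit()
finally:
    session.close()

# Option 2: use the session as a context manager (SQLAlchemy 1.4+),
# which closes it automatically
with Session() as session:
    ...  # your database operations
```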
Creating Tables in the Database
With a connection to the database, you can now let SQLAlchemy create the tables for you. All you need to do is import your `Base` class from your models file and call the `create_all` method.
```python
# Remember to import Base from your models file
# wherever it is that you've defined your models
from .models import Base

# Create all tables defined in your models
Base.metadata.create_all(engine)

# Drop and recreate tables (use with caution!)
Base.metadata.drop_all(engine)    # Deletes all tables
Base.metadata.create_all(engine)  # Recreates them
```
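To double-check that the tables were actually created, you can ask SQLAlchemy to list the table names it finds in the database, using its `inspect` helper:

```python
from sqlalchemy import inspect

inspector = inspect(engine)
# Should include 'documents' (and 'document_chunks', once you define it)
print(inspector.get_table_names())
```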
2. Data Operations with SQLAlchemy
Adding Data
You’ve specified the structure of the table in your models file and created a connection to the database. Now you can add data to the database.
First you need to create an object of the class that you’ve defined in your models file, just like we’ve been doing with Pydantic’s `BaseModel` and scrapy’s `Item`:
```python
# Create a new record
new_document = Document(
    country="Germany",
    title="Germany's NDC 2023",
    url="https://example.com/germany-ndc",
    downloaded=False,
)
```
Now you can add it to the database:
```python
# Add it to the session
session.add(new_document)

# If you have multiple records at once:
session.add_all([doc1, doc2, doc3])

# Commit the transaction
# Just like in git
session.commit()
```
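If a commit fails halfway through (say, because of a constraint violation), the session is left in a failed state until you roll it back. A defensive pattern you might want to adopt (my suggestion, not something from the lecture code):

```python
try:
    session.add(new_document)
    session.commit()
except Exception:
    # Undo the failed transaction so the session is usable again
    session.rollback()
    raise
```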
Querying Data
What if you need to check if your table has a particular record? You can do that by using the `query` method.
Take a look at the very rich SQLAlchemy documentation on querying to understand more about how to query data with SQLAlchemy.
Here are several examples of how to do that:
```python
# Get all records from a table
all_docs = session.query(Document).all()

# Filter by a condition
# Returns ALL records that match the condition
# kind of like in pandas or a SQL query
german_docs = (
    session.query(Document)
    .filter_by(country="Germany")
    .all()
)

# If you are sure there's only one record,
# use first
germany_doc = (
    session.query(Document)
    .filter_by(id=1)
    .first()
)

# More complex filtering
docs = session.query(Document).filter(
    (Document.downloaded == False) &
    (Document.country.in_(["Germany", "France", "UK"]))
).all()

# Order results
docs = session.query(Document).order_by(Document.country).all()

# Limit results
docs = session.query(Document).limit(10).all()
```
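If all you need is a yes/no answer to “does this record exist?”, you don’t have to pull every row back. Two common patterns (the URL here is just a placeholder):

```python
# Pattern 1: fetch at most one row and test for None
doc = session.query(Document).filter_by(url="https://example.com/germany-ndc").first()
already_scraped = doc is not None

# Pattern 2: count the matching rows
n_downloaded = session.query(Document).filter_by(downloaded=True).count()
```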
Updating Data
What if you need to update some columns of a particular record?
You can do that by first querying the record, then updating the relevant columns and committing the transaction:
```python
# Update a single record
doc = session.query(Document).filter_by(id=1).first()
doc.downloaded = True
doc.downloaded_at = datetime.datetime.now()
session.commit()

# Bulk update
(
    session.query(Document)
    .filter(Document.country == "Germany")
    .update({"downloaded": True})
)
session.commit()
```
Deleting Data
If you need to delete a record, you can do that by querying the record, deleting it, and committing the transaction:
⚠️ Warning: For the purposes of this course, avoid deleting data ‘manually’ from the database. Otherwise, your data pipeline and analysis will not be reproducible.
```python
# Delete a single record
doc = session.query(Document).filter_by(id=1).first()
session.delete(doc)
session.commit()

# Bulk delete
(
    session.query(Document)
    .filter(Document.downloaded == False)
    .delete()
)
session.commit()
```
Joining Tables
```python
# Join Document and DocumentChunk tables
results = (
    session.query(Document, DocumentChunk)
    .join(DocumentChunk, Document.id == DocumentChunk.document_id)
    .all()
)

# Filter joined results
results = (
    session.query(Document.country, DocumentChunk.text)
    .join(DocumentChunk)
    .filter(Document.country == "Germany")
    .all()
)
```
3. Working with pgVector
The whole reason why we’re using PostgreSQL is because of the pgVector extension which allows us to store and query vector embeddings. We could have simply used the numpy arrays (like we did last week) but we’re likely to encounter memory issues with that approach.
pgVector is a PostgreSQL extension that introduces a new data type called `vector`, which allows us to store and query vector embeddings of a fixed dimensionality.
Setting Up pgVector Extension
This assumes pgVector is already installed on your database. This whole process should already be configured for you in the starter code but in case you need to do it manually, you can do the following:
```python
from sqlalchemy import text

# Create the pgVector extension in your database
with engine.connect() as conn:
    conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector;"))
    conn.commit()
```
Defining a Model with Vector Column
Code not tested but should work:
```python
from sqlalchemy import Float
from sqlalchemy.dialects.postgresql import ARRAY

class DocumentChunk(Base):
    __tablename__ = 'document_chunks'

    id = Column(Integer, primary_key=True)
    document_id = Column(Integer, ForeignKey('documents.id'))
    text = Column(Text, nullable=False)

    # Store vector embeddings as an array of floats
    embedding = Column('embedding', ARRAY(Float))

    # Relationship back to the Document
    document = relationship("Document", back_populates="chunks")
```
If the code above is not working, you can use SQL to create the table instead. To be honest, I’ve only ever used the `vector` type with a SQL query:
```sql
CREATE TABLE document_chunks (
    id SERIAL PRIMARY KEY,
    document_id INTEGER REFERENCES documents(id),
    text TEXT NOT NULL,
    embedding vector(768)
);
```
To run the SQL above through SQLAlchemy, you can do the following:
"""
session.execute(text( CREATE TABLE document_chunks (
id SERIAL PRIMARY KEY,
document_id INTEGER REFERENCES documents(id),
text TEXT NOT NULL,
embedding vector(768)
);
"""))
session.commit()
Storing Embeddings
```python
import numpy as np

# Assuming you have a numpy array embedding from a model
embedding_array = np.random.rand(768)

# Create a new chunk with the embedding
chunk = DocumentChunk(
    document_id=1,
    text="This is a text chunk",
    embedding=embedding_array.tolist(),
)
session.add(chunk)
session.commit()
```
Use pure SQL if the code above is not working
= text("""
insert_query INSERT INTO document_chunks (document_id, text, embedding)
VALUES (:document_id, :text, :embedding)
""")
= {"document_id": 1,
params "text": "This is a text chunk",
"embedding": embedding_array.tolist()}
session.execute(insert_query, params) session.commit()
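One gotcha to watch for (an assumption based on how the `vector` type parses its input, so do test this on your own setup): unless the pgvector Python adapter is registered with your database driver, PostgreSQL expects the embedding as a string literal like `[0.1,0.2,...]` rather than a Python list. If the insert above complains about the embedding parameter, try formatting it yourself:

```python
# Format the embedding as a pgVector string literal, e.g. "[0.1,0.2,0.3]"
embedding_literal = "[" + ",".join(str(x) for x in embedding_array.tolist()) + "]"

params = {
    "document_id": 1,
    "text": "This is a text chunk",
    "embedding": embedding_literal,
}
session.execute(insert_query, params)
session.commit()
```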
Creating a Vector Similarity Index
```python
from sqlalchemy import text

# Create an index for faster similarity search
# This should be done after your table has been created
with engine.connect() as conn:
    conn.execute(text("""
        CREATE INDEX IF NOT EXISTS chunks_embedding_idx
        ON document_chunks
        USING ivfflat (embedding vector_cosine_ops);
    """))
    conn.commit()
```
While it’s not necessary that you understand the details of the code above, it’s good to know that pgVector supports two types of indexes: ivfflat (official pgVector docs) and hnsw (official pgVector docs).
Performing Vector Similarity Search
I couldn’t find a cleaner way to do this than using a raw SQL query. This is similar to how we did it in ChatLSE:
```python
from sqlalchemy import text
import numpy as np

# Your query embedding
query_embedding = np.random.rand(768)

# Convert to a list to pass as a query parameter
embedding_list = query_embedding.tolist()

# Execute a similarity search query using the <=> operator
# (cosine distance, smaller is more similar)
result = session.execute(text("""
    SELECT id, document_id, text
    FROM document_chunks
    ORDER BY embedding <=> :embedding
    LIMIT 5
"""), {"embedding": embedding_list})

# Process the results
for row in result:
    print(f"Chunk ID: {row.id}, Document ID: {row.document_id}")
    print(f"Text: {row.text}")
    print("---")
```
The symbol `<=>` is the pgVector operator for cosine distance (smaller means more similar). This is the major reason we’re going through all this trouble of using PostgreSQL and pgVector.

Cosine distance is not the only way to compare vectors; pgVector also supports Euclidean distance (`<->`) and negative inner product (`<#>`). See the pgVector documentation to read more about the different operators it supports.
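Swapping the operator in the ORDER BY clause is all it takes to change the distance metric. For instance, the same search with Euclidean distance instead of cosine distance:

```sql
SELECT id, document_id, text
FROM document_chunks
ORDER BY embedding <-> :embedding
LIMIT 5;
```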
🔗 Additional Resources
If you want to dive deeper into the topics we covered:
💡 Reminder: The deadline for Problem Set 2 is Friday, 28 March 2025, 8pm UK time. Make sure to plan your time accordingly, especially for the more complex parts like vector search implementation.
💬 Drop-in Session
If there’s enough interest, I may hold a drop-in session on Friday to answer additional questions or address specific challenges you’re facing. Please let me know if this would be helpful for you.