🗣️ Week 11 Lecture
Advanced RAG Systems: Similarity Search and Data Pipelines

Last Updated: 30 March 2025, 23:00
In this final lecture, we'll explore advanced techniques for similarity search and information retrieval with language models, moving beyond the basic methods we've covered previously. We'll also discuss practical implementation strategies for data pipelines and introduce the group project requirements.
📅 Session Details
- Date: Monday, 31 March 2025
- Time: 10:00 am - 12:00 pm
- Location: KSW.1.04
🎥 Lecture Materials
I will show the code examples during the lecture but I won't share the Jupyter Notebooks or scripts directly. This is because the code is interwoven with my own private code, which is almost a full solution to the ✍️ Problem Set 2, so it would be a bit of a giveaway.
But I will share code snippets below that you can adapt into your own code!
🗣️ Lecture Content
1. Similarity Search with pgVector (10:00 - 10:30)
Here are some updated tips on the code I shared during the 🗣️ Week 09 Lecture and 💻 Week 09 Lab, which I believe are more robust and will work for more of you.
These are things I have been playing around with on my own since W08, and I have now converged on improved methods.
💡 TIP 1:
Do not add the embedding column to the models.py version of the DocChunk class. Instead, leave it to your data pipeline script (e.g. tasks.py) to add it as a new column.
This means you could have a DocChunk class like this:
class DocChunk(Base):
    """Model for document chunks created during processing."""
    __tablename__ = 'doc_chunks'

    doc_id = Column(String,
                    ForeignKey('documents.doc_id'),
                    nullable=False, primary_key=True)
    chunk_id = Column(Integer, primary_key=True)
    page_number = Column(Integer, nullable=True)

    # Chunk content and metadata
    content = Column(Text, nullable=False)

    # Timestamps
    created_at = Column(DateTime(timezone=True), default=now_london_time)
    updated_at = Column(DateTime(timezone=True),
                        default=now_london_time,
                        onupdate=now_london_time)

    def __repr__(self):
        return f"DocChunk(id={self.chunk_id}, doc_id={self.doc_id})"
💡 TIP 2:
To get TIP 1 to work, you need to be familiar with databases and SQL.¹ If you are NOT that familiar but still want to use this approach, here are some code snippets that can help you:
1️⃣ How to run raw SQL queries in SQLAlchemy
# Import necessary modules
import os

import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import text, create_engine
from sqlalchemy.orm import sessionmaker

# Load environment variables
# If you are following along with the problem set,
# you should have a .env file in your project directory
load_dotenv()

# Create a connection to your database
engine = create_engine(os.getenv("DATABASE_URL"))

# Create a session
session = sessionmaker(engine)()

# A typical raw SQL query
# You must always use the text() function to pass the query as a string
your_raw_sql_query = text("""
SELECT *
FROM your_table
""")

# Execute the query
result = session.execute(your_raw_sql_query)

# Convert the result to a pandas DataFrame
df = pd.DataFrame(result.fetchall())

# Display the DataFrame
print(df)
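If you need to filter inside the SQL itself, it is safer to use bound parameters than to build the query string by hand. Here is a minimal sketch, continuing from the snippet above (the table and column names are just placeholders):

# Values are passed separately from the SQL string,
# which avoids quoting problems and SQL injection
parameterised_query = text("""
SELECT *
FROM your_table
WHERE country = :country
""")

result = session.execute(parameterised_query, {"country": "Brazil"})
df = pd.DataFrame(result.fetchall())
print(df)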
2️⃣ How to add a vector column to the DocChunk table
Your raw SQL query should look like this:
-- Remember to change the size of the vector to match the model you are using
ALTER TABLE doc_chunks ADD COLUMN embedding vector(768);
From within Python, here's how you can make sure that the pgvector extension is installed and loaded:
# Assuming you have a session already created
session.execute(text("CREATE EXTENSION IF NOT EXISTS vector;"))
session.commit()  # commit so the extension is actually created
Then, to make sure that the embedding column is of the correct size, you can do the following:
# Assuming you have a session already created
raw_sql_query = text(f"ALTER TABLE doc_chunks ADD COLUMN embedding vector({EMBEDDING_DIMENSIONS});")
session.execute(raw_sql_query)
session.commit()  # commit so the new column is actually added
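Note that if you rerun your pipeline, the ALTER TABLE above will fail because the column already exists. A sketch of an idempotent version, relying on PostgreSQL's IF NOT EXISTS clause:

# Safe to run repeatedly: PostgreSQL skips the ALTER if the column is already there
raw_sql_query = text(
    f"ALTER TABLE doc_chunks ADD COLUMN IF NOT EXISTS embedding vector({EMBEDDING_DIMENSIONS});"
)
session.execute(raw_sql_query)
session.commit()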
3️⃣ How to insert a vector into the DocChunk table
Assuming:
- You have a session already created
- You have used a HuggingFace model to embed your text and have it in a numpy array
Then, you first need to format the array as a pgvector literal. pgvector expects a string of the form [float,float,float,...] (square brackets, values separated by commas, no spaces), so we join the array values with commas and wrap them in brackets:
= ",".join([str(value) for value in query_embedding]) embedding_str
Then, to insert the vector into the DocChunk table, either use f-strings or pass the values as a dictionary:
# Using f-strings
query = f"""
UPDATE doc_chunks
SET embedding = '{embedding_str}'
WHERE doc_id = '{doc_id}'
AND chunk_id = '{chunk_id}'
"""
session.execute(text(query))

# Using a dictionary (preferred: no quoting issues, no SQL injection)
query = """
UPDATE doc_chunks
SET embedding = :embedding
WHERE doc_id = :doc_id
AND chunk_id = :chunk_id
"""
session.execute(text(query), {"embedding": embedding_str, "doc_id": doc_id, "chunk_id": chunk_id})

# Don't forget to commit so the UPDATEs are persisted
session.commit()
4️⃣ How to run a similarity search query
I like to filter per country first, as I'm always only interested in a single country at a time.
To do that without/before adding the embedding column, you can do the following:
SELECT
    doc_chunks.doc_id,
    doc_chunks.chunk_id,
    doc_chunks.content,
    documents.country
FROM doc_chunks
LEFT JOIN documents ON doc_chunks.doc_id = documents.doc_id
WHERE documents.country = 'United States'
To use the embedding column and actually run a similarity search, you can adjust the query as follows:
SELECT
    doc_chunks.doc_id,
    doc_chunks.chunk_id,
    doc_chunks.content,
    documents.country,
    -- The <=> operator calculates the cosine distance (smaller = more similar)
    (doc_chunks.embedding <=> :embedding) AS cosine_distance
FROM doc_chunks
LEFT JOIN documents ON doc_chunks.doc_id = documents.doc_id
WHERE documents.country = 'Australia'
ORDER BY cosine_distance ASC
LIMIT 5
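Putting the pieces together, here is a sketch of how you could run that query from Python, reusing the session and the pgvector string formatting from above. The embedding model shown (sentence-transformers/all-mpnet-base-v2, 768 dimensions) is just an example; use whichever model you used to embed your chunks:

from sentence_transformers import SentenceTransformer
from sqlalchemy import text

# Embed the user's question with the SAME model used to embed the chunks
embedding_model = SentenceTransformer("sentence-transformers/all-mpnet-base-v2")
question = "What are the country's renewable energy targets?"
query_embedding = embedding_model.encode(question)
embedding_str = "[" + ",".join(str(v) for v in query_embedding) + "]"

similarity_query = text("""
SELECT
    doc_chunks.doc_id,
    doc_chunks.chunk_id,
    doc_chunks.content,
    (doc_chunks.embedding <=> :embedding) AS cosine_distance
FROM doc_chunks
LEFT JOIN documents ON doc_chunks.doc_id = documents.doc_id
WHERE documents.country = 'Australia'
ORDER BY cosine_distance ASC
LIMIT 5
""")

results = session.execute(similarity_query, {"embedding": embedding_str}).fetchall()
for row in results:
    print(row.doc_id, row.chunk_id, round(row.cosine_distance, 3))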
2. How to "convert" this into a RAG system (10:30 - 11:00)
RAG typically involves the following steps:
- Document Chunking
- Embedding
- Vector Search
- Response Generation
Response generation these days takes the form of a prompt that is given to a (properly large) language model. The language models we have been using for embeddings won't work well for this.
Example of a chat using Qwen2.5-3B-Instruct
Here is an example, taken from the official HuggingFace documentation, of how to use a large(-ish) language model to generate a response to a user query.
Don't try this on Nuvolos 😬
from transformers import AutoModelForCausalLM, AutoTokenizer

model_name = "Qwen/Qwen2.5-3B-Instruct"

model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype="auto",
    device_map="auto"
)
tokenizer = AutoTokenizer.from_pretrained(model_name)

prompt = "Give me a short introduction to large language model."
messages = [
    {"role": "system", "content": "You are Qwen, created by Alibaba Cloud. You are a helpful assistant."},
    {"role": "user", "content": prompt}
]
text = tokenizer.apply_chat_template(
    messages,
    tokenize=False,
    add_generation_prompt=True
)
model_inputs = tokenizer([text], return_tensors="pt").to(model.device)

generated_ids = model.generate(
    **model_inputs,
    max_new_tokens=512
)
generated_ids = [
    output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
]

response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]

print(response)
Climate policy RAG system example with source attribution
A key aspect of RAG systems is source attribution. Here's how to build a system that uses the LLM to cite the sources it uses:
🚨 Sadly, current LLM architectures are still unreliable and might sometimes simply make things up, even with the citations and despite the system prompt!
# System prompt template
system_prompt = """
You are an expert in climate policy analysis.
Answer the question SOLELY based on the provided document chunks.
Do not make any assumptions or rely on external knowledge.
Sources come in the form of [Doc ID: <id>, Chunk ID: <id>, Content: <chunk>].
Every time you use a source, cite it using the format [Doc ID: <id>, Chunk ID: <id>].
"""

# Function to format retrieval results into the prompt
def format_context(results):
    context = []
    for row in results:
        context.append(f"[Doc ID: {row.doc_id}, Chunk ID: {row.chunk_id}]\n{row.content}")
    return "\n\n".join(context)

# Function to generate a response
def generate_response(query, context):
    prompt = f"{system_prompt}\n\nContext:\n{context}\n\nQuestion: {query}\n\nAnswer:"
    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
    output = model.generate(**inputs, max_new_tokens=512)
    # Decode only the newly generated tokens, not the prompt we passed in
    new_tokens = output[0][inputs.input_ids.shape[1]:]
    return tokenizer.decode(new_tokens, skip_special_tokens=True)
This approach:
1. Clearly defines expectations for the model via the system prompt
2. Properly formats the retrieved chunks with their source IDs
3. Instructs the model to cite its sources when answering
4. Works with various HuggingFace models, not just Qwen
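As a usage sketch, assuming embedding_model, similarity_query and session come from the similarity search sketch earlier, and model / tokenizer are the generative model and tokenizer from the Qwen example above (not the embedding model):

# Embed the user's question and format it as a pgvector literal
user_question = "What are Australia's emissions reduction targets for 2030?"
query_embedding = embedding_model.encode(user_question)
embedding_str = "[" + ",".join(str(v) for v in query_embedding) + "]"

# Retrieve the most relevant chunks (similarity_query as defined in the search section)
results = session.execute(similarity_query, {"embedding": embedding_str}).fetchall()

# Format the chunks, with their source IDs, into a context block
context = format_context(results)

# Generate an answer that cites the chunks it used
answer = generate_response(user_question, context)
print(answer)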
3. From tasks.py to Airflow DAGs (11:00 - 11:30)
Airflow represents workflows as Directed Acyclic Graphs (DAGs), which are collections of tasks with dependencies between them, not that dissimilar from the tasks.py CLI approach we have been using.
The advantage of Airflow is that it allows you to schedule, monitor, and manage your workflows more efficiently, and you can use it directly in the most popular cloud platforms (AWS, Azure, GCP).
💡 Key Airflow Concepts:
- DAG: A directed acyclic graph representing your workflow
- Operators: The building blocks of DAGs that represent a single task
- Tasks: Parameterised instances of operators
- Task Dependencies: Define the order in which tasks should be executed
Converting tasks.py to an Airflow DAG
Below is a vibe-coded/imagined example of what our tasks.py CLI commands could look like as an Airflow DAG, adapted from the official Airflow v2.0.1 Tutorial.
What our Airflow DAG could look like
from datetime import timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

# These args will get passed on to each operator
default_args = {
    'owner': 'climate_team',
    'depends_on_past': False,
    'email': ['climate_alerts@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Define the DAG
dag = DAG(
    'climate_policy_extractor',
    default_args=default_args,
    description='Climate policy information extraction pipeline',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=['climate', 'nlp'],
)

# Define the functions that will be our tasks
def init_db_func():
    """Initialize database function"""
    import os
    from sqlalchemy import text, create_engine
    from climate_policy_extractor.models import Base, get_db_session

    engine = create_engine(os.getenv("DATABASE_URL"))
    Base.metadata.create_all(engine)
    with engine.connect() as conn:
        conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector;"))
        conn.commit()
    return "Database initialized"

def crawl_func():
    """Crawl NDC documents"""
    from climate_policy_extractor.spiders.ndc_spider import NDCSpider
    from scrapy.crawler import CrawlerProcess
    from scrapy.utils.project import get_project_settings

    settings = get_project_settings()
    process = CrawlerProcess(settings)
    process.crawl(NDCSpider)
    process.start()
    return "Documents crawled"

def download_func():
    """Download PDF documents"""
    import os
    from climate_policy_extractor.models import get_db_session
    from tasks.downloaders import process_downloads

    session = get_db_session(os.getenv("DATABASE_URL"))
    download_dir = os.getenv("DOWNLOAD_DIR", "data/pdfs")
    total, successful = process_downloads(session, download_dir)
    session.close()
    return f"Downloaded {successful} of {total} documents"

def chunk_func():
    """Chunk the PDF documents"""
    import os
    from sqlalchemy import and_
    from climate_policy_extractor.models import get_db_session, NDCDocumentModel, DocChunk
    from tasks.chunking import extract_text_data_from_pdf

    session = get_db_session(os.getenv("DATABASE_URL"))
    # Implementation details here
    session.close()
    return "Documents chunked"

def embed_func():
    """Generate embeddings for chunks"""
    import os
    from transformers import AutoTokenizer, AutoModel

    # Implementation details here
    return "Embeddings generated"

# Create the tasks by instantiating operators
init_db = PythonOperator(
    task_id='init_db',
    python_callable=init_db_func,
    dag=dag,
)

crawl = PythonOperator(
    task_id='crawl',
    python_callable=crawl_func,
    dag=dag,
)

download = PythonOperator(
    task_id='download',
    python_callable=download_func,
    dag=dag,
)

chunk = PythonOperator(
    task_id='chunk',
    python_callable=chunk_func,
    dag=dag,
)

embed = PythonOperator(
    task_id='embed',
    python_callable=embed_func,
    dag=dag,
)

# Set the dependencies between tasks
init_db >> crawl >> download >> chunk >> embed
Key Benefits of Airflow
Why Airflow is more scalable/better than our tasks.py script
Scheduling: Rather than manually running commands, Airflow can automatically execute your pipeline at defined intervals.
Visualisation: Airflow provides a web UI where you can see the status of your DAGs and tasks:
- View the DAG structure as a graph
- Monitor running tasks in real-time
- Inspect logs directly from the UI
- View historical runs and their outcomes
See screenshots here.
Error Handling: Airflow provides robust error handling:
- Automatic retries for failed tasks
- Email notifications on failures
- Custom callbacks for different events (failure, success, retry); a sketch follows below
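For instance, a minimal sketch of a custom failure callback attached via default_args (the function body is just an illustration; in practice you would send a Slack message or email):

from datetime import timedelta

def notify_on_failure(context):
    """Called by Airflow when a task fails; `context` carries run metadata."""
    task_id = context["task_instance"].task_id
    print(f"Task {task_id} failed on {context['ds']}")  # replace with a real notification

# Every task created with these default_args inherits the callback
default_args = {
    'owner': 'climate_team',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'on_failure_callback': notify_on_failure,
}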
Scalability: Tasks can be distributed across multiple workers, enabling parallel execution.
Parameterisation: Easily pass parameters to tasks and between tasks using XComs.
Testing: Built-in commands for testing tasks and DAGs:
# Test a specific task
airflow tasks test climate_policy_extractor init_db 2023-01-01

# Test the entire DAG for a specific date
airflow dags test climate_policy_extractor 2023-01-01
Backfilling: Efficiently run your pipeline for historical dates:
airflow dags backfill climate_policy_extractor \
    --start-date 2023-01-01 \
    --end-date 2023-01-07
Important Airflow Concepts
It's important to understand that Airflow DAG definition files are configuration files that define structure - they don't process data directly:
One concept that might not be immediately intuitive is that this Airflow Python script functions purely as a configuration file specifying the DAG's structure as code. The actual tasks defined here will run in a different context from the script itself. Different tasks run on different workers at different points in time, which means that this script cannot be used for cross-communication between tasks.
For our climate policy extraction pipeline, this means:
- Each function must load its own database connection
- You cannot share variables between tasks in the DAG file
- For communication between tasks, you would use Airflow's XComs feature (see the sketch below)
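For instance, here is a minimal sketch of XComs using simplified versions of the download and chunk tasks from the DAG above (the values and logic are just illustrations): the value a PythonOperator callable returns is pushed to XCom automatically, and a downstream task can pull it.

def download_func(**kwargs):
    """Download PDFs and share the download directory with downstream tasks."""
    download_dir = "data/pdfs"
    # ... download the documents here ...
    return download_dir  # the return value is pushed to XCom as 'return_value'

def chunk_func(**kwargs):
    """Pull the directory pushed by the 'download' task, then chunk the PDFs."""
    ti = kwargs["ti"]  # the TaskInstance Airflow passes into the callable
    download_dir = ti.xcom_pull(task_ids="download")
    print(f"Chunking PDFs from {download_dir}")
    # ... chunking logic here ...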
4. Group Project Introduction (11:45 - 12:00)
- Project options and selection process
- Team formation guidelines
- Evaluation criteria
- Timeline and deliverables
(The conversation/definition of scope for groups will continue in the lab on Tuesday, 1 April 2025)
🎥 Session Recording
The lecture recording will be available on Moodle by the afternoon of the lecture.
Footnotes
1. Remember that the only reason we are using a database is that we want to calculate embedding similarities in a performant way. Databases are efficient at this. If you did it the way we did in W08, you would have to load the entire document collection into memory, which is not feasible for a large number of documents; your computer would run out of memory. ↩︎