๐Ÿ—ฃ๏ธ Week 11 Lecture

Advanced RAG Systems: Similarity Search and Data Pipelines

Author
Published

30 March 2025

DS205 course icon

Last Updated: 30 March 2025, 23:00

In this final lecture, weโ€™ll explore advanced techniques for similarity search and information retrieval with language models, moving beyond the basic methods weโ€™ve covered previously. Weโ€™ll also discuss practical implementation strategies for data pipelines and introduce the group project requirements.

๐Ÿ“ Session Details

  • Date: Monday, 31 March 2025
  • Time: 10:00 am - 12:00 pm
  • Location: KSW.1.04

๐Ÿ“ฅ Lecture Materials

I will be showing you the code examples in the lecture but I wonโ€™t share Jupyter Notebooks or scripts directly. This is because the code is interwoven with my own private code, which is almost a code solution for the โœ๏ธ Problem Set 2, so it would be a bit of a giveaway.

But I will share code snippets below that you can adapt into your own code!

๐Ÿ—ฃ๏ธ Lecture Content

1. Similarity Search with pgVector (10:00 - 10:30)

Here are some newer tips about code I shared during ๐Ÿ—ฃ๏ธ Week 09 Lecture and ๐Ÿ’ป Week 09 Lab which I believe will be more robust and work for a greater number of you.

These are things I have been playing around by myself since W08 and I have now converged on improved methods.

๐Ÿ’ก TIP 1:

Do not add the embedding column to the models.py version of the DocChunk class. Instead, leave it to your data pipeline script (e.g. tasks.py) to add it as a new column.

This means you could have a DocChunk class like this:

Click to expand/collapse code
class DocChunk(Base):
    """Model for document chunks created during processing."""
    __tablename__ = 'doc_chunks'

    doc_id = Column(String, 
                    ForeignKey('documents.doc_id'), 
                    nullable=False, primary_key=True)
    chunk_id = Column(Integer, primary_key=True) 
    page_number = Column(Integer, nullable=True)

    # Chunk content and metadata
    content = Column(Text, nullable=False)
    
    # Timestamps
    created_at = Column(DateTime(timezone=True), default=now_london_time)
    updated_at = Column(DateTime(timezone=True), 
                       default=now_london_time,
                       onupdate=now_london_time)

    def __repr__(self):
        return f"DocChunk(id={self.chunk_id}, doc_id={self.doc_id})"

๐Ÿ’ก TIP 2:

To get the TIP 1 to work, you need to be familiar with databases and SQL. 1 If you are NOT that familiar but still you want to use this approach, here are some code snippets that can help you:

1๏ธโƒฃ How to run raw SQL queries in SQLAlchemy
# Import necessary modules
import pandas as pd

from dotenv import load_dotenv
from sqlalchemy import text, create_engine
from sqlalchemy.orm import sessionmaker

# Load environment variables
# If you are following along with the problem set, 
# you should have a .env file in your project directory
load_dotenv()

# Create a connection to your database
engine = create_engine(os.getenv("DATABASE_URL"))

# Create a session
session = sessionmaker(engine)()

# A typical raw SQL query
# You must always use the text() function to pass the query as a string
your_raw_sql_query = text("""
SELECT *
FROM your_table
""")

# Execute the query
result = session.execute(your_raw_sql_query)

# Convert the result to a pandas DataFrame
df = pd.DataFrame(result.fetchall())

# Display the DataFrame
print(df)
2๏ธโƒฃ How to add a vector column to the DocChunk table

Your raw SQL query should look like this:

-- Remember to change the size of the vector to match the model you are using
ALTER TABLE doc_chunks ADD COLUMN embedding vector(768);

From within Python, hereโ€™s how you can make sure that the pgvector extension is installed and loaded:

# Assuming you have a session already created
session.execute(text("CREATE EXTENSION IF NOT EXISTS vector;"))

Then, to make sure that the embedding column is of the correct size, you can do the following:

# Assuming you have a session already created

raw_sql_query = text(f"ALTER TABLE doc_chunks ADD COLUMN embedding vector({EMBEDDING_DIMENSIONS});")

session.execute(raw_sql_query)
3๏ธโƒฃ How to insert a vector into the DocChunk table

Assuming:

  • You have a session already created
  • You have used a HuggingFace model to embed your text and have it in a numpy array

Then, you first need to format the array for PostgreSQL array syntax:

PgVector requires the syntax: [float,float,float,...] without spaces. So we need to join the array into a string with commas:

embedding_str = ",".join([str(value) for value in query_embedding])

Then, to insert the vector into the DocChunk table, either use f-strings or pass the values as a dictionary:

# Using f-strings
query = f"""
UPDATE doc_chunks 
SET embedding = '{embedding_str}' 
WHERE doc_id = '{doc_id}' 
AND chunk_id = '{chunk_id}'
"""

session.execute(text(query))

# Using a dictionary
query = """
UPDATE doc_chunks 
SET embedding = :embedding 
WHERE doc_id = :doc_id 
AND chunk_id = :chunk_id
"""

session.execute(text(query), {"embedding": embedding_str, "doc_id": doc_id, "chunk_id": chunk_id})
4๏ธโƒฃ How to run a similarity search query

I like to filter per country first, as Iโ€™m always only interested in a single country at a time.

To do that without/before adding the embedding column, you can do the following:

SELECT 
    doc_chunks.doc_id,
    doc_chunks.chunk_id,
    doc_chunks.content,
    documents.country
FROM doc_chunks
LEFT JOIN documents ON doc_chunks.doc_id = documents.doc_id
WHERE documents.country = 'United States'

To use the embedding column and actually run a similarity search, you can adjust the query as follows:

SELECT 
    doc_chunks.doc_id,
    doc_chunks.chunk_id,
    doc_chunks.content,
    documents.country,
    -- The <=> operator is used to calculate the cosine distance
    (doc_chunks.embedding <=> :embedding) AS similarity
FROM doc_chunks
LEFT JOIN documents ON doc_chunks.doc_id = documents.doc_id
WHERE documents.country = 'Australia'
ORDER BY similarity DESC
LIMIT 5

2. How to โ€˜convertโ€™ this into a RAG system (10:30 - 11:00)

RAG typically involves the following steps:

  1. Document Chunking
  2. Embedding
  3. Vector Search
  4. Response Generation

Response generation these days takes the form of a prompt that is given to a (properly large) language model. The language models we have been using for embeddings wonโ€™t work well for this.

Click here for an example of a chat using Qwen2.5-3B-Instruct

Here is an example, taken from the official HuggingFace documentation, of how to use a large(-ish) language model to generate a response to a user query.

Donโ€™t try this on Nuvolos ๐Ÿ˜ฌ

from transformers import AutoModelForCausalLM, AutoTokenizer

model_name = "Qwen/Qwen2.5-3B-Instruct"

model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype="auto",
    device_map="auto"
)
tokenizer = AutoTokenizer.from_pretrained(model_name)

prompt = "Give me a short introduction to large language model."
messages = [
    {"role": "system", "content": "You are Qwen, created by Alibaba Cloud. You are a helpful assistant."},
    {"role": "user", "content": prompt}
]
text = tokenizer.apply_chat_template(
    messages,
    tokenize=False,
    add_generation_prompt=True
)
model_inputs = tokenizer([text], return_tensors="pt").to(model.device)

generated_ids = model.generate(
    **model_inputs,
    max_new_tokens=512
)
generated_ids = [
    output_ids[len(input_ids):] for input_ids, output_ids in zip(model_inputs.input_ids, generated_ids)
]

response = tokenizer.batch_decode(generated_ids, skip_special_tokens=True)[0]

print(response)
Climate policy RAG system example with source attribution

A key aspect of RAG systems is source attribution. Hereโ€™s how to build a system that uses the LLM to cite the sources it uses:

๐Ÿšจ Sadly, the current architecture of LLMs is still unreliable, and might sometimes simply make stuff up, even with the citations and despite the system prompt!

# System prompt template
system_prompt = """
You are an expert in climate policy analysis.
Answer the question SOLELY based on the provided document chunks.
Do not make any assumptions or rely on external knowledge.
Sources come in the form of [Doc ID: <id>, Chunk ID: <id>, Content: <chunk>].
Every time you use a source, cite it using the format [Doc ID: <id>, Chunk ID: <id>].
"""

# Function to format retrieval results into prompt
def format_context(results):
    context = []
    for row in results:
        context.append(f"[Doc ID: {row.doc_id}, Chunk ID: {row.chunk_id}]\n{row.text}")
    return "\n\n".join(context)

# Function to generate response
def generate_response(query, context):
    prompt = f"{system_prompt}\n\nContext:\n{context}\n\nQuestion: {query}\n\nAnswer:"
    inputs = tokenizer(prompt, return_tensors="pt")
    output = model.generate(**inputs, max_length=1024)
    return tokenizer.decode(output[0], skip_special_tokens=True)

This approach: 1. Clearly defines expectations for the model via the system prompt 2. Properly formats the retrieved chunks with their source IDs 3. Instructs the model to cite its sources when answering 4. Works with various HuggingFace models, not just Qwen

3. From tasks.py to Airflow DAGs (11:00 - 11:30)

Airflow represents workflows as Directed Acyclic Graphs (DAGs), which are collections of tasks with dependencies between them, not that dissimilar from the tasks.py CLI approach we have been using.

The advantage of Airflow is that it allows you to schedule, monitor, and manage your workflows more efficiently, and you can use it directly in the most popular cloud platforms (AWS, Azure, GCP).

๐Ÿ’ก Key Airflow Concepts:

  • DAG: A directed acyclic graph representing your workflow
  • Operators: The building blocks of DAGs that represent a single task
  • Tasks: Parameterised instances of operators
  • Task Dependencies: Define the order in which tasks should be executed

Converting tasks.py to an Airflow DAG

Below is a vibe-coded/imagined example of how our tasks.py CLI commands could look like in an Airflow DAG. Adapted using code from the official Airflow v2.0.1 Tutorial.

How our Airflow DAG could look like
from datetime import timedelta

# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago

# These args will get passed on to each operator
default_args = {
    'owner': 'climate_team',
    'depends_on_past': False,
    'email': ['climate_alerts@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Define the DAG
dag = DAG(
    'climate_policy_extractor',
    default_args=default_args,
    description='Climate policy information extraction pipeline',
    schedule_interval=timedelta(days=1),
    start_date=days_ago(2),
    tags=['climate', 'nlp'],
)

# Define the functions that will be our tasks
def init_db_func():
    """Initialize database function"""
    from climate_policy_extractor.models import Base, get_db_session
    from sqlalchemy import text, create_engine
    import os
    
    engine = create_engine(os.getenv("DATABASE_URL"))
    Base.metadata.create_all(engine)
    with engine.connect() as conn:
        conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector;"))
        conn.commit()
    return "Database initialized"

def crawl_func():
    """Crawl NDC documents"""
    from climate_policy_extractor.spiders.ndc_spider import NDCSpider
    from scrapy.crawler import CrawlerProcess
    from scrapy.utils.project import get_project_settings
    
    settings = get_project_settings()
    process = CrawlerProcess(settings)
    process.crawl(NDCSpider)
    process.start()
    return "Documents crawled"

def download_func():
    """Download PDF documents"""
    from climate_policy_extractor.models import get_db_session
    from tasks.downloaders import process_downloads
    import os
    
    session = get_db_session(os.getenv("DATABASE_URL"))
    download_dir = os.getenv("DOWNLOAD_DIR", "data/pdfs")
    total, successful = process_downloads(session, download_dir)
    session.close()
    return f"Downloaded {successful} of {total} documents"

def chunk_func():
    """Chunk the PDF documents"""
    from climate_policy_extractor.models import get_db_session, NDCDocumentModel, DocChunk
    from tasks.chunking import extract_text_data_from_pdf
    from sqlalchemy import and_
    import os
    
    session = get_db_session(os.getenv("DATABASE_URL"))
    # Implementation details here
    session.close()
    return "Documents chunked"

def embed_func():
    """Generate embeddings for chunks"""
    from transformers import AutoTokenizer, AutoModel
    import os
    
    # Implementation details here
    return "Embeddings generated"

# Create the tasks by instantiating operators
init_db = PythonOperator(
    task_id='init_db',
    python_callable=init_db_func,
    dag=dag,
)

crawl = PythonOperator(
    task_id='crawl',
    python_callable=crawl_func,
    dag=dag,
)

download = PythonOperator(
    task_id='download',
    python_callable=download_func,
    dag=dag,
)

chunk = PythonOperator(
    task_id='chunk',
    python_callable=chunk_func,
    dag=dag,
)

embed = PythonOperator(
    task_id='embed',
    python_callable=embed_func,
    dag=dag,
)

# Set the dependencies between tasks
init_db >> crawl >> download >> chunk >> embed

Key Benefits of Airflow

Why Airflow is more scalable/better than our tasks.py script
  1. Scheduling: Rather than manually running commands, Airflow can automatically execute your pipeline at defined intervals.

  2. Visualisation: Airflow provides a web UI where you can see the status of your DAGs and tasks:

    • View the DAG structure as a graph
    • Monitor running tasks in real-time
    • Inspect logs directly from the UI
    • View historical runs and their outcomes

    See screenshots here.

  3. Error Handling: Airflow provides robust error handling:

    • Automatic retries for failed tasks
    • Email notifications on failures
    • Custom callbacks for different events (failure, success, retry)
  4. Scalability: Tasks can be distributed across multiple workers, enabling parallel execution.

  5. Parameterisation: Easily pass parameters to tasks and between tasks using XComs.

  6. Testing: Built-in commands for testing tasks and DAGs:

    # Test a specific task
    airflow tasks test climate_policy_extractor init_db 2023-01-01
    
    # Test the entire DAG for a specific date
    airflow dags test climate_policy_extractor 2023-01-01
  7. Backfilling: Efficiently run your pipeline for historical dates:

    airflow dags backfill climate_policy_extractor \
        --start-date 2023-01-01 \
        --end-date 2023-01-07

Important Airflow Concepts

Itโ€™s important to understand that Airflow DAG definition files are configuration files that define structure - they donโ€™t process data directly:

One concept that might not be immediately intuitive is that this Airflow Python script functions purely as a configuration file specifying the DAGโ€™s structure as code. The actual tasks defined here will run in a different context from the script itself. Different tasks run on different workers at different points in time, which means that this script cannot be used for cross-communication between tasks.

For our climate policy extraction pipeline, this means:

  • Each function must load its own database connection
  • You cannot share variables between tasks in the DAG file
  • For communication between tasks, you would use Airflowโ€™s XComs feature

4. Group Project Introduction (11:45 - 12:00)

  • Project options and selection process
  • Team formation guidelines
  • Evaluation criteria
  • Timeline and deliverables

(The conversation/definition of scope for groups will continue in the lab on Tuesday, 1 April 2025)

๐ŸŽฅ Session Recording

The lecture recording will be available on Moodle by the afternoon of the lecture.

Footnotes

  1. Remember that the only reason we are using a database is because want to calculate embedding similarities in a performant way. Databases are efficient at this. If you were to do just like how we did in W08, you would have to load the entire document into memory, which is not feasible for a large number of documents. Your computer will run out of memory.โ†ฉ๏ธŽ