A Pydantic Graph may be a better alternative to LangGraph. Pydantic Graph certainly has way better documentation, but if you compare any documentation to the one from LangChain, you set the bar really low. So, what else does it have to offer? As with every Pydantic project, type hints! However, a graph may be overkill. The PydanticAI authors state in the documentation: “If you’re not confident a graph-based approach is a good idea, it might be unnecessary.” and “pydantic-graph is designed for advanced users and makes heavy use of Python generics and type hints. It is not designed to be as beginner-friendly as PydanticAI.” If this doesn’t scare you off, let’s see how to use it.

Table of Contents

  1. Pydantic Graph Installation
  2. Basic Graph Structure in Pydantic Graph
  3. Graphs with State
  4. Adding AI Agents to the Graph
  5. Monitoring and Debugging

One more thing. Graph support is in a very early beta stage. If you read this article a couple of months after it was published and the code doesn’t work anymore, let me know, and I will update the text.

Pydantic Graph Installation

The graph library is provided as a separate package: pydantic-graph. It has no dependency on pydantic-ai, so if you want to use AI Agents, you must install both. However, the lack of dependencies makes it possible to use the pydantic-graph library for any project that requires graph-based state machines or workflows.

I will use the library with the OpenAI API, so I need to install the PydanticAI client with the OpenAI extra.

pip install pydantic-graph pydantic-ai-slim[openai]

The versions I installed:

  • pydantic-ai-slim-0.0.24
  • pydantic-graph-0.0.24

Basic Graph Structure in Pydantic Graph

The graph consists of classes that inherit from BaseNode and implement the run method. The return type of run is another BaseNode class, the End class (which marks the end of the graph), or a union of several such classes. The End class requires a type annotation specifying the type of the returned value.

When we return a value from the run method, the graph will continue execution by calling the next node. This creates a flexible graph structure because, at any point, we can return a different value type. At the same time, all return values must be specified in the return type annotation so all possible paths are known in advance.

The graph may have an execution state and dependencies, but passing data between nodes without using a global state is possible. The example below starts with a simple graph with no global state.

Let’s imagine a silly example. We want to count the total number of words in all files in a given directory. We will implement a graph equivalent of calling find _posts -type f -exec cat {} \; | wc -w in the terminal. (Not because it makes sense, but because we need an example. Don’t put such silly graphs in production.)

from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import List

from pydantic_graph import BaseNode, End, Graph, GraphRunContext

Each node dataclass may declare fields if we want to pass data between nodes. Let’s define a node that finds all files in the given directory.

@dataclass
class ListFiles(BaseNode):
    path: str

    async def run(self, ctx: GraphRunContext) -> CountWordsInFiles:
        file_paths = []
        try:
            for file_path in Path(self.path).iterdir():
                if file_path.is_file():
                    file_paths.append(str(file_path))
        except FileNotFoundError:
            return CountWordsInFiles([])

        return CountWordsInFiles(file_paths)

In the next node (CountWordsInFiles), we will open all files and count the words. Then, we pass the array of file lengths to the final node to sum them up.

@dataclass
class CountWordsInFiles(BaseNode):
    file_paths: List[str]

    async def run(self, ctx: GraphRunContext) -> SumLengths:
        file_lengths = []

        for file_path in self.file_paths:
            try:
                with open(file_path, 'r') as f:
                    content = f.read()
                    words = content.split()
                    file_lengths.append(len(words))
            except (OSError, UnicodeDecodeError):
                continue

        return SumLengths(file_lengths)
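A quick aside on the word counting itself: splitting on a single space miscounts anything that is not strictly space-separated, while str.split() with no argument collapses runs of whitespace (spaces, newlines, tabs), which is what wc -w does. A pure-Python illustration:

```python
# wc -w counts maximal runs of non-whitespace characters.
# str.split() with no argument does the same; split(" ") does not.
text = "one two\nthree   four\n"

words = text.split()
naive = text.split(" ")

print(len(words))  # 4 words, matching `wc -w`
print(len(naive))  # 5 items: the newline and double spaces confuse it
```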

The final node is different. Its run method returns the End class with a type annotation. We must add the same type annotation to the BaseNode class. However, because BaseNode is also generic over the graph state and dependencies, the return type goes in the third position, and we leave the other two as None.

@dataclass
class SumLengths(BaseNode[None, None, int]):
    file_lengths: List[int]

    async def run(self, ctx: GraphRunContext) -> End[int]:
        return End(sum(self.file_lengths))

We can finally define the graph. Note we specify only the graph nodes. Edges will be inferred automatically from the run method return values.

sum_file_lengths_graph = Graph(nodes=[ListFiles, CountWordsInFiles, SumLengths])

To use the graph, we create an instance of the first node we want to execute and call the run or run_sync method. The method returns the result value and the history of graph execution.

result, history = sum_file_lengths_graph.run_sync(ListFiles("/content"))
print(result)
print([item.data_snapshot() for item in history])

In my case, the output was:

8
[ListFiles(path='/content'), CountWordsInFiles(file_paths=['/content/test.txt', '/content/test2.txt']), SumLengths(file_lengths=[4, 4]), End(data=8)]

Want to build AI systems that actually work?

Download my expert-crafted GenAI Transformation Guide for Data Teams and discover how to properly measure AI performance, set up guardrails, and continuously improve your AI solutions like the pros.

Graphs with State

The state is a global variable shared between nodes. Every node can read and write to the state. When we start the graph execution, we must specify the initial state. As with everything in Pydantic Graph, the state is a data class. For example, let’s define a state for a graph that sends an async request and waits for the response by periodically polling the server.

from typing import Optional


@dataclass
class RequestState:
    request_id: Optional[str]
    wait_time: int
    max_wait_time: int

We can retrieve data from the state and modify the global state in any node. Let’s define a node that pretends to check the server status and may loop while waiting, move to the next node, or move to a node that indicates an error. While doing so, the node updates the current wait time stored in the global state and reads the maximum wait time from the state.

import asyncio
import random


@dataclass
class CheckStatus(BaseNode[RequestState]):
    async def run(self, ctx: GraphRunContext[RequestState]) -> CheckStatus | GetDocument | WaitTimeExceeded:
        await asyncio.sleep(1)
        found = bool(random.randint(0, 1))
        if found:
            return GetDocument()
        else:
            ctx.state.wait_time += 1
            if ctx.state.wait_time > ctx.state.max_wait_time:
                return WaitTimeExceeded()

            return CheckStatus()

We access the state through the ctx.state object and can modify its fields directly. When we run the graph, we pass the initial state as one of the run/run_sync method arguments.

result, history = await get_document_graph.run(RequestDocument(), state=RequestState(None, 0, 5))

Adding AI Agents to the Graph

Because Pydantic Graph and PydanticAI Agents are separate projects, using them together doesn’t require any special setup. For the Graph library, AI Agents are just Python code.

We can define an AI agent outside the graph and use the agent in the node by calling the run method.

from pydantic_ai import Agent


summarization_agent = Agent(
    'openai:gpt-4o-mini',
    result_type=str,
    system_prompt='Write a summary of the given document',
)

...

@dataclass
class GenerateDocumentSummary(BaseNode[None, None, str]):
    document_content: str

    async def run(self, ctx: GraphRunContext) -> End[str]:
        result = await summarization_agent.run(
            self.document_content
        )
        return End(result.data)

Monitoring and Debugging

Every Pydantic project supports tracking data in Logfire, and Pydantic Graph is no exception. If we install the pydantic-ai[logfire] library and enable instrumentation, we can track the graph execution and the AI Agents inside the graph.

import logfire

logfire.configure(token='...')
logfire.instrument_openai()

When we run the code, we can see the logs in the Logfire dashboard.

Logfire dashboard

Do you need help building AI-powered applications for your business?
You can hire me!
