The Firefly Agentic Framework — Complete Tutorial

Copyright 2026 Firefly Software Foundation. Licensed under the Apache License 2.0.

From Zero to Expert — This tutorial walks you through every module in fireflyframework-agentic by building a real project from scratch: an Intelligent Document Processing (IDP) pipeline that classifies, digitises, extracts, validates, and assembles data from invoices.

Each chapter introduces a concept, explains why it exists, shows how it works with architecture diagrams, and immediately applies it to the IDP pipeline. By Chapter 18 you will have a production-grade GenAI application that uses agents, tools, prompts, reasoning patterns, content processing, memory, validation, pipelines, observability, explainability, experiments, multi-agent delegation, template agents, and a plugin system — all wired together.


Table of Contents

Part I — Foundation

  1. Introduction — What, who, why, design principles, running example
  2. Installation & Project Setup — UV, extras, FireflyAgenticConfig, model providers
  3. Your First Agent — FireflyAgent, @firefly_agent, registry, context, lifecycle (diagram)
  4. Tools — @firefly_tool, ToolBuilder, guards, composition, built-ins, ToolKit, attaching tools to agents (diagram)
  5. Prompts — Jinja2 templates, versioning, composition, validation, file loading

Part II — Intelligence

  6. Reasoning Patterns — ReAct, CoT, Plan-and-Execute, Reflexion, Tree of Thoughts, Goal Decomposition, custom patterns (diagram)
  7. Content Processing — Chunking, batch processing, compression, sliding window
  8. Memory — Conversation memory, working memory, backends, MemoryManager, forking, integration points (diagram)
  9. Validation & Quality of Service — Rules, OutputValidator, QoS checks, OutputReviewer

Part III — Orchestration & Operations

  10. Pipeline — DAG engine, step executors, fan-out/fan-in, retries, PipelineContext (diagram)
  11. Observability — Tracing, metrics, events, OpenTelemetry exporters
  12. Explainability — Decisions, explanations, audit trail, reports

Part IV — Experimentation

  13. Experiments — A/B testing, variant comparison, tracking
  14. Lab — Interactive sessions, benchmarks, model comparison, eval datasets

Part V — Advanced

  15. Template Agents — Summariser, classifier, extractor, conversational, router
  16. Multi-Agent Delegation — Delegation router, strategies, memory forking (diagram)
  17. Plugin System — Entry-point discovery, packaging agents/tools/patterns
  18. Putting It All Together — Full IDP implementation, project structure, production checklist (full system diagram)

Part I — Foundation


Chapter 1: Introduction

What Is fireflyframework-agentic?

fireflyframework-agentic is a GenAI metaframework — it sits on top of Pydantic AI and provides the structure, patterns, and production-grade plumbing that Pydantic AI deliberately leaves to the application developer.

Think of Pydantic AI as the engine and fireflyframework-agentic as the car around it: the steering, brakes, GPS, dashboard, and everything else you need to actually drive to your destination.

Who Is It For?

  • Backend engineers building GenAI features into existing applications.
  • ML/AI engineers who want structured reasoning, validation, and observability out of the box.

The Four Design Principles

The framework is guided by four principles that show up in every module:

  1. Protocol-driven contracts — Public APIs are Python Protocol classes or abstract base classes. You can swap or extend any component without touching framework internals.
  2. Convention over configuration — Sensible defaults everywhere. One FireflyAgenticConfig object (backed by Pydantic Settings) centralises every knob and reads from environment variables automatically.
  3. Layered composition — Modules are organised into layers (Core, Agent, Intelligence, Experimentation, Orchestration). Higher layers depend on lower layers, never the reverse.
  4. Optional dependencies — Heavy libraries (embedding providers, vector store clients, storage backends) are declared as extras. The core framework imports them lazily so you only install what you use.
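
Principles 1 and 4 can be illustrated in plain Python. The sketch below is not framework code: `Embedder`, `HashEmbedder`, `load_optional`, and `describe` are hypothetical names chosen for illustration. It shows structural typing with `typing.Protocol` and a lazy import that only fails when the optional dependency is actually requested.

```python
import importlib
from typing import Protocol, runtime_checkable


@runtime_checkable
class Embedder(Protocol):
    """Hypothetical contract: any object with a matching embed() satisfies it."""

    def embed(self, text: str) -> list[float]: ...


class HashEmbedder:
    """Toy implementation; note that it does not inherit from Embedder."""

    def embed(self, text: str) -> list[float]:
        # One small number per character, for the first four characters only.
        return [float(ord(ch) % 7) for ch in text[:4]]


def load_optional(module_name: str):
    """Import a heavy dependency only at the moment it is needed."""
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise RuntimeError(
            f"'{module_name}' is not installed; install the matching extra"
        ) from exc


def describe(embedder: Embedder) -> int:
    # Callers depend on the protocol, never on a concrete class.
    return len(embedder.embed("invoice"))
```

Because the check is structural, swapping `HashEmbedder` for another class with the same method requires no change to `describe`.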

The Running Example: Intelligent Document Processing

Throughout this tutorial we build an IDP pipeline that processes invoices:

Raw Document → Classify → Digitise (OCR) → Extract Fields → Validate → Assemble Result

Every chapter teaches a framework concept and immediately applies it to a phase of this pipeline. By Chapter 18 you will have the complete, production-ready system.


Chapter 2: Installation & Project Setup

Prerequisites

Creating a New Project

mkdir idp-service && cd idp-service
uv init
uv add fireflyframework-agentic

This installs the core framework with its minimal dependencies: pydantic-ai, pydantic, pydantic-settings, jinja2, and opentelemetry-api/sdk.

Installing Extras

The framework provides optional extras for additional capabilities:

# Embedding providers (e.g. OpenAI / Azure)
uv add "fireflyframework-agentic[openai-embeddings]"

# Vector store backends
uv add "fireflyframework-agentic[vectorstores-chroma]"

# Memory persistence backends
uv add "fireflyframework-agentic[postgres]"

# Everything
uv add "fireflyframework-agentic[all]"

For our IDP project we will eventually use several of these, so install everything:

uv add "fireflyframework-agentic[all]"

Configuration with FireflyAgenticConfig

All framework settings live in a single Pydantic Settings class that reads from environment variables prefixed with FIREFLY_AGENTIC_:

from fireflyframework_agentic import FireflyAgenticConfig, get_config

# get_config() returns a thread-safe singleton
config = get_config()
print(config.default_model) # "openai:gpt-4o"
print(config.default_temperature) # None (no temperature forced; provider default)
print(config.max_retries) # 3

Override any setting via environment variables or a .env file:

# .env

# --- Provider API keys (read by Pydantic AI automatically) ---
OPENAI_API_KEY=sk-...
# ANTHROPIC_API_KEY=sk-ant-...
# GEMINI_API_KEY=...
# GROQ_API_KEY=gsk_...
# DEEPSEEK_API_KEY=...

# --- Framework settings ---
FIREFLY_AGENTIC_DEFAULT_MODEL=openai:gpt-4o
FIREFLY_AGENTIC_DEFAULT_TEMPERATURE=0.3
FIREFLY_AGENTIC_LOG_LEVEL=DEBUG
FIREFLY_AGENTIC_OBSERVABILITY_ENABLED=true

Here are the most commonly used configuration fields:

  • default_model — LLM model string (e.g. "openai:gpt-4o", "anthropic:claude-3-5-sonnet").
  • default_temperature — Default sampling temperature. None (default) forces no temperature, so each provider uses its own default (some models, e.g. OpenAI o1/o3, reject an explicit temperature). When set, it's merged into an agent's settings only if the caller omits one.
  • max_retries — Default retry count for agent runs.
  • observability_enabled — Toggle OpenTelemetry instrumentation.
  • prompt_templates_dir — Directory for Jinja2 prompt files.
  • default_chunk_size / default_chunk_overlap — Content chunking defaults.
  • max_context_tokens — Maximum context window (default 128,000).
  • validation_enabled — Enable/disable output validation.
  • cost_tracking_enabled — Enable/disable usage and cost tracking.
  • budget_limit_usd — Hard budget limit in USD (a warning is logged when exceeded).
  • cost_strict — When True, cost resolution raises UnknownModelCostError instead of returning None for models with no known pricing (default False).
  • memory_backend — "in_memory", "file", "postgres", or "mongodb".
  • memory_max_conversation_tokens — Token budget per conversation.
  • encryption_enabled / encryption_key — Enable AES-256-GCM encryption of memory at rest.

The singleton is created once and cached for the process lifetime. Call reset_config() in tests to force re-creation.
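
To make the singleton-and-reset behaviour concrete, here is a standard-library approximation. It is only a sketch under stated assumptions: `SketchConfig`, `get_sketch_config`, and `reset_sketch_config` are hypothetical stand-ins, and the real FireflyAgenticConfig is backed by Pydantic Settings rather than a dataclass.

```python
import os
from dataclasses import dataclass
from functools import lru_cache
from typing import Optional

ENV_PREFIX = "FIREFLY_AGENTIC_"


@dataclass(frozen=True)
class SketchConfig:
    """Hypothetical stand-in carrying three of the fields listed above."""

    default_model: str = "openai:gpt-4o"
    default_temperature: Optional[float] = None
    max_retries: int = 3


def _load_from_env() -> SketchConfig:
    # Each field is read from the environment under the shared prefix.
    raw_temp = os.environ.get(ENV_PREFIX + "DEFAULT_TEMPERATURE")
    return SketchConfig(
        default_model=os.environ.get(ENV_PREFIX + "DEFAULT_MODEL", "openai:gpt-4o"),
        default_temperature=float(raw_temp) if raw_temp is not None else None,
        max_retries=int(os.environ.get(ENV_PREFIX + "MAX_RETRIES", "3")),
    )


@lru_cache(maxsize=1)
def get_sketch_config() -> SketchConfig:
    # Built once, then served from the cache for the process lifetime.
    return _load_from_env()


def reset_sketch_config() -> None:
    # Tests call this so the next get_sketch_config() re-reads the environment.
    get_sketch_config.cache_clear()
```

The practical consequence is the same as described above: changing an environment variable after the first call has no effect until the cache is reset.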

Model Providers & Authentication

Before you can run an agent against a real LLM, you need credentials for your model provider. fireflyframework-agentic delegates model communication entirely to Pydantic AI, which supports multiple providers out of the box.

Approach 1: Environment Variables

The simplest method — set the appropriate API key as an environment variable and use the "provider:model_name" string format:

Provider          Env Variable         Model String Example
OpenAI            OPENAI_API_KEY       "openai:gpt-4o"
Anthropic         ANTHROPIC_API_KEY    "anthropic:claude-sonnet-4-20250514"
Google Gemini     GEMINI_API_KEY       "google:gemini-2.0-flash"
Groq              GROQ_API_KEY         "groq:llama-3.3-70b"
DeepSeek          DEEPSEEK_API_KEY     "deepseek:deepseek-chat"
Mistral           MISTRAL_API_KEY      "mistral:mistral-large-latest"
AWS Bedrock       AWS_* credentials    "bedrock:anthropic.claude-3-5-sonnet-latest"
Ollama (local)    (none required)      "ollama:llama3.2"

Pydantic AI reads these variables automatically — you do not need to pass them to the framework. Just set the key and use the model string:

# .env
OPENAI_API_KEY=sk-...
FIREFLY_AGENTIC_DEFAULT_MODEL=openai:gpt-4o

from fireflyframework_agentic.agents import FireflyAgent

# Uses OPENAI_API_KEY from the environment
agent = FireflyAgent(name="my-agent", model="openai:gpt-4o")

To switch providers, change the model string and API key — no code changes required:

# .env — switch to Anthropic
ANTHROPIC_API_KEY=sk-ant-...
FIREFLY_AGENTIC_DEFAULT_MODEL=anthropic:claude-sonnet-4-20250514
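
The "provider:model_name" convention is easy to reason about with a small helper. This is an illustrative sketch only; `split_model_string` is a hypothetical function and not how Pydantic AI or the framework resolves models internally. The one detail worth showing is that the split happens on the first colon, because some model names (Bedrock IDs, for example) contain a colon themselves.

```python
def split_model_string(model: str) -> tuple[str, str]:
    """Split "provider:model_name" on the first colon only."""
    provider, sep, name = model.partition(":")
    if not sep or not provider or not name:
        raise ValueError(f"expected 'provider:model_name', got {model!r}")
    return provider, name


# The provider prefix selects the backend; the remainder is passed through.
provider, name = split_model_string("anthropic:claude-sonnet-4-20250514")
```

A string without a prefix, such as "gpt-4o", raises `ValueError` in this sketch rather than guessing a provider.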

Approach 2: Programmatic Model Objects

For scenarios that require explicit credential management — Azure OpenAI, AWS Bedrock, custom endpoints, or when you don't want to use environment variables — construct a Pydantic AI Model object and pass it directly to FireflyAgent:

from pydantic_ai.models.openai import OpenAIChatModel
from pydantic_ai.providers.openai import OpenAIProvider
from fireflyframework_agentic.agents import FireflyAgent

# Explicit API key (e.g. loaded from a vault)
model = OpenAIChatModel(
    "gpt-4o",
    provider=OpenAIProvider(api_key="sk-...")
)
agent = FireflyAgent(name="my-agent", model=model)

Azure OpenAI:

from pydantic_ai.models.openai import OpenAIChatModel
from pydantic_ai.providers.azure import AzureProvider

model = OpenAIChatModel(
    "my-gpt4o-deployment",
    provider=AzureProvider(
        azure_endpoint="https://my-resource.openai.azure.com",
        api_version="2025-03-01-preview",
        api_key="...", # or use DefaultAzureCredential
    ),
)
agent = FireflyAgent(name="azure-agent", model=model)

Anthropic with explicit key:

from pydantic_ai.models.anthropic import AnthropicModel
from pydantic_ai.providers.anthropic import AnthropicProvider

model = AnthropicModel(
    "claude-sonnet-4-20250514",
    provider=AnthropicProvider(api_key="sk-ant-...")
)
agent = FireflyAgent(name="claude-agent", model=model)

AWS Bedrock:

from pydantic_ai.models.bedrock import BedrockConverseModel

model = BedrockConverseModel(
    "anthropic.claude-3-5-sonnet-20241022-v2:0",
    region_name="us-east-1",
)
agent = FireflyAgent(name="bedrock-agent", model=model)

The framework's observability layer automatically detects Bedrock-hosted models and resolves them to the correct model family for cost tracking (Anthropic pricing), prompt caching (Anthropic cache configuration), and rate-limit retry (Bedrock ThrottlingException detection).

OpenAI-compatible endpoints (e.g. Ollama, vLLM, LiteLLM):

from pydantic_ai.models.openai import OpenAIChatModel
from pydantic_ai.providers.openai import OpenAIProvider

model = OpenAIChatModel(
    "llama3.2",
    provider=OpenAIProvider(
        base_url="http://localhost:11434/v1",
        api_key="ollama", # Ollama doesn't require a real key
    ),
)
agent = FireflyAgent(name="local-agent", model=model)

Which Approach Should I Use?

  • Environment variables for standard cloud providers (OpenAI, Anthropic, Google, Groq, DeepSeek). This is the simplest path and works well in most deployments.
  • Programmatic Model objects for Azure OpenAI, AWS Bedrock, self-hosted models, OpenAI-compatible servers, or when API keys are loaded from a secrets manager at runtime.

Both approaches work identically with every framework feature — tools, reasoning patterns, pipelines, cost tracking, prompt caching, and all other modules. The framework's model_utils module normalizes model identity from both strings and Model objects, so observability and resilience features work uniformly across all providers.

IDP Tie-In

For our IDP project, we will use environment variables. Make sure your .env contains the API key for whichever provider you choose:

# .env
OPENAI_API_KEY=sk-...
FIREFLY_AGENTIC_DEFAULT_MODEL=openai:gpt-4o
FIREFLY_AGENTIC_DEFAULT_TEMPERATURE=0.3
FIREFLY_AGENTIC_OBSERVABILITY_ENABLED=true

Chapter 3: Your First Agent

Every GenAI application starts with a single question: "How do I talk to the model?" In raw Pydantic AI you create an Agent, give it a system prompt, and call run(). That works great for scripts — but the moment you need to register agents by name, share them across pipelines, delegation, and reasoning patterns, attach lifecycle hooks, or plug them into a larger system, you need a thin coordination layer on top.

That is exactly what FireflyAgent is. It wraps a Pydantic AI Agent and adds three things the framework relies on: a global registry (so any module can look up an agent by name), lifecycle management (init → warmup → shutdown), and metadata (tags, descriptions, correlation IDs). The underlying Pydantic AI agent does all the heavy lifting — model calls, tool dispatch, streaming — while FireflyAgent provides the scaffolding that makes it a team player in a larger system.

Agent System Architecture

The following diagram shows how FireflyAgent sits on top of Pydantic AI and connects to the rest of the framework:

graph TB
    subgraph Application Code
        DEC["@firefly_agent decorator"]
        CLS["FireflyAgent class"]
    end

    subgraph Agent Layer
        FA["FireflyAgent"]
        PAI["pydantic_ai.Agent\n(model calls, tool dispatch, streaming)"]
        REG["AgentRegistry\n(singleton name → agent map)"]
        CTX["AgentContext\n(correlation_id, metadata, trace)"]
        LC["AgentLifecycle\n(init → warmup → shutdown)"]
    end

    subgraph Consumers
        PIPE["Pipelines"]
        DELEG["Delegation Router"]
        REASON["Reasoning Patterns"]
    end

    DEC -->|creates + registers| FA
    CLS -->|creates| FA
    FA -->|wraps| PAI
    FA -->|registers in| REG
    FA -->|carries| CTX
    FA -->|hooks| LC
    REG -->|lookup by name| PIPE
    REG -->|lookup by name| DELEG
    REG -->|lookup by name| REASON

Creating an Agent with the Decorator

The fastest way to create and register an agent is with the @firefly_agent decorator. You write a function that returns the system prompt, and the framework takes care of creating the agent, wiring the prompt, and registering it — all in one step:

from fireflyframework_agentic.agents import firefly_agent

# The decorator creates a FireflyAgent, uses this function as the dynamic
# instructions provider, and registers the agent in the global AgentRegistry.
@firefly_agent(name="greeter", model="openai:gpt-4o")
def greeter_instructions(ctx):
    # This function is called at the start of every run.
    # You can inspect `ctx` to customise the prompt per-request.
    return "You are a friendly greeter. Say hello and ask how you can help."

What happens behind the scenes:

  1. A FireflyAgent named "greeter" is created with model "openai:gpt-4o".
  2. The decorated function becomes the agent's dynamic instructions provider — it is called at the start of every run and can use the context to customise the system prompt.
  3. The agent is automatically registered in the global AgentRegistry, so any module (pipelines, delegation routers) can look it up by name.
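
The three steps above follow a common decorator-plus-registry pattern, sketched here with the standard library. All names (`SketchRegistry`, `sketch_agent`, the dict used as an "agent") are hypothetical, and the duplicate-name check is an assumption of this sketch, not documented framework behaviour.

```python
from typing import Callable, Dict


class SketchRegistry:
    """Hypothetical name -> object map with the get/has operations used in this chapter."""

    def __init__(self) -> None:
        self._items: Dict[str, object] = {}

    def register(self, name: str, item: object) -> None:
        if name in self._items:
            raise ValueError(f"'{name}' is already registered")
        self._items[name] = item

    def get(self, name: str) -> object:
        return self._items[name]  # raises KeyError when the name is unknown

    def has(self, name: str) -> bool:
        return name in self._items


registry = SketchRegistry()


def sketch_agent(name: str, model: str) -> Callable:
    """Decorator factory: build an agent record and register it in one step."""

    def decorator(instructions_fn: Callable[[dict], str]) -> dict:
        agent = {"name": name, "model": model, "instructions": instructions_fn}
        registry.register(name, agent)
        return agent

    return decorator


@sketch_agent(name="greeter", model="openai:gpt-4o")
def greeter_instructions(ctx: dict) -> str:
    # Called per run, so the prompt can depend on request-scoped data.
    return f"You are a friendly greeter for {ctx.get('tenant', 'everyone')}."
```

After the module is imported, any other code can call `registry.get("greeter")` without importing `greeter_instructions` directly, which is the decoupling the registry exists to provide.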

Creating an Agent with the Class

When you need full control — custom output types, explicit tool lists, or you prefer not to use decorators — instantiate FireflyAgent directly and register it yourself:

from fireflyframework_agentic.agents import FireflyAgent
from fireflyframework_agentic.agents.registry import agent_registry

# Create the agent with a static instructions string and a typed output.
# The `output_type` tells Pydantic AI to validate the LLM's response as a dict.
classifier = FireflyAgent(
    name="document_classifier",
    model="openai:gpt-4o",
    instructions=(
        "You are a document classification expert. "
        "Given a document, determine its type (invoice, receipt, contract, form), "
        "language, and page count. Return JSON with keys: doc_type, language, page_count."
    ),
    output_type=dict,
)

# Register it so other parts of the framework (pipelines, delegation) can find it.
agent_registry.register(classifier)

Running an Agent

Every agent supports three execution modes — pick the one that fits your context:

# Async — the standard choice for production services.
result = await classifier.run("Classify this document: Invoice from Acme Corp...")
print(result.output) # {"doc_type": "invoice", "language": "en", "page_count": 1}

# Synchronous — handy for scripts, notebooks, and quick experiments.
result = classifier.run_sync("Classify this document: ...")
print(result.output)

# Streaming — for real-time UI feedback where you want tokens as they arrive.
# streaming_mode is "buffered" (default, chunked) or "incremental" (token-by-token).
async with classifier.run_stream(
    "Classify this document: ...", streaming_mode="incremental"
) as stream:
    async for chunk in stream:
        print(chunk, end="", flush=True)

The Agent Registry

The AgentRegistry is a process-wide singleton that maps agent names to FireflyAgent instances. This is the glue that lets any module — delegation routers, pipelines, reasoning patterns — discover and invoke agents without importing them directly:

from fireflyframework_agentic.agents.registry import agent_registry

# Retrieve an agent by name — returns the FireflyAgent or raises KeyError.
agent = agent_registry.get("document_classifier")

# Check existence before retrieval.
if agent_registry.has("document_classifier"):
    print("Agent is registered")

# List all registered agents (useful for admin dashboards / health checks).
for info in agent_registry.list_agents():
    print(f"{info.name} ({info.model})")

Agent Context

When a request arrives, you often need to carry metadata along the entire execution path — who sent the request, which experiment is active, what trace ID to log. AgentContext is that bag of request-scoped data:

from fireflyframework_agentic.agents.context import AgentContext

# Create a context with a correlation ID and arbitrary metadata.
# This context will be available in instructions providers and tool functions.
ctx = AgentContext(
    correlation_id="req-abc-123",
    metadata={"tenant": "acme-corp", "source": "email-inbox"},
)

The context is available inside dynamic instructions providers and tool functions, allowing you to tailor behaviour per-request without global state.

Lifecycle Management

Real-world agents often depend on external resources — database connections, model caches, file handles. AgentLifecycle gives you three hooks to manage them cleanly: init (one-time setup), warmup (pre-heat caches), and shutdown (release resources):

from fireflyframework_agentic.agents.lifecycle import AgentLifecycle

lifecycle = AgentLifecycle()

# Each hook accepts a sync or async callable.
lifecycle.on_init(lambda: print("Loading resources..."))
lifecycle.on_warmup(lambda: print("Warming up model cache..."))
lifecycle.on_shutdown(lambda: print("Releasing resources..."))

# In your application startup/shutdown sequence:
await lifecycle.run_init()
await lifecycle.run_warmup()
# ... application serves requests ...
await lifecycle.run_shutdown()
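
Accepting "a sync or async callable" usually comes down to calling the hook and awaiting the result only if it is awaitable. The following sketch shows that technique with a hypothetical `SketchLifecycle` class; it is not the framework's AgentLifecycle and covers only the init phase.

```python
import asyncio
import inspect
from typing import Any, Callable, List


class SketchLifecycle:
    """Hypothetical: stores hooks in order and awaits the async ones."""

    def __init__(self) -> None:
        self._init_hooks: List[Callable[[], Any]] = []

    def on_init(self, hook: Callable[[], Any]) -> None:
        self._init_hooks.append(hook)

    async def run_init(self) -> None:
        for hook in self._init_hooks:
            result = hook()
            # A coroutine function returns an awaitable; a plain function does not.
            if inspect.isawaitable(result):
                await result


def demo() -> List[str]:
    events: List[str] = []

    async def async_hook() -> None:
        events.append("async")

    lifecycle = SketchLifecycle()
    lifecycle.on_init(lambda: events.append("sync"))
    lifecycle.on_init(async_hook)
    asyncio.run(lifecycle.run_init())
    return events
```

Hooks run in registration order in this sketch, so resources that depend on each other should be registered in dependency order.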

The Middleware Stack

Every FireflyAgent runs each call (run, run_sync, run_stream, run_with_reasoning) through a MiddlewareChain — a before/after pipeline whose hooks receive a MiddlewareContext (agent name, prompt, method, model, deps). Two middlewares are auto-wired: LoggingMiddleware is always added, and ObservabilityMiddleware is added when config.observability_enabled is true. You can attach more from fireflyframework_agentic.agents:

  • PromptGuardMiddleware / OutputGuardMiddleware — block prompt-injection / leaky output.
  • CostGuardMiddleware — raises BudgetExceededError when a budget is exceeded.
  • CacheMiddleware (ResultCache) and PromptCacheMiddleware (CacheStatistics).
  • ValidationMiddleware — validate structured output.
  • RetryMiddleware — retry on failure (not auto-wired; rate-limit retry is built into run() itself).
  • ExplainabilityMiddleware — record decisions to a TraceRecorder.

from fireflyframework_agentic.agents import FireflyAgent, PromptGuardMiddleware

agent = FireflyAgent(name="assistant", model="openai:gpt-4o")
agent.middleware.add(PromptGuardMiddleware())

For provider resilience, FallbackModelWrapper / run_with_fallback let an agent fall back to a secondary model, and CircuitBreakerMiddleware (Chapter 11) trips on repeated failures.
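
A before/after pipeline of this kind can be sketched in a few lines. The classes below (`RecordingMiddleware`, `SketchChain`) are hypothetical and use a plain dict where the framework uses MiddlewareContext. Running the after hooks in reverse order is an assumption of this sketch (a common "onion" convention), not a documented guarantee of MiddlewareChain.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Tuple


class RecordingMiddleware:
    """Hypothetical middleware that only records when its hooks fire."""

    def __init__(self, label: str, log: List[str]) -> None:
        self.label = label
        self.log = log

    async def before(self, context: dict) -> None:
        self.log.append(f"before:{self.label}")

    async def after(self, context: dict, result: Any) -> Any:
        self.log.append(f"after:{self.label}")
        return result


class SketchChain:
    def __init__(self) -> None:
        self._middlewares: List[RecordingMiddleware] = []

    def add(self, middleware: RecordingMiddleware) -> None:
        self._middlewares.append(middleware)

    async def run(self, context: dict, call: Callable[[dict], Awaitable[Any]]) -> Any:
        for middleware in self._middlewares:
            await middleware.before(context)
        result = await call(context)
        # Unwind in reverse so the first middleware added sees the final result last.
        for middleware in reversed(self._middlewares):
            result = await middleware.after(context, result)
        return result


def demo() -> Tuple[str, List[str]]:
    log: List[str] = []

    async def fake_model_call(context: dict) -> str:
        log.append("call")
        return context["prompt"].upper()

    chain = SketchChain()
    chain.add(RecordingMiddleware("logging", log))
    chain.add(RecordingMiddleware("guard", log))
    return asyncio.run(chain.run({"prompt": "hi"}, fake_model_call)), log
```

A guard-style middleware would raise inside `before` to stop the call, and a validation-style middleware would inspect or replace `result` inside `after`.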

IDP Tie-In: The Document Classifier Agent

Time to build the first piece of our IDP pipeline. The document classifier receives a raw document (text or scanned image) and outputs structured metadata — document type, language, page count, and orientation. Every subsequent stage depends on this output.

from fireflyframework_agentic.agents import FireflyAgent

# The classifier is our pipeline's entry point — it decides how the document
# will be routed through digitisation, extraction, and validation.
classifier_agent = FireflyAgent(
    name="document_classifier",
    model="openai:gpt-4o",
    instructions=(
        "You are a document classification expert. Given a document (text or image), "
        "determine its type, language, page count, and orientation. "
        "Return JSON: {doc_type, language, page_count, orientation}."
    ),
    output_type=dict, # Pydantic AI will validate the LLM output as a dict
)

For multimodal input (e.g. scanned images), the framework provides typed content wrappers that ensure the LLM receives the content in the right format:

from fireflyframework_agentic.types import ImageUrl

# Pass a list of mixed content — text instructions + an image.
# The framework converts ImageUrl into the provider-specific multimodal format.
result = await classifier_agent.run([
    "Classify this document:",
    ImageUrl(url="data:image/png;base64,<base64-data>"),
])

Other supported types include AudioUrl, DocumentUrl, and VideoUrl.
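
When the scanned page is on disk or in memory rather than behind a URL, the base64 data URL shown above has to be built first. A minimal helper using only the standard library might look like this; `to_data_url` is a hypothetical name, not a framework function.

```python
import base64


def to_data_url(data: bytes, mime_type: str = "image/png") -> str:
    """Encode raw bytes as a data URL of the form data:<mime>;base64,<payload>."""
    encoded = base64.b64encode(data).decode("ascii")
    return f"data:{mime_type};base64,{encoded}"


# Two bytes are enough to show the shape of the result.
example = to_data_url(b"hi")
```

The resulting string can be passed wherever a URL is expected, for instance as the `url` argument in the ImageUrl example above. Base64 inflates the payload by roughly a third, so large scans may be better served from a real URL.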


Chapter 4: Tools

LLMs are powerful reasoners, but they cannot check a database, call an API, or read a file on their own. Tools bridge that gap: they are functions the model can call during a conversation to fetch data, trigger side-effects, or run computations.

Pydantic AI already supports tool functions, but fireflyframework-agentic wraps them with a richer layer: a protocol-based type system (ToolProtocol and BaseTool), guards that enforce validation, rate-limiting, and sandboxing before a tool executes (human-in-the-loop approval is separate: it pauses the run rather than rejecting it), composition primitives (sequential, fallback, conditional), a global registry for discovery, and a ToolKit that can convert framework tools into Pydantic AI tools for injection into any agent.

Tool System Architecture

The following diagram shows the full tool stack — from how you create tools at the top, through the guard chain, to how they reach an agent at the bottom:

graph TB
    subgraph "Tool Creation"
        DEC["@firefly_tool decorator"]
        BLD["ToolBuilder (fluent API)"]
        BLT["Built-in Tools\n(Http, FileSystem, DateTime, JSON, ...)"]
    end

    subgraph "Tool Protocol Layer"
        TP["ToolProtocol"]
        BT["BaseTool\n(name, description, execute)"]
    end

    subgraph "Guard Chain"
        CG["CompositeGuard"]
        VG["ValidationGuard"]
        RG["RateLimitGuard"]
        SG["SandboxGuard"]
    end

    subgraph "Composition"
        SEQ["SequentialComposer"]
        FB["FallbackComposer"]
        COND["ConditionalComposer"]
    end

    subgraph "Registration & Bridging"
        TR["ToolRegistry\n(global catalog)"]
        TK["ToolKit\n(group + as_pydantic_tools)"]
    end

    subgraph "Agent Integration"
        FA["FireflyAgent(tools=[...])"]
        PAI["pydantic_ai.Agent\n(calls tools during LLM run)"]
    end

    DEC --> BT
    BLD --> BT
    BLT --> BT
    BT --> TR
    BT -.->|"guarded"| CG
    CG --> VG
    CG --> RG
    CG --> SG
    BT -.->|"compose"| SEQ
    BT -.->|"compose"| FB
    BT -.->|"compose"| COND
    TR --> TK
    TK -->|"as_pydantic_tools"| FA
    FA -->|"tools list"| PAI

Creating a Tool with the Decorator

The fastest path — one decorator does everything:

from fireflyframework_agentic.tools import firefly_tool

# @firefly_tool creates a BaseTool, registers it in the global ToolRegistry,
# and makes it discoverable by name for ToolKit grouping and agent bridging.
@firefly_tool(name="lookup_vendor", description="Look up vendor details by name")
async def lookup_vendor(vendor_name: str) -> str:
    # In production, this would query a database or API.
    vendors = {"Acme Corp": "ID-001", "Globex": "ID-002"}
    return vendors.get(vendor_name, "Unknown vendor")

Creating a Tool with the Builder

When you need more control — or want to build tools programmatically at runtime — use the fluent ToolBuilder:

from fireflyframework_agentic.tools import ToolBuilder

async def fetch_exchange_rate(currency: str) -> float:
    """Simulated exchange rate lookup."""
    rates = {"USD": 1.0, "EUR": 0.85, "GBP": 0.73}
    return rates.get(currency, 0.0)

# The builder pattern lets you set each property explicitly.
# Call .build() to produce the final BaseTool instance.
exchange_tool = (
    ToolBuilder("exchange_rate")
    .description("Get exchange rate for a currency code")
    .parameter("currency", str, description="Three-letter currency code")
    .handler(fetch_exchange_rate)
    .build()
)

Tool Guards

In production, you rarely want a tool to run unconditionally. Guards are decorators that wrap a tool's execution with hard, synchronous policy checks — input validation, rate-limiting, or filesystem sandboxing. They run before the handler (and optionally after), and they stack via CompositeGuard. (Human-in-the-loop approval, which pauses the run rather than rejecting it, is handled separately — see below.)

Validation Guard

Ensures that all required parameters are present before the tool handler runs:

from fireflyframework_agentic.tools import firefly_tool, guarded
from fireflyframework_agentic.tools.guards import ValidationGuard

# The guard checks that all listed keys are present in kwargs.
@guarded(ValidationGuard(required_keys=["vendor_name"]))
@firefly_tool(name="lookup_vendor", description="Look up vendor")
async def lookup_vendor(vendor_name: str) -> str:
    ...

Rate Limit Guard

Prevents a tool from being called too frequently — essential for expensive or metered external APIs:

from fireflyframework_agentic.tools.guards import RateLimitGuard

# Allow at most 10 calls per 60-second period.
@guarded(RateLimitGuard(max_calls=10, period_seconds=60))
@firefly_tool(name="web_search", description="Search the web")
async def web_search(query: str) -> str:
    ...
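
The guard's internal algorithm is not shown here. One simple way to enforce a "max_calls per period_seconds" policy is a sliding-window log, sketched below. `SlidingWindowLimiter` is a hypothetical class, and the injectable `clock` parameter exists only to make the behaviour testable without waiting.

```python
import time
from collections import deque
from typing import Callable, Deque


class SlidingWindowLimiter:
    """Hypothetical limiter: remembers call timestamps inside the current window."""

    def __init__(
        self,
        max_calls: int,
        period_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_calls = max_calls
        self.period_seconds = period_seconds
        self._clock = clock
        self._calls: Deque[float] = deque()

    def allow(self) -> bool:
        now = self._clock()
        # Forget calls that have aged out of the window.
        while self._calls and now - self._calls[0] >= self.period_seconds:
            self._calls.popleft()
        if len(self._calls) >= self.max_calls:
            return False
        self._calls.append(now)
        return True


def demo() -> list:
    now = [0.0]  # mutable fake clock
    limiter = SlidingWindowLimiter(2, 60, clock=lambda: now[0])
    results = [limiter.allow(), limiter.allow(), limiter.allow()]
    now[0] = 60.0  # a full period later, the earlier calls no longer count
    results.append(limiter.allow())
    return results
```

A token-bucket limiter is a different design that permits short bursts up to the bucket size; which one suits a tool depends on whether the upstream API tolerates bursts.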

Sandbox Guard

Restricts tool arguments via allow/deny regex patterns — useful for preventing path traversal, secret leakage, or access to dangerous locations:

from fireflyframework_agentic.tools.guards import SandboxGuard

# Allow paths under /tmp/uploads, deny path traversal (..) and .env files.
@guarded(SandboxGuard(
    allowed_patterns=[r"^/tmp/uploads"],
    denied_patterns=[r"\.\.", r"\.env"],
))
@firefly_tool(name="read_file", description="Read a file")
async def read_file(path: str) -> str:
    ...
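
The effect of the allow and deny patterns above can be checked with the `re` module directly. This sketch assumes "deny wins, then at least one allow pattern must match"; that evaluation order and the helper name `is_allowed` are assumptions for illustration, not the documented behaviour of SandboxGuard.

```python
import re
from typing import Iterable

ALLOWED = [r"^/tmp/uploads"]
DENIED = [r"\.\.", r"\.env"]


def is_allowed(
    value: str,
    allowed_patterns: Iterable[str] = ALLOWED,
    denied_patterns: Iterable[str] = DENIED,
) -> bool:
    """Reject on any deny match; otherwise require at least one allow match."""
    if any(re.search(pattern, value) for pattern in denied_patterns):
        return False
    return any(re.search(pattern, value) for pattern in allowed_patterns)
```

Note that `^/tmp/uploads` is a prefix match, so a sibling directory such as /tmp/uploads-evil/ would also pass. Ending the pattern with a slash (`^/tmp/uploads/`) closes that gap.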

Human-in-the-loop approval

Human approval is not a guard — it pauses the run rather than failing it. Mark a tool with requires_approval=True; the agent run then pauses before the tool executes and the caller resumes with an approval decision. See Human-in-the-Loop Tool Approval for the full flow.

from fireflyframework_agentic.agents import is_deferred
from fireflyframework_agentic.tools import DeferredToolResults

@firefly_tool(name="delete_record", description="Delete a database record", requires_approval=True)
async def delete_record(record_id: str) -> str:
    ...

result = await agent.run("Delete record 42.")
if is_deferred(result):  # paused for sign-off
    approvals = {c.tool_call_id: True for c in result.output.approvals}  # True / ToolApproved / ToolDenied
    result = await agent.run(message_history=result.all_messages(),
                             deferred_tool_results=DeferredToolResults(approvals=approvals))

Composing Guards

Chain multiple guards with CompositeGuard — all must pass (AND semantics, evaluated in order, first failure short-circuits):

from fireflyframework_agentic.tools.guards import CompositeGuard, ValidationGuard, RateLimitGuard

guard = CompositeGuard(guards=[
    ValidationGuard(required_keys=["query"]),
    RateLimitGuard(max_calls=5, period_seconds=30),
])

Retryable Tools

The @retryable decorator wraps a tool's execute method with exponential-backoff retry logic — useful for tools that call flaky external APIs:

from fireflyframework_agentic.tools import firefly_tool, retryable

@retryable(max_retries=3, backoff=1.0)
@firefly_tool(name="call_erp", description="Fetch data from the ERP API")
async def call_erp(query: str) -> str:
    # On failure, retries up to 3 times with 1s → 2s → 4s backoff
    ...

@retryable stacks with @guarded — guards run first, then retries wrap the handler:

@retryable(max_retries=2)
@guarded(RateLimitGuard(max_calls=10, period_seconds=60))
@firefly_tool(name="web_search", description="Search the web")
async def web_search(query: str) -> str:
    ...
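
The 1s → 2s → 4s schedule mentioned above is plain exponential backoff: the delay before retry n is backoff × 2^n. The sketch below shows the technique with hypothetical helpers (`backoff_delays`, `call_with_retry`); it is not the @retryable implementation, and the `sleep` parameter is injectable only so the example runs instantly.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Tuple


def backoff_delays(max_retries: int, backoff: float) -> List[float]:
    """Delay before each retry: backoff * 2**attempt."""
    return [backoff * (2 ** attempt) for attempt in range(max_retries)]


async def call_with_retry(
    fn: Callable[[], Awaitable[Any]],
    max_retries: int = 3,
    backoff: float = 1.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> Any:
    delays = backoff_delays(max_retries, backoff)
    for attempt in range(max_retries + 1):  # one initial try plus the retries
        try:
            return await fn()
        except Exception:
            if attempt == max_retries:
                raise  # retries exhausted: surface the last error
            await sleep(delays[attempt])


def demo() -> Tuple[str, List[float]]:
    attempts = {"count": 0}
    slept: List[float] = []

    async def flaky() -> str:
        attempts["count"] += 1
        if attempts["count"] < 3:
            raise ConnectionError("transient failure")
        return "ok"

    async def fake_sleep(seconds: float) -> None:
        slept.append(seconds)  # record the delay instead of waiting

    result = asyncio.run(call_with_retry(flaky, sleep=fake_sleep))
    return result, slept
```

Production retry logic often adds random jitter to each delay so that many clients do not retry in lockstep; that refinement is omitted here.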

Tool Composition

Combine tools into higher-level operations:

from fireflyframework_agentic.tools import SequentialComposer, FallbackComposer, ConditionalComposer

# Sequential: output of one becomes the `input` kwarg to the next.
# First positional arg is the composed tool's name.
pipeline = SequentialComposer(
    "fetch-parse-validate",
    [fetch_tool, parse_tool, validate_tool],
    description="Fetch, parse, then validate",
)

# Fallback: try tools in order until one succeeds; raise if all fail.
resilient = FallbackComposer(
    "resilient-api",
    [primary_api, secondary_api, cache_lookup],
)

# Conditional: a router_fn receives kwargs and returns a key into tool_map.
def select_parser(**kwargs) -> str:
    return "json" if kwargs.get("format") == "json" else "text"

router = ConditionalComposer(
    "format-router",
    router_fn=select_parser,
    tool_map={"json": json_parser, "text": text_parser},
)
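
The sequential and fallback semantics described in the comments above can be expressed with plain async functions. This is a sketch of the control flow only: `run_sequential`, `run_fallback`, and the four toy tools are hypothetical, and the real composers wrap BaseTool instances rather than bare coroutines.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Sequence, Tuple

AsyncTool = Callable[..., Awaitable[Any]]


async def run_sequential(tools: Sequence[AsyncTool], **kwargs: Any) -> Any:
    """Feed each tool's output to the next one as the `input` keyword argument."""
    if not tools:
        raise ValueError("at least one tool is required")
    result = await tools[0](**kwargs)
    for tool in tools[1:]:
        result = await tool(input=result)
    return result


async def run_fallback(tools: Sequence[AsyncTool], **kwargs: Any) -> Any:
    """Return the first successful result; raise if every tool fails."""
    if not tools:
        raise ValueError("at least one tool is required")
    errors: List[Exception] = []
    for tool in tools:
        try:
            return await tool(**kwargs)
        except Exception as exc:
            errors.append(exc)
    raise RuntimeError(f"all {len(tools)} tools failed") from errors[-1]


async def fetch(url: str) -> str:
    return f"<{url}>"


async def parse(input: str) -> str:
    return input.strip("<>")


async def broken(**kwargs: Any) -> str:
    raise TimeoutError("primary down")


async def cached(**kwargs: Any) -> str:
    return "cached"


def demo() -> Tuple[str, str]:
    sequential = asyncio.run(run_sequential([fetch, parse], url="a.pdf"))
    fallback = asyncio.run(run_fallback([broken, cached], key=1))
    return sequential, fallback
```

One design consequence of the sequential form is that every tool after the first must accept an `input` keyword, so intermediate tools need compatible signatures.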

Built-In Tools

The framework ships with nine ready-to-use tools in tools/builtins/:

from fireflyframework_agentic.tools.builtins import (
    DateTimeTool, # Current date/time, timezone conversion
    JsonTool, # Parse, validate, extract, format JSON
    TextTool, # Word count, regex extract, truncate, replace
    CalculatorTool, # Safe math via AST parsing (no eval)
)

# Each tool follows ToolProtocol and can be registered and guarded
datetime_tool = DateTimeTool(default_timezone="America/New_York")
result = await datetime_tool.execute(action="now")

calculator = CalculatorTool()
result = await calculator.execute(expression="sqrt(144) + pi")

The full list: HttpTool, FileSystemTool, SearchTool, DatabaseTool, ShellTool, DateTimeTool, JsonTool, TextTool, CalculatorTool.
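
"Safe math via AST parsing" generally means parsing the expression into a syntax tree and evaluating only an explicit whitelist of node types. The sketch below illustrates that technique with the standard `ast` module. `safe_eval` and its whitelist are assumptions for illustration, not CalculatorTool's actual implementation or supported function set.

```python
import ast
import math
import operator

_BIN_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}
_UNARY_OPS = {ast.USub: operator.neg, ast.UAdd: operator.pos}
_NAMES = {"pi": math.pi, "e": math.e}
_FUNCS = {"sqrt": math.sqrt}


def safe_eval(expression: str) -> float:
    """Evaluate arithmetic by walking the AST; anything not whitelisted is rejected."""

    def visit(node: ast.AST) -> float:
        if isinstance(node, ast.Constant):
            if isinstance(node.value, (int, float)) and not isinstance(node.value, bool):
                return node.value
        elif isinstance(node, ast.BinOp) and type(node.op) in _BIN_OPS:
            return _BIN_OPS[type(node.op)](visit(node.left), visit(node.right))
        elif isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](visit(node.operand))
        elif isinstance(node, ast.Name) and node.id in _NAMES:
            return _NAMES[node.id]
        elif (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Name)
            and node.func.id in _FUNCS
            and not node.keywords
        ):
            return _FUNCS[node.func.id](*(visit(arg) for arg in node.args))
        raise ValueError(f"unsupported expression element: {type(node).__name__}")

    return visit(ast.parse(expression, mode="eval").body)
```

Attribute access, imports, and calls to non-whitelisted names never reach an evaluator, because each of them produces a node type that falls through to the `ValueError`.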

The Tool Registry

from fireflyframework_agentic.tools.registry import tool_registry

tool_registry.register(my_tool)
tool = tool_registry.get("my_tool")
all_tools = tool_registry.list_tools()

ToolKit

A ToolKit groups related tools and can convert them to Pydantic AI tools for direct injection into an agent:

from fireflyframework_agentic.tools import ToolKit
from fireflyframework_agentic.tools.builtins import DateTimeTool, CalculatorTool

datetime_tool = DateTimeTool()
calculator = CalculatorTool()

kit = ToolKit("utility-tools", [datetime_tool, calculator], description="Common utilities")

# Register all tools in the toolkit at once
from fireflyframework_agentic.tools.registry import tool_registry  # same import path as in "The Tool Registry"
kit.register_all(tool_registry)

Attaching Tools to Agents

The Firefly tool system (ToolRegistry, @firefly_tool, ToolKit) is a separate layer from the Pydantic AI tool system baked into each agent. Here is how they connect:

Approach 1: Pass Pydantic AI tool functions directly to FireflyAgent

The tools parameter on FireflyAgent accepts any objects that Pydantic AI recognises as tools — plain functions, pydantic_ai.Tool objects, etc.:

from fireflyframework_agentic.agents import FireflyAgent

async def lookup_vendor(ctx, vendor_name: str) -> str:
    return f"Details for {vendor_name}"

agent = FireflyAgent(
    name="extractor",
    model="openai:gpt-4o",
    tools=[lookup_vendor], # Pydantic AI tool functions
)

Approach 2: Register tools after creation with decorator proxies

FireflyAgent exposes .tool() and .tool_plain() decorator proxies that delegate to the underlying Pydantic AI agent:

agent = FireflyAgent(name="assistant", model="openai:gpt-4o")

@agent.tool_plain
async def calculate(expression: str) -> str:
    """Evaluate a math expression."""
    return str(eval(expression))  # simplified example: eval() is unsafe on untrusted input; prefer CalculatorTool

@agent.tool
async def get_user(ctx, user_id: str) -> str:
    """Look up a user by ID (receives RunContext)."""
    return f"User {user_id}"

Approach 3: Bridge Firefly tools via ToolKit.as_pydantic_tools()

Firefly BaseTool instances (created with @firefly_tool, ToolBuilder, or built-ins) live in the ToolRegistry. To feed them into an agent, convert via as_pydantic_tools():

from fireflyframework_agentic.tools import ToolKit
from fireflyframework_agentic.tools.builtins import DateTimeTool, JsonTool
from fireflyframework_agentic.agents import FireflyAgent

kit = ToolKit("utilities", [DateTimeTool(), JsonTool()])

agent = FireflyAgent(
    name="helper",
    model="openai:gpt-4o",
    tools=kit.as_pydantic_tools(), # Convert to Pydantic AI tools
)

Key distinction: ToolRegistry is a framework-level catalog for discovery and metadata. An agent only calls tools that are in its own Pydantic AI tools list. Use ToolKit.as_pydantic_tools() or agent.tool() to bridge between the two.

IDP Tie-In: OCR and Vendor Lookup Tools

For our IDP pipeline, we need tools the extraction agent can call. We define them with @firefly_tool, group them into a ToolKit, and attach them to the agent via as_pydantic_tools(). This is the pattern you will see end-to-end in Chapter 6 (reasoning patterns) and Chapter 18 (full IDP application).

Step 1 — Define the tools:

from fireflyframework_agentic.tools import firefly_tool, guarded
from fireflyframework_agentic.tools.guards import RateLimitGuard

@guarded(RateLimitGuard(max_calls=100, period_seconds=60))
@firefly_tool(name="ocr_extract", description="Extract text from a document image via OCR")
async def ocr_extract(image_data: str) -> str:
    """In production, call an OCR service like AWS Textract or Google Vision."""
    return "Invoice #INV-2026-001\nVendor: Acme Corp\nAmount: $1,234.56\nDate: 2026-01-15"

@firefly_tool(name="vendor_lookup", description="Look up vendor details from the ERP system")
async def vendor_lookup(vendor_name: str) -> str:
    vendors = {
        "Acme Corp": '{"id": "V-001", "tax_id": "US-12345", "payment_terms": "NET30"}',
        "Globex": '{"id": "V-002", "tax_id": "US-67890", "payment_terms": "NET60"}',
    }
    return vendors.get(vendor_name, '{"error": "Vendor not found"}')
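The source does not say which algorithm RateLimitGuard uses. As one plausible reading of `max_calls=100, period_seconds=60`, here is a minimal sliding-window limiter. `SlidingWindowLimiter` and `FakeClock` are hypothetical names, and the injectable clock is an assumption made so the behaviour can be checked without sleeping.

```python
import time
from collections import deque


class SlidingWindowLimiter:
    """Allow at most max_calls within any window of period_seconds."""

    def __init__(self, max_calls, period_seconds, clock=time.monotonic):
        self.max_calls = max_calls
        self.period_seconds = period_seconds
        self._clock = clock
        self._calls = deque()  # timestamps of accepted calls, oldest first

    def allow(self):
        now = self._clock()
        # Forget calls that have aged out of the window.
        while self._calls and now - self._calls[0] >= self.period_seconds:
            self._calls.popleft()
        if len(self._calls) >= self.max_calls:
            return False
        self._calls.append(now)
        return True


class FakeClock:
    """Manually advanced clock for deterministic demonstrations."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now


clock = FakeClock()
limiter = SlidingWindowLimiter(max_calls=2, period_seconds=60, clock=clock)
decisions = [limiter.allow(), limiter.allow(), limiter.allow()]  # third call exceeds the limit
clock.now = 61.0  # both earlier calls have now left the window
after_window = limiter.allow()
```

A guard built on this idea would check `allow()` before invoking the wrapped tool and raise or delay when it returns False.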

Step 2 — Group into a ToolKit and attach to the extraction agent:

from fireflyframework_agentic.tools import ToolKit
from fireflyframework_agentic.tools.builtins import CalculatorTool
from fireflyframework_agentic.agents import FireflyAgent

extraction_kit = ToolKit(
    "idp-extraction",
    [ocr_extract, vendor_lookup, CalculatorTool()],
    description="Tools for IDP invoice extraction",
)

extractor_agent = FireflyAgent(
    name="extractor",
    model="openai:gpt-4o",
    instructions="You are an invoice data extraction specialist.",
    tools=extraction_kit.as_pydantic_tools(), # Bridge Firefly tools → Pydantic AI
)

What happens next: In Chapter 6 we pass extractor_agent (with its tools already attached) to reasoning patterns like Plan-and-Execute and Reflexion. The pattern calls agent.run() internally — the tools are available because they were bound here. Chapter 18 shows the complete production module (idp/tools.py) with retries, guards, and the full ToolKit.


Chapter 5: Prompts

If tools are an agent's hands, prompts are its brain. The exact wording of a system prompt can mean the difference between extracting 60% of invoice fields and 98%. In production you need to version prompts (so you can A/B test), compose them (system + context + task), validate them (catch missing variables before runtime), and load them from files (so non-engineers can edit them).

The Prompts module provides all of this through a Jinja2-based template engine. Every template is a first-class object with a name, a version, declared variables, and a render method — not just a raw string.

Creating a Prompt Template

A PromptTemplate takes three positional arguments (name, system_template, and user_template) plus keyword-only version, description, required_variables, and metadata. Both templates are Jinja2 source:

from fireflyframework_agentic.prompts import PromptTemplate

extraction_prompt = PromptTemplate(
    "invoice_extraction",
    # system_template
    "You are an invoice data extraction specialist.\n"
    "Always return valid JSON matching the requested schema.",
    # user_template
    "Extract the following fields from the document text below:\n"
    "- invoice_number\n"
    "- vendor_name\n"
    "- total_amount (numeric)\n"
    "- due_date (ISO format)\n"
    "- line_items (list of {description, quantity, unit_price})\n\n"
    "Document text:\n{{ document_text }}",
    version="1.0.0",
    required_variables=["document_text"],
)

# render() returns a Prompt object with .system and .user fields (NOT a string).
prompt = extraction_prompt.render(document_text="Invoice #INV-001 from Acme Corp...")
print(prompt.system)  # rendered system_template
print(prompt.user)    # rendered user_template

Templates use Jinja2 syntax — {{ variable }}, {% if %}, {% for %}, filters, and macros all work. required_variables is validated at render time: rendering without a required variable raises PromptValidationError.
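The render-time check can be pictured with a small stand-in. This sketch assumes nothing about the framework's internals: `MiniTemplate` and `RenderedPrompt` are hypothetical, the regex substitution handles only plain `{{ name }}` placeholders (not real Jinja2), and it raises `ValueError` where the framework raises PromptValidationError.

```python
import re
from dataclasses import dataclass

# Matches simple placeholders such as {{ document_text }}.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


@dataclass(frozen=True)
class RenderedPrompt:
    system: str
    user: str


class MiniTemplate:
    def __init__(self, name, system_template, user_template, *, required_variables=()):
        self.name = name
        self.system_template = system_template
        self.user_template = user_template
        self.required_variables = tuple(required_variables)

    def render(self, **variables):
        # Fail before any text is produced if a declared variable is absent.
        missing = [v for v in self.required_variables if v not in variables]
        if missing:
            raise ValueError(f"{self.name}: missing required variables {missing}")

        def fill(text):
            return _PLACEHOLDER.sub(lambda m: str(variables.get(m.group(1), "")), text)

        return RenderedPrompt(system=fill(self.system_template), user=fill(self.user_template))


template = MiniTemplate(
    "invoice_extraction",
    "You are an invoice parser.",
    "Extract fields from:\n{{ document_text }}",
    required_variables=["document_text"],
)
prompt = template.render(document_text="Invoice #INV-001")
```

Checking declared variables up front turns a silently empty placeholder into an immediate, named error.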

Versioning

The PromptRegistry supports multiple versions of the same template. This is crucial for A/B testing different prompt strategies:

from fireflyframework_agentic.prompts import PromptRegistry

registry = PromptRegistry()
registry.register(extraction_prompt_v1)
registry.register(extraction_prompt_v2)

# Get the latest version
latest = registry.get("invoice_extraction")

# Get a specific version
v1 = registry.get("invoice_extraction", "1.0.0")
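How "latest" is resolved matters once version numbers reach double digits. The sketch below assumes versions are dotted integers compared numerically. Whether PromptRegistry compares versions this way or uses registration order is not stated here, so treat `VersionedRegistry` as a hypothetical illustration.

```python
class VersionedRegistry:
    """Store items under (name, version) and resolve the numerically highest version."""

    def __init__(self):
        self._items = {}

    def register(self, name, version, item):
        self._items.setdefault(name, {})[version] = item

    def get(self, name, version=None):
        versions = self._items[name]
        if version is None:
            # Compare as integer tuples: plain string comparison would rank
            # "9.1.0" above "10.0.0".
            version = max(versions, key=lambda v: tuple(int(part) for part in v.split(".")))
        return versions[version]


registry = VersionedRegistry()
registry.register("invoice_extraction", "1.0.0", "simple prompt")
registry.register("invoice_extraction", "10.0.0", "newest prompt")
registry.register("invoice_extraction", "9.1.0", "older prompt")

latest = registry.get("invoice_extraction")
pinned = registry.get("invoice_extraction", "1.0.0")
```

Pinning an explicit version, as in the last line, is the safer choice for an A/B test arm, since "latest" changes whenever a new version is registered.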

Composition

Templates can be composed using three strategies:

Sequential Composition

Render templates in order and join them — useful for building system + context + task prompts. The system parts are joined together and the user parts are joined together, and render() returns a single Prompt:

from fireflyframework_agentic.prompts.composer import SequentialComposer

# By default, templates are joined with "\n\n". Override with the keyword-only `separator=`.
composer = SequentialComposer(
    [system_prompt, context_prompt, task_prompt],
    separator="\n\n",
)
prompt = composer.render(document_text="Invoice #INV-001...")  # -> Prompt(.system, .user)

Conditional Composition

Select a template based on a runtime condition. The condition_fn receives the render kwargs and returns a string key that maps into template_map:

from fireflyframework_agentic.prompts.composer import ConditionalComposer

# The condition function inspects the kwargs and returns a template key.
# Both args are positional: (condition_fn, template_map).
composer = ConditionalComposer(
    lambda **kwargs: "invoice" if kwargs.get("doc_type") == "invoice" else "generic",
    {
        "invoice": invoice_prompt,
        "generic": generic_prompt,
    },
)
prompt = composer.render(doc_type="invoice", document_text="...")  # -> Prompt

Merge Composition

Render templates and merge with a custom function — full control over how pieces combine:

from fireflyframework_agentic.prompts.composer import MergeComposer

# The merge_fn is applied separately to the system parts and to the user parts.
# Both args are positional: (templates, merge_fn).
composer = MergeComposer(
    [header, body, footer],
    lambda parts: "\n---\n".join(parts),
)
prompt = composer.render(document_text="...")  # -> Prompt
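Sequential and merge composition share one mechanic: system parts and user parts are combined separately. Here is a minimal sketch with already-rendered prompts. `Prompt`, `merge_prompts`, and `join_sequential` are hypothetical helpers, and skipping empty halves is an assumption of this sketch rather than documented framework behaviour.

```python
from typing import NamedTuple


class Prompt(NamedTuple):
    system: str
    user: str


def merge_prompts(prompts, merge_fn):
    """Apply merge_fn to the system parts and to the user parts independently."""
    system_parts = [p.system for p in prompts if p.system]  # drop empty halves
    user_parts = [p.user for p in prompts if p.user]
    return Prompt(system=merge_fn(system_parts), user=merge_fn(user_parts))


def join_sequential(prompts, separator="\n\n"):
    """Sequential composition is a merge whose merge_fn is a plain join."""
    return merge_prompts(prompts, separator.join)


parts = [
    Prompt("You are a precise data extraction assistant.", ""),
    Prompt("", "Document type: invoice"),
    Prompt("", "Extract fields from:\nInvoice #INV-001"),
]
sequential = join_sequential(parts)
merged = merge_prompts(parts, lambda chunks: "\n---\n".join(chunks))
```

Seen this way, a custom merge function only has to decide how a list of strings becomes one string; the system/user split is handled around it.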

Validation

The PromptValidator checks rendered prompts against configurable constraints — token limits and required sections — catching problems before they reach the LLM:

from fireflyframework_agentic.prompts import PromptValidator

# Validate that the rendered prompt fits within 4,000 tokens
# and contains the required "valid JSON" section.
validator = PromptValidator(max_tokens=4000, required_sections=["valid JSON"])
prompt = extraction_prompt.render(document_text="Invoice #INV-001 from Acme Corp...")
# validate() takes a string. The required "valid JSON" text lives in the
# system template, so validate the system and user text together.
result = validator.validate(f"{prompt.system}\n\n{prompt.user}")
if not result.valid:
    print(f"Prompt issues: {result.errors}")
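As a rough picture of what such a check involves, the sketch below validates a string against a token budget and a list of required substrings. It is an assumption-laden stand-in: `validate_prompt` and `ValidationResult` are hypothetical, the four-characters-per-token estimate is a common heuristic rather than the framework's estimator, and required sections are treated as plain substrings.

```python
from dataclasses import dataclass, field


@dataclass
class ValidationResult:
    valid: bool
    errors: list = field(default_factory=list)


def validate_prompt(text, *, max_tokens, required_sections=()):
    """Check an estimated token count and the presence of required substrings."""
    errors = []
    estimated_tokens = (len(text) + 3) // 4  # heuristic: about 4 characters per token
    if estimated_tokens > max_tokens:
        errors.append(f"estimated {estimated_tokens} tokens exceeds limit of {max_tokens}")
    for section in required_sections:
        if section not in text:
            errors.append(f"missing required section: {section!r}")
    return ValidationResult(valid=not errors, errors=errors)


text = "Always return valid JSON."
ok = validate_prompt(text, max_tokens=100, required_sections=["valid JSON"])
too_long = validate_prompt(text, max_tokens=5)
missing = validate_prompt(text, max_tokens=100, required_sections=["line_items"])
```

Collecting every problem in `errors`, rather than stopping at the first, lets a prompt author fix all issues in one pass.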

Using Prompts with Agents

So far we've created templates, versioned them, and composed them — but none of that is useful until the rendered text reaches an agent. Here is how the two systems connect.

Direct rendering → agent.run() — The simplest path. Render a template and pass the rendered user text as the prompt. (render() returns a Prompt; an agent's run() takes the user text — the system prompt is usually set as the agent's instructions.)

from fireflyframework_agentic.agents import FireflyAgent
from fireflyframework_agentic.prompts import PromptTemplate

extraction_prompt = PromptTemplate(
    "invoice_extraction",
    "You are a precise invoice data extraction assistant.",  # system_template
    "Extract invoice_number, vendor_name, total_amount, due_date from:\n\n"
    "{{ document_text }}\n\nReturn valid JSON.",              # user_template
)

# Use the template's system text as the agent's instructions.
prompt = extraction_prompt.render(document_text=ocr_output)
agent = FireflyAgent(
    name="extractor", model="openai:gpt-4o", instructions=prompt.system, output_type=dict
)
result = await agent.run(prompt.user)
print(result.output) # {"invoice_number": "INV-001", ...}

Composed prompts → agent.run() — Use a composer when you need to assemble multiple templates (system instructions + context + task) into one prompt:

from fireflyframework_agentic.prompts.composer import SequentialComposer

# Each template provides system + user halves; here the work lives in the user half.
system = PromptTemplate("system", "You are a precise data extraction assistant.", "")
context = PromptTemplate("context", "", "Document type: {{ doc_type }}")
task = PromptTemplate("task", "", "Extract fields from:\n{{ document_text }}")

composer = SequentialComposer([system, context, task])
prompt = composer.render(doc_type="invoice", document_text=ocr_output)
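# Only the user half is passed here; prompt.system holds the joined system text
# and belongs in the agent's instructions, as in the previous example.
result = await agent.run(prompt.user)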

Reasoning patterns use prompts internally — Every reasoning pattern (Chapter 6) has named prompt slots backed by PromptTemplate instances. When a ReAct pattern calls agent.run(), it first renders its "thought" template, passes the result to the agent, and records the output in the trace. You can override any slot:

from fireflyframework_agentic.reasoning import ReActPattern

# Override the built-in thought prompt with your own template.
custom = PromptTemplate("my:thought", "You are a careful reasoner.", "Think about: {{ context }}")
pattern = ReActPattern(prompts={"thought": custom})

See Chapter 6 → Configurable Prompts for the full list of prompt slots per pattern.

Loading from Files

For large prompts or team workflows, store templates as files:

PromptLoader exposes three static factory methods — from_string(), from_file(), and from_directory():

from fireflyframework_agentic.prompts import PromptLoader

# From an inline string (name, system_template, user_template).
template = PromptLoader.from_string(
    "invoice_extraction",
    "You are an invoice parser.",
    "Extract fields from:\n{{ document_text }}",
)

# From a YAML file (keys map to PromptTemplate fields; name defaults to the file stem).
template = PromptLoader.from_file("prompts/invoice_extraction.yaml")

# Load an entire directory (defaults to the *.j2 glob).
all_templates = PromptLoader.from_directory("prompts/")

When a name is not supplied, the loader infers it from the file stem (e.g. invoice_extraction.yaml becomes "invoice_extraction").

IDP Tie-In: Versioned Extraction Prompts

For our IDP pipeline, we create versioned prompts that can be A/B tested later:

from fireflyframework_agentic.prompts import PromptTemplate, PromptRegistry

prompt_registry = PromptRegistry()

# Version 1: Simple extraction
extraction_v1 = PromptTemplate(
    "idp_extraction",
    "You are an invoice parser. Return valid JSON.",  # system_template
    "Extract these fields from the invoice:\n"          # user_template
    "- invoice_number, vendor_name, total_amount, due_date, line_items\n\n"
    "Text: {{ document_text }}",
    version="1.0.0",
    required_variables=["document_text"],
)

# Version 2: More structured with examples
extraction_v2 = PromptTemplate(
    "idp_extraction",
    "You are an expert invoice parser. Extract structured data and return valid JSON.",
    "Required fields:\n"
    " invoice_number: string (format: INV-NNNN)\n"
    " vendor_name: string\n"
    " total_amount: float\n"
    " due_date: string (ISO 8601)\n"
    " line_items: list of {description: str, quantity: int, unit_price: float}\n\n"
    "Example output:\n"
    '{"invoice_number": "INV-0001", "vendor_name": "Example", "total_amount": 100.0, '
    '"due_date": "2026-01-01", "line_items": [{"description": "Widget", "quantity": 2, '
    '"unit_price": 50.0}]}\n\n'
    "Document text:\n{{ document_text }}",
    version="2.0.0",
    required_variables=["document_text"],
)

prompt_registry.register(extraction_v1)
prompt_registry.register(extraction_v2)

# In production, select the version based on experiment configuration
template = prompt_registry.get("idp_extraction") # Returns v2 (latest)
prompt = template.render(document_text=ocr_output)  # -> Prompt(.system, .user)

Part II — Intelligence


Chapter 6: Reasoning Patterns

Here's a hard truth about LLMs: if you throw a complex question at an agent in a single prompt, you're gambling. Sometimes the model nails it; sometimes it hallucinates half the answer. Reasoning patterns fix that by giving the agent a structured way to think before it answers: loops of thinking, acting, observing, and reflecting, with each step recorded so you can see exactly what happened.

The framework ships six patterns out of the box (ReAct, Chain of Thought, Plan-and-Execute, Reflexion, Tree of Thoughts, Goal Decomposition). Each one is a different strategy for the same problem: how do you turn "figure this out" into a repeatable, observable, debuggable process?

The Architecture

All patterns share the same core engine. AbstractReasoningPattern implements the Template Method design pattern: the base class runs the outer loop (step counting, trace recording, max-steps enforcement, optional output review), and each concrete pattern overrides five hooks that define its behaviour:

_reason(state) → Generate a thought ("what do I think?")
_act(state) → Perform an action ("what should I do?")
_observe(state, act) → Process the action's result ("what happened?")
_should_continue() → Decide whether to loop again ("am I done?")
_extract_output() → Produce the final answer ("what's the result?")

Here is how all the pieces fit together:

graph TD
    subgraph AbstractReasoningPattern
        EX["execute(agent, input)"] --> R["_reason()"]
        R --> STOP{"_should_stop?"}
        STOP -->|no| A["_act()"]
        A --> O["_observe()"]
        O --> CONT{"_should_continue?"}
        CONT -->|yes| R
        CONT -->|no| OUT["_extract_output()"]
        STOP -->|yes| OUT
        OUT --> REV{"reviewer?"}
        REV -->|yes| REVIEW["OutputReviewer.review()"]
        REV -->|no| RESULT["ReasoningResult"]
        REVIEW --> RESULT
    end

    subgraph Six Patterns
        REACT["ReAct"]
        COT["Chain of Thought"]
        PAE["Plan-and-Execute"]
        REF["Reflexion"]
        TOT["Tree of Thoughts"]
        GD["Goal Decomposition"]
    end

    REACT -->|extends| EX
    COT -->|extends| EX
    PAE -->|extends| EX
    REF -->|extends| EX
    TOT -->|extends| EX
    GD -->|extends| EX
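The loop in the diagram can be sketched in a few lines of plain Python. This is a simplified illustration of the Template Method idea, not the framework's AbstractReasoningPattern: `LoopPattern` and the toy `SumDigits` subclass are hypothetical, the state is a plain dict, and the output-review stage is omitted.

```python
import asyncio
from abc import ABC, abstractmethod


class LoopPattern(ABC):
    """The base class owns the loop; subclasses supply the five hooks."""

    def __init__(self, max_steps=5):
        self.max_steps = max_steps

    async def execute(self, task):
        state = {"input": task, "trace": []}
        for _ in range(self.max_steps):  # max-steps enforcement
            thought = await self._reason(state)
            state["trace"].append(("thought", thought))
            if thought["is_final"]:  # the pattern decided it is done
                break
            action = await self._act(state)
            state["trace"].append(("action", action))
            observation = await self._observe(state, action)
            state["trace"].append(("observation", observation))
            if not await self._should_continue(state):
                break
        return await self._extract_output(state), state["trace"]

    @abstractmethod
    async def _reason(self, state): ...

    @abstractmethod
    async def _act(self, state): ...

    @abstractmethod
    async def _observe(self, state, action): ...

    @abstractmethod
    async def _should_continue(self, state): ...

    @abstractmethod
    async def _extract_output(self, state): ...


class SumDigits(LoopPattern):
    """Toy pattern: consume one digit of the input per iteration and add it up."""

    async def _reason(self, state):
        state.setdefault("remaining", [int(ch) for ch in state["input"]])
        state.setdefault("total", 0)
        return {"content": f"{len(state['remaining'])} digits left",
                "is_final": not state["remaining"]}

    async def _act(self, state):
        return state["remaining"].pop(0)

    async def _observe(self, state, action):
        state["total"] += action
        return state["total"]

    async def _should_continue(self, state):
        return bool(state["remaining"])

    async def _extract_output(self, state):
        return state.get("total", 0)


output, trace = asyncio.run(SumDigits(max_steps=5).execute("123"))
capped_output, _ = asyncio.run(SumDigits(max_steps=2).execute("123"))
```

The second run shows the step cap at work: with only two iterations allowed, the pattern returns a partial result instead of looping further.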

How Reasoning Patterns Use Agent Tools

An important thing to understand: reasoning patterns don't manage tools. They manage thinking. When a pattern needs to interact with the outside world (search a database, call an API, do math), it delegates to agent.run() — and the agent's configured Pydantic AI tools handle the rest.

This means you must attach tools to the agent before passing it to a pattern. Here is the canonical flow, using the framework's tool system end-to-end:

# ── Step 1: Define tools with @firefly_tool (see Chapter 4) ──────────────
# These are automatically registered in the global ToolRegistry.

from fireflyframework_agentic.tools import firefly_tool, ToolKit

@firefly_tool(name="vendor_lookup", description="Look up vendor in the ERP system")
async def vendor_lookup(vendor_name: str) -> str:
    return '{"id": "V-001", "tax_id": "US-12345"}'

@firefly_tool(name="calculate", description="Evaluate a math expression")
async def calculate(expression: str) -> str:
    # Simplified: eval() is NOT safe on untrusted input. Use CalculatorTool in production.
    return str(eval(expression))

# ── Step 2: Group tools in a ToolKit and bridge to Pydantic AI ───────────
# ToolKit.as_pydantic_tools() converts framework BaseTool instances into
# pydantic_ai.Tool objects that the LLM can call during agent.run().

extraction_tools = ToolKit(
    "extraction-tools",
    [vendor_lookup, calculate],
    description="Tools available during invoice extraction",
)

# ── Step 3: Create an agent WITH the bridged tools ──────────────────────

from fireflyframework_agentic.agents import FireflyAgent

extractor = FireflyAgent(
    name="extractor",
    model="openai:gpt-4o",
    tools=extraction_tools.as_pydantic_tools(), # Bridge: Firefly → Pydantic AI
)

# ── Step 4: Pass the tool-equipped agent to a reasoning pattern ─────────
# The pattern calls extractor.run() internally, which triggers Pydantic AI's
# tool dispatch — the LLM decides when to call vendor_lookup or calculate.

from fireflyframework_agentic.reasoning import ReActPattern

react = ReActPattern(max_steps=5)
result = await react.execute(extractor, "What is the total with tax for vendor Acme Corp?")

The runtime flow is: Pattern → agent.run(prompt) → Pydantic AI → LLM decides to call tools → tool results → back to pattern loop.

You can also skip the framework tool system and pass plain Pydantic AI tool functions directly to FireflyAgent(tools=[...]) — see Chapter 4 → Attaching Tools to Agents for all three approaches.

Architecture: Why Tools Are Explicitly Bound to Agents

You may wonder: "If @firefly_tool already registers a tool in the global ToolRegistry, why doesn't every agent automatically see every tool?"

This is by design. The framework has two separate layers with different purposes:

  1. ToolRegistry (discovery layer) — A global catalog of all tools in the system. Used for admin dashboards, documentation generation, plugin discovery, and pipeline wiring. Think of it as a phone book: it lists everything that exists.

  2. Agent tool binding (execution layer) — The specific set of tools an agent can invoke during an LLM run. Only tools explicitly passed via FireflyAgent(tools=[]) or agent.tool() are available to the model.

Why explicit binding?

  • Security — An agent that processes user input should not have access to ShellTool or delete_record. Implicit auto-injection of all registered tools would create a dangerous attack surface.
  • Predictability — When you read an agent definition, you can see exactly which tools it can call. No surprises from a plugin that registered a tool at import time.
  • Cost control — Each tool in a Pydantic AI agent's tool list adds tokens to the system prompt. Injecting 50 tools when an agent only needs 3 wastes tokens and confuses the model.
  • Principle of least privilege — Each agent gets the minimum set of tools required for its task, not the maximum set available.

Use ToolKit to curate which subset of registered tools each agent receives:

from fireflyframework_agentic.tools import ToolKit
from fireflyframework_agentic.tools.registry import tool_registry

# Curate: only extraction-related tools for this agent
extraction_kit = ToolKit("extraction", [
    tool_registry.get("vendor_lookup"),
    tool_registry.get("calculate"),
])

extractor = FireflyAgent(
    name="extractor",
    model="openai:gpt-4o",
    tools=extraction_kit.as_pydantic_tools(),
)

Memory in Reasoning Patterns

Reasoning patterns can also access the framework's memory system. When you pass a MemoryManager via the memory keyword argument, it becomes available in the pattern's internal state as state["memory"]. This lets pattern hooks read and write working memory during iterations:

from fireflyframework_agentic.reasoning import PlanAndExecutePattern
from fireflyframework_agentic.memory import MemoryManager

memory = MemoryManager(working_scope_id="extraction-session")

# Store a fact that the pattern's hooks can read
memory.set_fact("doc_type", "invoice")

# The memory kwarg is forwarded into state["memory"] inside the pattern
pattern = PlanAndExecutePattern(max_steps=15)
result = await pattern.execute(
    extractor_agent,
    "Extract invoice fields from the OCR text.",
    memory=memory, # Available as state["memory"] in _reason(), _act(), etc.
)

In pipelines, this happens automatically — ReasoningStep passes PipelineContext.memory to the pattern (see Chapter 10).

Structured Output Models

Patterns use typed Pydantic models instead of raw text. This is a big deal: instead of parsing free-form strings for magic words like "FINISH", the model returns structured objects with explicit fields. No more fragile regex parsing:

from fireflyframework_agentic.reasoning.models import (
    ReasoningThought,
    ReasoningAction,
    ReasoningPlan,
    PlanStepDef,
    StepStatus,
    ReflectionVerdict,
    BranchEvaluation,
    GoalDecompositionResult,
    GoalPhase,
)

# A structured thought — is_final replaces magic phrases like "FINISH"
thought = ReasoningThought(
    content="The invoice number is INV-2026-001",
    is_final=True,
    final_answer="INV-2026-001",
    confidence=0.95,
)

# A structured plan with step tracking
plan = ReasoningPlan(
    goal="Extract all invoice fields",
    steps=[
        PlanStepDef(id="s1", description="Find invoice number"),
        PlanStepDef(id="s2", description="Find vendor name", dependencies=["s1"]),
        PlanStepDef(id="s3", description="Find amount and date", dependencies=["s1"]),
    ],
)
# Each step has a status: pending → running → completed/failed/skipped
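To show how dependencies and statuses interact, here is a minimal sketch that runs plan steps in dependency order with the standard library's `graphlib`. The `run_plan` helper and the plain dict representation are hypothetical, and marking dependents of a failed step as skipped is an assumption about sensible behaviour, not a statement of what the framework does.

```python
from graphlib import TopologicalSorter


def run_plan(dependencies, run_step):
    """Run steps in dependency order and return each step's final status.

    dependencies maps a step id to the ids it depends on;
    run_step(step_id) returns True on success.
    """
    status = {step_id: "pending" for step_id in dependencies}
    for step_id in TopologicalSorter(dependencies).static_order():
        if any(status[dep] != "completed" for dep in dependencies[step_id]):
            status[step_id] = "skipped"  # a prerequisite did not complete
            continue
        status[step_id] = "running"
        status[step_id] = "completed" if run_step(step_id) else "failed"
    return status


plan = {"s1": [], "s2": ["s1"], "s3": ["s1"]}
all_ok = run_plan(plan, lambda step_id: True)
s1_fails = run_plan(plan, lambda step_id: step_id != "s1")
```

In the second run, `s2` and `s3` never execute because their shared prerequisite failed, which is the situation where replanning becomes useful.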

The Six Built-In Patterns

1. ReAct (Reasoning + Acting)

Interleaves thinking and tool use. Each iteration produces a ReasoningThought; when is_final=True, the loop stops.

from fireflyframework_agentic.reasoning import ReActPattern

react = ReActPattern(max_steps=5)
result = await react.execute(my_agent, "What is the total on invoice INV-001?")
print(result.output)
print(f"Steps taken: {len(result.trace.steps)}")

When to use: Tasks that need external information (tool calls, searches, API lookups) combined with reasoning.

2. Chain of Thought

The agent reasons step by step through a problem. Each step is a ReasoningThought.

from fireflyframework_agentic.reasoning import ChainOfThoughtPattern

cot = ChainOfThoughtPattern(max_steps=10)
result = await cot.execute(my_agent, "Calculate the VAT on $1,234.56 at 20%.")

When to use: Math, logic puzzles, multi-step deductions where the agent needs to show its work.

3. Plan-and-Execute

The agent first generates a ReasoningPlan with PlanStepDef items, then executes each step with status tracking (pending → running → completed/failed). Supports replanning when steps fail.

from fireflyframework_agentic.reasoning import PlanAndExecutePattern

planner = PlanAndExecutePattern(max_steps=15, allow_replan=True)
result = await planner.execute(
    extractor_agent,
    "Extract all fields from this invoice:\n" + ocr_text,
)

When to use: Complex, multi-step tasks where you want visibility into progress and the ability to recover from failures.

4. Reflexion

Adds a self-evaluation loop. After generating output, the agent critiques itself via a ReflectionVerdict. If is_satisfactory=False, the issues and suggestions are fed back into a retry prompt.

from fireflyframework_agentic.reasoning import ReflexionPattern

reflexion = ReflexionPattern(max_steps=3)
result = await reflexion.execute(my_agent, "Write a unit test for binary search.")

When to use: Tasks where quality matters and the agent can meaningfully self-correct (writing code, generating reports, data extraction).
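Stripped of the LLM calls, the critique-and-retry loop looks roughly like this. The sketch is an illustration only: `reflexion_loop` is a hypothetical helper, and the two toy functions stand in for the agent's generation and self-critique steps.

```python
import re


def reflexion_loop(generate, critique, task, max_steps=3):
    """Generate, critique, and retry with the critique's issues as feedback.

    Returns (last_attempt, satisfied).
    """
    feedback = []
    attempt = None
    for _ in range(max_steps):
        attempt = generate(task, feedback)
        issues = critique(attempt)
        if not issues:  # the critique found nothing to fix
            return attempt, True
        feedback = issues  # feed the issues into the next attempt
    return attempt, False


# Toy stand-ins: the first attempt uses the wrong date format.
def generate_date(task, feedback):
    return "2026-01-15" if feedback else "15/01/2026"


def critique_date(attempt):
    if re.fullmatch(r"\d{4}-\d{2}-\d{2}", attempt):
        return []
    return ["due_date must be ISO 8601 (YYYY-MM-DD)"]


final, satisfied = reflexion_loop(generate_date, critique_date, "Extract the due date")
single_try, single_ok = reflexion_loop(generate_date, critique_date, "Extract the due date", max_steps=1)
```

Returning the satisfied flag alongside the attempt lets the caller distinguish a clean result from one that merely ran out of retries.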

5. Tree of Thoughts

Explores multiple reasoning branches and evaluates each with a BranchEvaluation (score + reasoning). The highest-scoring branch is selected.

from fireflyframework_agentic.reasoning import TreeOfThoughtsPattern

tot = TreeOfThoughtsPattern(branching_factor=3, max_depth=3)
result = await tot.execute(my_agent, "Design an API for a task management system.")

When to use: Open-ended problems with multiple valid approaches where you want to explore and compare alternatives.
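The branch-and-score idea can be sketched without an LLM. The version below is a deliberately simple greedy variant that keeps only the best branch at each depth; fuller Tree of Thoughts implementations may keep several branches alive. `tree_search` and the toy number puzzle are hypothetical stand-ins for branch generation and evaluation.

```python
def tree_search(root, expand, score, branching_factor=3, max_depth=3):
    """At each depth, expand the current state and follow the highest-scoring branch."""
    best = root
    for _ in range(max_depth):
        candidates = expand(best)[:branching_factor]
        if not candidates:
            break
        best = max(candidates, key=score)
    return best


# Toy problem: get as close as possible to TARGET starting from 1.
TARGET = 10


def expand(value):
    return [value + 1, value * 2, value + 3]


def score(value):
    return -abs(TARGET - value)  # closer to the target scores higher


result = tree_search(1, expand, score, branching_factor=3, max_depth=3)
```

Here the search follows 1, 4, 8 and ends at 9. A greedy search can miss a better path that starts with a weaker first step, which is the trade-off against exploring more branches per level.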

6. Goal Decomposition

Breaks a high-level goal into structured GoalPhase objects, each with concrete tasks.

from fireflyframework_agentic.reasoning import GoalDecompositionPattern

decomposer = GoalDecompositionPattern(max_steps=20)
result = await decomposer.execute(my_agent, "Build an IDP pipeline for invoice processing.")

When to use: Strategic planning, project decomposition, breaking down large goals into actionable phases.

Configurable Prompts

Every pattern uses PromptTemplate instances for its LLM calls. You can override any prompt by passing a prompts dict:

from fireflyframework_agentic.prompts.template import PromptTemplate
from fireflyframework_agentic.reasoning import ReActPattern

custom_thought = PromptTemplate(
    "my:react:thought",
    "You are a careful reasoner.",            # system_template
    "Think carefully about: {{ context }}",   # user_template
    required_variables=["context"],
)
pattern = ReActPattern(prompts={"thought": custom_thought})

Each pattern has named prompt slots:

  • ReAct: "thought"
  • Chain of Thought: "step"
  • Plan-and-Execute: "plan", "execute_step", "replan"
  • Reflexion: "critique", "retry"
  • Tree of Thoughts: "branch", "evaluate"
  • Goal Decomposition: "decompose", "plan_phase"

All built-in prompts are registered in the global prompt_registry under the reasoning: namespace (e.g. "reasoning:react:thought").

Reasoning Pipeline

Chain patterns sequentially — the output of one becomes the input to the next:

from fireflyframework_agentic.reasoning import ReasoningPipeline, PlanAndExecutePattern, ReActPattern

pipeline = ReasoningPipeline([
    PlanAndExecutePattern(max_steps=15),
    ReActPattern(max_steps=10),
])
result = await pipeline.execute(my_agent, "Analyse the invoice and verify the totals.")

Reasoning Trace

Every pattern records its work in a ReasoningTrace. The trace contains typed steps:

  • ThoughtStep — The agent's reasoning.
  • ActionStep — A tool invocation.
  • ObservationStep — The result of an action.
  • ReflectionStep — Self-evaluation.
  • PlanStep — A plan or sub-goal.

Iterate over result.trace.steps to inspect what the pattern did:

for step in result.trace.steps:
    print(f"[{step.step_type}] {step.content[:80]}...")

Creating a Custom Pattern

Extend AbstractReasoningPattern and override the template methods:

from fireflyframework_agentic.reasoning.base import AbstractReasoningPattern

class VerifyAndCorrectPattern(AbstractReasoningPattern):
    def __init__(self, *, max_steps: int = 5):
        super().__init__("verify_and_correct", max_steps=max_steps)

    async def _reason(self, state):
        # Generate a thought about the current state
        ...

    async def _act(self, state):
        # Perform verification action
        ...

    async def _observe(self, state, action):
        # Process verification result
        ...

    async def _should_continue(self, state):
        # Continue until verification passes or max steps reached
        ...

    async def _extract_output(self, state):
        # Return the verified/corrected output
        ...

Register it to make it available framework-wide:

from fireflyframework_agentic.reasoning.registry import reasoning_registry

reasoning_registry.register("verify_and_correct", VerifyAndCorrectPattern)

IDP Tie-In: Extraction with Plan-and-Execute + Reflexion

Now let's put reasoning patterns to work in our invoice pipeline. In Chapter 4 we defined IDP tools (ocr_extract, vendor_lookup) and grouped them into extraction_kit. We also created extractor_agent with those tools attached via extraction_kit.as_pydantic_tools(). Here we pass that agent — tools and all — to reasoning patterns.

The extraction phase is the hardest part — we need to find invoice numbers, vendor names, amounts, and line items from messy OCR text. A single-shot prompt might miss something, so we use Plan-and-Execute to break it into steps, and Reflexion as a safety net when validation catches errors:

from fireflyframework_agentic.reasoning import PlanAndExecutePattern, ReflexionPattern

# ── Recall from Chapter 4 ───────────────────────────────────────────
# extractor_agent = FireflyAgent(
#     name="extractor",
#     model="openai:gpt-4o",
#     tools=extraction_kit.as_pydantic_tools(),  # ocr_extract, vendor_lookup, CalculatorTool
# )
# The tools are already bound — reasoning patterns call agent.run() internally,
# so the agent can invoke any of its tools during each reasoning step.
# ─────────────────────────────────────────────────────────────────────

# Phase 3: Systematic extraction.
# Plan-and-Execute generates a plan ("find invoice number", "find vendor", ...)
# and executes each step with status tracking. If a step fails, it can replan.
extraction_pattern = PlanAndExecutePattern(max_steps=15, allow_replan=True)
extraction_result = await extraction_pattern.execute(
    extractor_agent, # Tools already attached in Ch4 via ToolKit.as_pydantic_tools()
    f"Extract invoice fields from:\n{ocr_text}",
)

# Phase 4: If validation catches errors (e.g. invalid date format), use Reflexion
# to let the agent critique its own output and self-correct.
if not validation_passed:
    reflexion = ReflexionPattern(max_steps=2)
    corrected = await reflexion.execute(
        extractor_agent,
        f"Previous extraction had errors: {errors}. Re-extract from:\n{ocr_text}",
    )

Architecture recap: Reasoning patterns never see tools directly. They receive an agent (which owns its tools) and call agent.run(). This is why tools must be bound to the agent before passing it to a pattern — see the "Attaching Tools to Agents" section in Chapter 4 and the full idp/tools.py module in Chapter 18.


Chapter 7: Content Processing

Here's a problem you'll hit fast: your 50-page invoice PDF produces OCR text that is 200,000 tokens long — but your model's context window is 128K, and you're paying per token. You can't just shove the entire document into one call.

The Content module solves this with a three-stage pipeline:

  1. Chunk — Split oversized content into overlapping pieces that each fit within a token budget. TextChunker handles text (by token, sentence, or paragraph), DocumentSplitter handles multi-page documents, and ImageTiler handles large images.
  2. Process — Send chunks through an agent concurrently via BatchProcessor. This is where you do per-chunk OCR cleanup, translation, or summarisation.
  3. Compress — Merge the results back into a single context that fits the downstream agent's window. ContextCompressor supports three strategies: truncation (cheap, lossy), summarisation (LLM-based, preserves meaning), and map-reduce (chunk → summarise each → merge).

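Together they let you process documents far larger than a single context window while keeping token costs in check. The trade-off sits in the compress stage: truncation discards whatever falls past the limit, whereas summarisation and map-reduce spend extra LLM calls to preserve meaning.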

Content Processing Pipeline

graph LR
    subgraph "1. Chunk"
        TC["TextChunker"]
        DS["DocumentSplitter"]
        IT["ImageTiler"]
    end

    subgraph "2. Process"
        BP["BatchProcessor\n(concurrent agent calls)"]
    end

    subgraph "3. Compress"
        CC["ContextCompressor"]
        TR["TruncationStrategy"]
        SM["SummarizationStrategy"]
        MR["MapReduceStrategy"]
    end

    subgraph "Utilities"
        TE["TokenEstimator"]
        SW["SlidingWindowManager"]
    end

    TC --> BP
    DS --> BP
    IT --> BP
    BP --> CC
    CC --> TR
    CC --> SM
    CC --> MR
    TE -.-> TC
    TE -.-> CC
    TE -.-> SW

Text Chunking

TextChunker splits text into overlapping chunks using one of three strategies:

from fireflyframework_agentic.content.chunking import TextChunker

chunker = TextChunker(
    chunk_size=4000, # Max tokens per chunk
    chunk_overlap=200, # Overlap between consecutive chunks
    strategy="paragraph", # "token" | "sentence" | "paragraph"
)

chunks = chunker.chunk(long_document_text)
for chunk in chunks:
    print(f"Chunk {chunk.index}/{chunk.total_chunks}: {len(chunk.content)} chars")
    # chunk.metadata contains additional info

Each chunk is a Chunk model with content, index, total_chunks, overlap_tokens, and an open metadata dict.

Strategy guide:

  • "token" — Split by estimated token count. Best for arbitrary text.
  • "sentence" — Split at sentence boundaries. Best for natural language.
  • "paragraph" — Split at paragraph boundaries (\n\n). Best for structured docs.
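To make the overlap mechanics concrete, here is a minimal sketch of token-style chunking in plain Python. It is not TextChunker's implementation: it treats whitespace-separated words as stand-ins for tokens, and the function name chunk_words is hypothetical.

```python
def chunk_words(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Split text into windows of at most chunk_size words.

    Consecutive windows share chunk_overlap words. Words stand in for
    tokens here; a real chunker would use a token estimate instead.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")

    words = text.split()
    step = chunk_size - chunk_overlap  # how far each window advances
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):
            break  # this window already reaches the end of the text
    return chunks


sample = " ".join(f"w{i}" for i in range(10))
pieces = chunk_words(sample, chunk_size=4, chunk_overlap=1)
# Each piece starts with the last word of the previous one.
```

The overlap is what lets a field that straddles a chunk boundary still appear whole in at least one chunk, at the cost of processing some text twice.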

Document Splitter

DocumentSplitter handles multi-document inputs, splitting at page breaks (\f) or horizontal rules (---):

from fireflyframework_agentic.content.chunking import DocumentSplitter

splitter = DocumentSplitter(min_length=50)
segments = splitter.split(multi_page_text)

Image Tiler

For large images that exceed a VLM's pixel budget, ImageTiler computes tile coordinates:

from fireflyframework_agentic.content.chunking import ImageTiler

tiler = ImageTiler(tile_width=1024, tile_height=1024, overlap=128)
tiles = tiler.compute_tiles(image_width=4096, image_height=6144)
# Each tile is a Chunk with metadata: x, y, width, height, row, col
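The tile arithmetic can be sketched without the framework. The version below is an illustration under assumptions, not ImageTiler's code: the helper names are hypothetical, tiles are plain tuples rather than Chunk objects, and the edge policy (the last tile in each row and column is shifted back so it sits flush with the image border) is one possible choice.

```python
def tile_starts(length: int, tile: int, overlap: int) -> list[int]:
    """Start offsets along one axis so that the tiles cover [0, length)."""
    if not 0 <= overlap < tile:
        raise ValueError("overlap must be in [0, tile)")
    if length <= tile:
        return [0]
    stride = tile - overlap
    starts = list(range(0, length - tile, stride))
    starts.append(length - tile)  # final tile sits flush with the far edge
    return starts


def compute_tiles(width: int, height: int, tile_w: int, tile_h: int, overlap: int):
    """Return (x, y, w, h, row, col) tuples in row-major order."""
    xs = tile_starts(width, tile_w, overlap)
    ys = tile_starts(height, tile_h, overlap)
    return [
        (x, y, min(tile_w, width), min(tile_h, height), row, col)
        for row, y in enumerate(ys)
        for col, x in enumerate(xs)
    ]


tiles = compute_tiles(4096, 6144, 1024, 1024, 128)
# 5 columns x 7 rows under this edge policy
```

With this policy the final tile may overlap its neighbour by more than the requested 128 pixels, which keeps every tile the same size.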

Batch Processing

BatchProcessor sends chunks through an agent concurrently. By default it returns a list of per-chunk result strings; pass a result_aggregator callable to merge them into any shape you like:

from fireflyframework_agentic.content.chunking import BatchProcessor

processor = BatchProcessor(concurrency=4)
results = await processor.process(ocr_agent, chunks)  # -> list[str]
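The concurrency limit is the important part of this step. A common way to get that behaviour with the standard library is a semaphore around each call, sketched below. This is an assumption about the general technique, not a description of BatchProcessor's internals; process_concurrently and fake_cleanup are hypothetical names, and fake_cleanup stands in for an agent call.

```python
import asyncio
from collections.abc import Awaitable, Callable


async def process_concurrently(
    items: list[str],
    worker: Callable[[str], Awaitable[str]],
    concurrency: int = 4,
) -> list[str]:
    """Run worker over items with at most `concurrency` calls in flight.

    Results are returned in the same order as the inputs.
    """
    semaphore = asyncio.Semaphore(concurrency)

    async def guarded(item: str) -> str:
        async with semaphore:
            return await worker(item)

    return await asyncio.gather(*(guarded(item) for item in items))


async def fake_cleanup(chunk: str) -> str:
    # Stand-in for an agent call: yield control, then tidy the text.
    await asyncio.sleep(0)
    return chunk.strip().upper()


cleaned = asyncio.run(
    process_concurrently([" a ", " b ", " c "], fake_cleanup, concurrency=2)
)
```

Capping the number of in-flight calls is what keeps a 60-chunk document from sending 60 simultaneous requests to a rate-limited model provider.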

Context Compression

When concatenated chunks still exceed the extraction agent's context window, compress them:

Truncation Strategy

Hard-cuts the text at a token limit:

from fireflyframework_agentic.content.compression import ContextCompressor, TruncationStrategy

compressor = ContextCompressor(strategy=TruncationStrategy())
compressed = await compressor.compress(full_text, max_tokens=8000)

Summarization Strategy

Uses an LLM agent to summarise the text so that it fits within the token limit:

from fireflyframework_agentic.content.compression import SummarizationStrategy

compressor = ContextCompressor(
    strategy=SummarizationStrategy(summary_agent)
)
compressed = await compressor.compress(full_text, max_tokens=8000)

Map-Reduce Strategy

Chunks the text, summarises each chunk in parallel, then merges the summaries:

from fireflyframework_agentic.content.compression import MapReduceStrategy

compressor = ContextCompressor(
    strategy=MapReduceStrategy(summary_agent)
)
compressed = await compressor.compress(full_text, max_tokens=8000)
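The chunk, summarise, merge loop described above can be sketched in a few lines. Everything here is illustrative: words stand in for tokens, first_words is a stand-in summariser that keeps the first five words of each piece, and the repeat-until-it-fits loop with a truncation fallback is an assumption, not MapReduceStrategy's documented behaviour.

```python
import asyncio


def count_tokens(text: str) -> int:
    return len(text.split())  # crude stand-in: one word = one token


async def first_words(text: str, limit: int = 5) -> str:
    """Stand-in summariser: keep the first `limit` words of a piece."""
    return " ".join(text.split()[:limit])


async def map_reduce_compress(text, max_tokens, summarise, piece_size=20, max_rounds=5):
    for _ in range(max_rounds):
        if count_tokens(text) <= max_tokens:
            return text
        words = text.split()
        pieces = [
            " ".join(words[i:i + piece_size])
            for i in range(0, len(words), piece_size)
        ]
        # Map: summarise every piece concurrently.
        summaries = await asyncio.gather(*(summarise(p) for p in pieces))
        # Reduce: merge the summaries back into one text.
        text = " ".join(summaries)
    # Fallback if the summariser did not shrink the text enough.
    return " ".join(text.split()[:max_tokens])


long_text = " ".join(f"w{i}" for i in range(100))
compressed = asyncio.run(map_reduce_compress(long_text, 30, first_words))
```

With a real LLM summariser each map call costs tokens, so the piece size is a trade-off between the number of calls and how much each call has to condense.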

Sliding Window Manager

Maintains a sliding window over a stream of messages, keeping total token usage within budget:

from fireflyframework_agentic.content.compression import SlidingWindowManager

window = SlidingWindowManager(max_tokens=8000)
window.add("First OCR page output...")
window.add("Second OCR page output...")
current_context = window.get_context() # Only recent items that fit
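A budgeted window of this kind is usually a queue that evicts from the oldest end. The sketch below shows that idea with a word count as the token estimate. The class name SlidingWindow and the rule of always keeping the newest item, even when it alone exceeds the budget, are assumptions made for illustration.

```python
from collections import deque


class SlidingWindow:
    """Keep the most recent items whose combined estimate fits max_tokens."""

    def __init__(self, max_tokens: int) -> None:
        self.max_tokens = max_tokens
        self._items: deque[tuple[str, int]] = deque()
        self._total = 0

    @staticmethod
    def _estimate(text: str) -> int:
        return len(text.split())  # crude stand-in for a token estimate

    def add(self, text: str) -> None:
        cost = self._estimate(text)
        self._items.append((text, cost))
        self._total += cost
        # Evict oldest items until the budget holds; never evict the newest.
        while self._total > self.max_tokens and len(self._items) > 1:
            _, dropped_cost = self._items.popleft()
            self._total -= dropped_cost

    def get_context(self) -> str:
        return "\n".join(text for text, _ in self._items)


demo_window = SlidingWindow(max_tokens=5)
demo_window.add("one two three")
demo_window.add("four five")
demo_window.add("six seven")  # pushes the total to 7, so the oldest item is dropped
```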

Token Estimator

Estimate token counts without an API call:

from fireflyframework_agentic.content.compression import TokenEstimator

estimator = TokenEstimator() # Default ratio: 1.33 tokens per word
tokens = estimator.estimate("This is a test sentence.")
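A word-ratio estimate of this kind amounts to a single multiplication. The sketch below uses the 1.33 tokens-per-word ratio from the comment above; rounding up with math.ceil is an assumption, and the framework's own rounding may differ.

```python
import math


def estimate_tokens(text: str, tokens_per_word: float = 1.33) -> int:
    """Approximate a token count from the number of whitespace-separated words."""
    words = len(text.split())
    return math.ceil(words * tokens_per_word)


estimate = estimate_tokens("This is a test sentence.")  # 5 words * 1.33, rounded up
```

An estimate like this is only a budgeting aid. Real tokenisers vary by model and by language, so leave headroom below the hard context limit.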

Binary Ingestion (content.binary)

The text pipeline above assumes you already have text. For real documents — PDFs, Office files, images, archives, emails — the content.binary submodule (installed via the [binary] extra) turns raw caller bytes into one or more normalised BinaryArtifact rows ready for OCR/extraction. BinaryNormalizer dispatches by sniffed media type and delegates to pluggable handlers: PdfGuard (PDF sanitisation), ImageNormalizer, OfficeConverter (build via build_office_converter, backed by GotenbergConverter, LibreOfficeConverter, or NoOpOfficeConverter), ArchiveUnpacker, and EmailUnpacker.

from fireflyframework_agentic.content.binary import (
    BinaryNormalizer, BinaryConfig, build_office_converter, sniff_media_type,
)

config = BinaryConfig()  # caps, toggles
normalizer = BinaryNormalizer(config=config, office=build_office_converter(config))

# normalise() is async and always returns at least one artifact.
artifacts = await normalizer.normalise(raw_bytes, filename="invoice.pdf")
for art in artifacts:
    print(art.media_type, art.kind, art.page_count)

Handlers are injected so a host can swap implementations (e.g. Gotenberg vs LibreOffice) without touching the framework.

IDP Tie-In: Processing Large Documents

In our IDP pipeline, the OCR phase may produce text that exceeds the extraction agent's context window. Here's how we handle it:

from fireflyframework_agentic.content.chunking import TextChunker, BatchProcessor
from fireflyframework_agentic.content.compression import ContextCompressor, MapReduceStrategy

# Step 1: Chunk the raw OCR output
chunker = TextChunker(chunk_size=3000, chunk_overlap=200, strategy="paragraph")
chunks = chunker.chunk(raw_ocr_text)

# Step 2: If needed, process the chunks in parallel through an OCR cleanup agent.
# By default process() returns a list of strings (one per chunk).
processor = BatchProcessor(concurrency=4)
cleaned_chunks = await processor.process(ocr_cleanup_agent, chunks)

# Step 3: Compress for the extraction agent (compress() is async — await it).
full_text = "\n".join(cleaned_chunks)
compressor = ContextCompressor(
    strategy=MapReduceStrategy(summary_agent)
)
compressed_text = await compressor.compress(full_text, max_tokens=8000)

Chapter 8: Memory

Without memory, every agent.run() call starts from scratch — the agent has no idea what happened in the previous turn, what the pipeline figured out two steps ago, or what facts a delegated sub-agent discovered. That's fine for one-shot tasks, but real applications need context: multi-turn conversations, session state, pipeline variables. The Memory module gives your agents a brain that persists across calls.

There are two kinds of memory here:

  • Conversation memory — the actual chat history ("what was said"), automatically trimmed to stay within your token budget.
  • Working memory — a key-value scratchpad ("what we know") for storing facts like doc_type = "invoice" that multiple agents or pipeline steps can read.

Architecture

graph TD
    subgraph MemoryManager
        MM["MemoryManager<br/><small>new_conversation · fork<br/>set_fact · get_fact</small>"]
    end

    subgraph Conversation
        CM["ConversationMemory<br/><small>add_turn · get_history<br/>token budget · FIFO eviction</small>"]
    end

    subgraph Working
        WM["WorkingMemory<br/><small>set · get · delete<br/>scoped namespaces</small>"]
    end

    subgraph Backends
        IMS["InMemoryStore<br/><small>dict-backed</small>"]
        FS["FileStore<br/><small>JSON file per namespace</small>"]
        CS["YourCustomStore<br/><small>implements MemoryStore protocol</small>"]
    end

    subgraph Consumers
        AGT["FireflyAgent<br/><small>auto message_history</small>"]
        DR["DelegationRouter<br/><small>auto fork on delegation</small>"]
        PIPE["PipelineContext<br/><small>propagated to steps</small>"]
        RP["ReasoningPattern<br/><small>state['memory']</small>"]
    end

    MM --> CM
    MM --> WM
    WM --> IMS
    WM --> FS
    WM --> CS
    AGT --> MM
    DR --> MM
    PIPE --> MM
    RP --> MM

The system has four layers:

  1. ConversationMemory — Token-aware, per-conversation chat history. Wraps Pydantic AI's message_history and drops the oldest turns when the history exceeds the token budget.
  2. WorkingMemory — Scoped key-value scratchpad for facts and intermediate state.
  3. MemoryStore — Pluggable persistence (InMemoryStore, FileStore, SQLiteStore, or your own).
  4. MemoryManager — Facade that composes conversation + working memory behind a single API. This is the object you wire into agents, pipelines, and patterns.

Quick Start

Here's the simplest way to give an agent a memory — attach a MemoryManager and pass a conversation_id to each call:

from fireflyframework_agentic.agents import FireflyAgent
from fireflyframework_agentic.memory import MemoryManager

# Create a memory manager with a 32K token budget for conversation history.
# When the history exceeds this, the oldest turns get dropped automatically.
memory = MemoryManager(max_conversation_tokens=32_000)

# Wire the memory into the agent. From now on, every run() call
# can participate in a persistent conversation.
agent = FireflyAgent(name="assistant", model="openai:gpt-4o", memory=memory)

# Start a new conversation — this returns a unique conversation ID.
conv_id = memory.new_conversation()

# Turn 1: the agent has no history yet.
result1 = await agent.run("What is Python?", conversation_id=conv_id)

# Turn 2: under the hood, FireflyAgent loads the message_history from Turn 1
# and passes it to Pydantic AI, so the model sees the full conversation.
result2 = await agent.run("What about its type system?", conversation_id=conv_id)
# result2 knows we were talking about Python — no context lost.

Conversation Memory

ConversationMemory manages chat history per conversation ID. It enforces a token budget by dropping the oldest turns (FIFO):

from fireflyframework_agentic.memory import ConversationMemory

conv_mem = ConversationMemory(max_tokens=16_000)
cid = conv_mem.new_conversation()

# After an agent run, store the turn
conv_mem.add_turn(
    cid,
    user_prompt="Hello",
    assistant_response="Hi there!",
    raw_messages=result.new_messages(),
)

# Before the next run, get trimmed history
history = conv_mem.get_message_history(cid)

When FireflyAgent has memory attached, this is all automatic.

Working Memory

WorkingMemory is a scoped key-value store for facts and intermediate state:

from fireflyframework_agentic.memory import WorkingMemory

wm = WorkingMemory(scope_id="idp-session-42")
wm.set("doc_type", "invoice")
wm.set("vendor", "Acme Corp")

print(wm.get("doc_type")) # "invoice"
print(wm.to_dict()) # {"doc_type": "invoice", "vendor": "Acme Corp"}

# Render as a context block for prompt injection
print(wm.to_context_string())
# Working Memory:
# - doc_type: invoice
# - vendor: Acme Corp

Multiple WorkingMemory instances can share a backend while maintaining independent namespaces:

from fireflyframework_agentic.memory import InMemoryStore, WorkingMemory

store = InMemoryStore()
agent_a_mem = WorkingMemory(store=store, scope_id="agent_a")
agent_b_mem = WorkingMemory(store=store, scope_id="agent_b")

agent_a_mem.set("key", "from A")
agent_b_mem.set("key", "from B")
assert agent_a_mem.get("key") == "from A" # Isolated

Storage Backends

InMemoryStore

Dict-backed, fast, non-persistent. Ideal for testing and short-lived sessions:

from fireflyframework_agentic.memory import InMemoryStore
store = InMemoryStore()

FileStore

JSON file persistence. Each namespace gets its own file:

from fireflyframework_agentic.memory import FileStore
store = FileStore(base_dir=".firefly_memory")

SQLiteStore

Single-file SQLite persistence — durable and queryable without a separate server:

from fireflyframework_agentic.memory import SQLiteStore
store = SQLiteStore(path=".firefly_memory/memory.db")

For larger deployments the memory_backend config field also accepts "postgres" and "mongodb" (configured via memory_postgres_url / memory_mongodb_url). Conversation memory can auto-summarise evicted turns with a summariser built by create_llm_summarizer(agent).

Custom Backends

Implement the MemoryStore protocol:

from fireflyframework_agentic.memory import MemoryStore, MemoryEntry

class RedisStore:
    def save(self, namespace: str, entry: MemoryEntry) -> None: ...
    def load(self, namespace: str) -> list[MemoryEntry]: ...
    def load_by_key(self, namespace: str, key: str) -> MemoryEntry | None: ...
    def delete(self, namespace: str, entry_id: str) -> None: ...
    def clear(self, namespace: str) -> None: ...

MemoryManager

The MemoryManager facade is the object you attach to agents, delegation routers, and pipelines:

from fireflyframework_agentic.memory import MemoryManager

mgr = MemoryManager(
    max_conversation_tokens=32_000,
    working_scope_id="main-session",
)

# Conversation
cid = mgr.new_conversation()
mgr.add_turn(cid, "hello", "hi", raw_messages)
history = mgr.get_message_history(cid)

# Working memory
mgr.set_fact("doc_type", "invoice")
mgr.get_fact("doc_type") # "invoice"

Forking

When delegating to a sub-agent or branching a pipeline, fork() creates a child manager that shares conversation memory but gets independent working memory. This means a delegated agent can store its own facts without overwriting the parent's:

# Parent memory has facts about the overall session
mgr.set_fact("session_id", "abc-123")

# Fork for a sub-agent — it gets its own working memory scope
# but can still read the same conversation history.
child = mgr.fork(working_scope_id="sub-agent-classify")
child.set_fact("classification", "invoice") # Only visible in child
assert mgr.get_fact("classification") is None # Parent is unaffected
assert child.get_fact("session_id") is None # Child has its own scope
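The fork semantics are easier to see in a toy model. The class below is a hypothetical stand-in, not MemoryManager: it keeps conversations in one shared dict and namespaces working-memory keys by scope ID, which is one simple way to get shared history with isolated facts.

```python
class MiniMemory:
    """Toy model: shared conversation history, per-scope working memory."""

    def __init__(self, scope_id, conversations=None, store=None):
        self.scope_id = scope_id
        self._conversations = conversations if conversations is not None else {}
        self._store = store if store is not None else {}

    def add_turn(self, cid, user, assistant):
        self._conversations.setdefault(cid, []).append((user, assistant))

    def history(self, cid):
        return list(self._conversations.get(cid, []))

    def set_fact(self, key, value):
        # Keys are namespaced by scope, so scopes never collide.
        self._store[(self.scope_id, key)] = value

    def get_fact(self, key, default=None):
        return self._store.get((self.scope_id, key), default)

    def fork(self, scope_id):
        # Same conversation dict and backing store, new working-memory scope.
        return MiniMemory(scope_id, self._conversations, self._store)


parent = MiniMemory("main-session")
parent.set_fact("session_id", "abc-123")
parent.add_turn("c1", "hello", "hi")

forked = parent.fork("sub-agent-classify")
forked.set_fact("classification", "invoice")
forked.add_turn("c1", "classify this", "invoice")
```

Because the child holds a reference to the same conversation dict, turns it adds are visible to the parent, while its facts stay in its own scope.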

Memory Integration Points

Memory flows through the framework in four ways. Understanding these is key to building applications where context is never lost:

1. Agent integration — When FireflyAgent has a MemoryManager attached, calling run(prompt, conversation_id=cid) automatically loads conversation history as message_history before the call and stores new messages after it. You never touch message_history manually.

2. Delegation integration: DelegationRouter automatically forks memory when routing to a sub-agent, so the delegated agent gets its own working-memory scope while sharing conversation context.

3. Pipeline integration — When you create PipelineContext(memory=mgr), both AgentStep and ReasoningStep propagate the memory to the agent and pattern respectively. Facts stored in one step are readable in subsequent steps.

4. Reasoning integration — Pass memory=mgr as a keyword argument to pattern.execute(agent, input, memory=mgr). The memory object becomes available as state["memory"] inside all pattern hooks (_reason, _act, etc.), so custom patterns can read and write working memory during reasoning iterations.

Configuration

Memory settings via environment variables:

export FIREFLY_AGENTIC_MEMORY_BACKEND=in_memory
export FIREFLY_AGENTIC_MEMORY_MAX_CONVERSATION_TOKENS=128000
export FIREFLY_AGENTIC_MEMORY_SUMMARIZE_THRESHOLD=10
export FIREFLY_AGENTIC_MEMORY_FILE_DIR=.firefly_memory

IDP Tie-In: Carrying Facts Across Pipeline Steps

Now let's see why memory matters for our IDP pipeline. When the classifier figures out that a document is an invoice, the extractor needs to know that — it selects different prompts for invoices vs. receipts. Working memory is the bridge:

from fireflyframework_agentic.memory import MemoryManager

# One memory manager for the entire pipeline run.
# Every step can read and write facts here.
memory = MemoryManager(working_scope_id="idp-run-001")

# Phase 1: The classifier stores what it learned.
# These facts persist across all subsequent pipeline steps.
classification = {"doc_type": "invoice", "language": "en", "page_count": 2}
memory.set_fact("doc_type", classification["doc_type"])
memory.set_fact("language", classification["language"])
memory.set_fact("page_count", classification["page_count"])

# Phase 3: The extractor reads what the classifier discovered.
# This works because both steps share the same MemoryManager.
doc_type = memory.get_fact("doc_type") # "invoice"
# Now we can pick the invoice-specific extraction prompt.

Chapter 9: Validation & Quality of Service

You've built an agent that extracts invoice data — but how do you trust the output? LLMs are probabilistic: they can hallucinate an invoice number that doesn't exist, format a date as "January 15th" instead of ISO 8601, or return an amount as a string instead of a number. In production, bad data propagates downstream and causes real damage.

The Validation module gives you a two-layer defence:

  • Layer 1 — Structural validation catches deterministic errors. You define rules (regex, range, format, enum, or custom functions) for each field, group them into an OutputValidator, and run them against the extracted dict or Pydantic model. This is fast, cheap (no LLM call), and catches the most common failures: wrong date formats, out-of-range amounts, invalid invoice number patterns.

  • Layer 2 — Quality of Service (QoS) catches statistical errors that rules can't see. ConfidenceScorer asks the LLM to rate its own output. ConsistencyChecker runs the same prompt multiple times and measures agreement — low agreement suggests hallucination. GroundingChecker verifies that extracted values actually appear in the source document (no LLM needed). QoSGuard composes all three into a single pass/fail gate.

Both layers can be wired into an OutputReviewer that wraps an agent call with automatic retry: if structural validation or QoS fails, the reviewer feeds the errors back to the agent and re-runs — up to max_retries times.

Output Validation Rules

Rules are composable predicates that check a single field value:

from fireflyframework_agentic.validation.rules import (
    RegexRule,
    FormatRule,
    RangeRule,
    EnumRule,
    CustomRule,
    FieldValidator,
)

# Regex: field value must match a pattern.
# First arg is always the field name.
inv_rule = RegexRule("invoice_number", r"^INV-\d{4,10}$")

# Format: field matches a named format (email, url, date, uuid, iso_date)
date_rule = FormatRule("due_date", "iso_date")

# Range: numeric field within bounds
amount_rule = RangeRule("total_amount", min_value=0.01, max_value=10_000_000)

# Enum: field value is one of an allowed set
status_rule = EnumRule("status", ["approved", "rejected", "pending"])

# Custom: any predicate function
custom_rule = CustomRule(
    "vendor_name",
    lambda v: isinstance(v, str) and len(v.strip()) > 0,
    description="Value must be a non-empty string",
)

Field Validator

Validates a single field against one or more rules:

validator = FieldValidator("invoice_number", [inv_rule])
# validate() takes the field's value directly, not a dict.
results = validator.validate("INV-001234")
assert all(r.passed for r in results)

Output Validator

Validates an entire structured output (dict or Pydantic model). The constructor takes a dict mapping field names to lists of rules:

from fireflyframework_agentic.validation.rules import OutputValidator

validator = OutputValidator({
    "invoice_number": [RegexRule("invoice_number", r"^INV-\d{4,10}$")],
    "vendor_name": [RegexRule("vendor_name", r".{2,}")],
    "total_amount": [RangeRule("total_amount", min_value=0.01, max_value=10_000_000)],
    "due_date": [FormatRule("due_date", "iso_date")],
    "status": [EnumRule("status", ["approved", "rejected", "pending"])],
})

# validate() accepts a dict, Pydantic model, or any object with __dict__.
report = validator.validate(extracted_data)
if not report.valid:
    for err in report.errors:
        print(f" {err.field_name}: {err.message}")
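Conceptually, a rule is a named predicate on one field, and the validator collects every failure into a report. The sketch below shows that shape with the standard library only. Rule, Report, regex_rule, range_rule, and validate are hypothetical names, not the framework's classes, and treating a missing field as an error is an assumption.

```python
import re
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class Rule:
    field_name: str
    check: Callable[[Any], bool]
    message: str


@dataclass
class Report:
    errors: list[tuple[str, str]] = field(default_factory=list)

    @property
    def valid(self) -> bool:
        return not self.errors


def regex_rule(field_name: str, pattern: str) -> Rule:
    compiled = re.compile(pattern)
    return Rule(
        field_name,
        lambda v: isinstance(v, str) and compiled.search(v) is not None,
        f"must match {pattern}",
    )


def range_rule(field_name: str, min_value: float, max_value: float) -> Rule:
    return Rule(
        field_name,
        lambda v: isinstance(v, (int, float))
        and not isinstance(v, bool)
        and min_value <= v <= max_value,
        f"must be between {min_value} and {max_value}",
    )


def validate(data: dict, rules: list[Rule]) -> Report:
    report = Report()
    for rule in rules:
        if rule.field_name not in data:
            report.errors.append((rule.field_name, "field is missing"))
        elif not rule.check(data[rule.field_name]):
            report.errors.append((rule.field_name, rule.message))
    return report


rules = [
    regex_rule("invoice_number", r"^INV-\d{4,10}$"),
    range_rule("total_amount", 0.01, 10_000_000),
]
good = validate({"invoice_number": "INV-001234", "total_amount": 1234.56}, rules)
bad = validate({"invoice_number": "1234", "total_amount": "1234.56"}, rules)
```

Note that the amount "1234.56" fails the range rule because it is a string, which is exactly the kind of type slip this layer exists to catch before any LLM-based check runs.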

Quality of Service (QoS)

The QoS module provides post-generation quality checks. Each checker requires an agent (for LLM-based evaluation) or operates purely on text (grounding).

Confidence Scorer

Asks the agent to self-evaluate its output confidence on a 0.0–1.0 scale:

from fireflyframework_agentic.validation.qos import ConfidenceScorer

# The scorer needs an agent — it sends a self-evaluation prompt to the LLM.
scorer = ConfidenceScorer(evaluator_agent)
score = await scorer.score("The invoice total is $1,234.56") # 0.0–1.0

Consistency Checker

Runs the same prompt multiple times and measures word-level agreement across outputs (Jaccard similarity):

from fireflyframework_agentic.validation.qos import ConsistencyChecker

# The checker needs an agent and a run count. It runs the prompt num_runs times.
checker = ConsistencyChecker(extractor_agent, num_runs=3)
score, outputs = await checker.check("What is the capital of France?")
# score: float (1.0 = all answers agree), outputs: list[str]
print(f"Consistency: {score:.2f} across {len(outputs)} runs")
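Word-level Jaccard similarity is the size of the shared vocabulary divided by the size of the combined vocabulary. The sketch below computes it for each pair of outputs and averages the results. Lower-casing the text and averaging over all pairs are assumptions; the framework may tokenise or aggregate differently.

```python
from itertools import combinations


def jaccard(a: str, b: str) -> float:
    """|A ∩ B| / |A ∪ B| over the sets of lower-cased words."""
    set_a, set_b = set(a.lower().split()), set(b.lower().split())
    if not set_a and not set_b:
        return 1.0  # two empty outputs agree trivially
    return len(set_a & set_b) / len(set_a | set_b)


def consistency(outputs: list[str]) -> float:
    """Mean pairwise Jaccard similarity; 1.0 means every run agreed."""
    pairs = list(combinations(outputs, 2))
    if not pairs:
        return 1.0
    return sum(jaccard(a, b) for a, b in pairs) / len(pairs)


pair_score = jaccard("the capital is Paris", "the capital is Lyon")  # 3 shared / 5 total
```

Because the measure is purely lexical, two answers that mean the same thing in different words will score low, which is why a moderate threshold suits free-form output better than a strict one.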

Grounding Checker

Verifies that extracted field values actually appear in the source document — no agent needed, purely text-based:

from fireflyframework_agentic.validation.qos import GroundingChecker

checker = GroundingChecker(min_grounding_ratio=0.8)

# check() takes the source text and a dict of field_name → extracted_value.
score, field_map = checker.check(
    source_text="Invoice #INV-001: Total Amount: $1,234.56. Vendor: Acme Corp.",
    extracted_fields={"total": "$1,234.56", "vendor": "Acme Corp", "id": "INV-999"},
)
# score: float (fraction of grounded fields), field_map: dict[str, bool]
print(f"Grounding: {score:.2f}") # 0.67 (2 of 3 grounded)
print(f"Ungrounded: {[k for k, v in field_map.items() if not v]}") # ["id"]
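A text-based grounding check can be as simple as a substring test per field. The sketch below reproduces the example above with a case-insensitive match. The function name and the exact matching rule are assumptions; the framework may normalise whitespace or punctuation differently.

```python
def grounding(source_text: str, extracted_fields: dict[str, str]) -> tuple[float, dict[str, bool]]:
    """Return (fraction of grounded fields, per-field grounded flag)."""
    haystack = source_text.lower()
    field_map = {
        name: str(value).lower() in haystack
        for name, value in extracted_fields.items()
    }
    score = sum(field_map.values()) / len(field_map) if field_map else 1.0
    return score, field_map


grounding_score, grounded = grounding(
    "Invoice #INV-001: Total Amount: $1,234.56. Vendor: Acme Corp.",
    {"total": "$1,234.56", "vendor": "Acme Corp", "id": "INV-999"},
)
```

Exact substring matching will flag values the model has reformatted, for example "1234.56" extracted from "$1,234.56", so ground the raw extracted strings rather than normalised ones.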

QoS Guard

Composes all checks into a single gate. You build the individual checkers and pass them in:

from fireflyframework_agentic.validation.qos import (
    QoSGuard, ConfidenceScorer, ConsistencyChecker, GroundingChecker,
)

# Build the individual checkers first.
guard = QoSGuard(
    confidence_scorer=ConfidenceScorer(evaluator_agent),
    consistency_checker=ConsistencyChecker(extractor_agent, num_runs=3),
    grounding_checker=GroundingChecker(),
    min_confidence=0.8,
    min_consistency=0.6,
    min_grounding=0.8,
)

# evaluate() takes the output string plus optional context for each check.
result = await guard.evaluate(
    str(extracted_data),
    prompt="Extract fields from the invoice.",
    source_text=ocr_text,
    extracted_fields={"total": "$1,234.56", "vendor": "Acme Corp"},
)
# result is a QoSResult with: .passed, .confidence, .consistency_score,
# .grounding_score, .details (dict with per-check info)
if result.passed:
    print("Quality check passed")
else:
    print(f"Failed: confidence={result.confidence:.2f}")

Output Reviewer

The OutputReviewer closes the loop between generation and validation. It wraps an agent call with schema parsing + rule validation + automatic retry. When the LLM produces output that fails Pydantic parsing or validation rules, the reviewer automatically retries with a feedback prompt describing exactly what was wrong.

Basic Usage

from pydantic import BaseModel, Field
from fireflyframework_agentic.validation import OutputReviewer

class InvoiceData(BaseModel):
    vendor: str
    amount: float = Field(ge=0)
    date: str
    invoice_number: str | None = None

reviewer = OutputReviewer(output_type=InvoiceData, max_retries=3)
result = await reviewer.review(
    agent,
    "Extract invoice data from: Acme Corp, $1,234, 2026-01-15",
)
print(result.output) # InvoiceData(vendor="Acme Corp", amount=1234.0, ...)
print(result.attempts) # 1 if first try succeeded, 2+ if retries needed

With Validation Rules

Combine schema parsing with field-level rules:

from fireflyframework_agentic.validation import OutputReviewer, OutputValidator, EnumRule

validator = OutputValidator({"vendor": [EnumRule("vendor", ["Acme Corp", "Globex"])]})
reviewer = OutputReviewer(
    output_type=InvoiceData,
    validator=validator,
    max_retries=2,
)

With Reasoning Patterns

Attach a reviewer to any reasoning pattern to validate the final output:

from fireflyframework_agentic.reasoning import ReActPattern
from fireflyframework_agentic.validation import OutputReviewer

reviewer = OutputReviewer(output_type=InvoiceData, max_retries=2)
pattern = ReActPattern(reviewer=reviewer)
result = await pattern.execute(agent, "Extract invoice data from the document.")
# Output is automatically validated and retried if needed

ReviewResult

The result contains:

  • output — The validated output.
  • attempts — Total attempts (1 = first try succeeded).
  • validation_report — The final ValidationReport if a validator was used.
  • retry_history — List of RetryAttempt objects with attempt number, raw output, and error messages.
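The generate, validate, retry-with-feedback loop can be sketched with a scripted stand-in for the agent. This is an illustration of the control flow only: the names are hypothetical, JSON plus a hand-written check replace Pydantic parsing, and both the attempt count (first try plus max_retries) and the behaviour on exhaustion (output of None) are assumptions.

```python
import asyncio
import json
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Review:
    output: Optional[dict]
    attempts: int
    retry_history: list[str] = field(default_factory=list)


def check_invoice(data: dict) -> list[str]:
    errors = []
    vendor = data.get("vendor")
    if not isinstance(vendor, str) or not vendor.strip():
        errors.append("vendor must be a non-empty string")
    amount = data.get("amount")
    if isinstance(amount, bool) or not isinstance(amount, (int, float)) or amount < 0:
        errors.append("amount must be a non-negative number")
    return errors


async def review(agent, prompt: str, max_retries: int = 3) -> Review:
    history: list[str] = []
    current = prompt
    for attempt in range(1, max_retries + 2):  # first try + max_retries retries
        raw = await agent(current)
        try:
            data = json.loads(raw)
            errors = check_invoice(data) if isinstance(data, dict) else ["output must be a JSON object"]
        except json.JSONDecodeError as exc:
            data, errors = None, [f"invalid JSON: {exc.msg}"]
        if not errors:
            return Review(data, attempt, history)
        feedback = "; ".join(errors)
        history.append(feedback)
        # Feed the errors back so the next attempt can correct them.
        current = f"{prompt}\n\nYour previous answer was rejected: {feedback}. Fix these problems."
    return Review(None, max_retries + 1, history)


def scripted_agent(replies: list[str]):
    pending = iter(replies)

    async def agent(prompt: str) -> str:
        return next(pending)  # stand-in for an LLM call

    return agent


fake_agent = scripted_agent([
    '{"vendor": "Acme Corp", "amount": "1,234"}',  # amount is a string: rejected
    '{"vendor": "Acme Corp", "amount": 1234.0}',   # corrected on retry
])
reviewed = asyncio.run(review(fake_agent, "Extract invoice data"))
```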

Rubric Reviewer (LLM-as-judge)

Where OutputReviewer enforces a schema and deterministic rules, RubricReviewer evaluates free-form output against a list of natural-language criteria using a separate grader agent. When criteria are unmet, it sends a revision prompt back to the generator and loops (up to max_iterations), returning a ReviewResult:

from fireflyframework_agentic.validation import RubricReviewer

reviewer = RubricReviewer(
    rubric=[
        "All five invoice fields are present.",
        "Amounts are numeric, not strings.",
        "The due date is ISO 8601.",
    ],
    grader=evaluator_agent,  # optional; a default grader is created otherwise
    max_iterations=3,
)
result = await reviewer.review(extractor_agent, f"Extract fields from:\n{ocr_text}")

You can also load the rubric from a Markdown bullet list with RubricReviewer.from_rubric_file("rubric.md").

IDP Tie-In: Validating Extracted Invoice Data

For our IDP pipeline, we combine structural validation, QoS checks, and the output reviewer:

from pydantic import BaseModel, Field
from fireflyframework_agentic.validation import OutputReviewer
from fireflyframework_agentic.validation.rules import (
    OutputValidator, RegexRule, FormatRule, RangeRule,
)

# Define the expected output schema
class InvoiceExtraction(BaseModel):
    invoice_number: str
    vendor_name: str
    total_amount: float = Field(ge=0)
    due_date: str
    line_items: list[dict]

# Structural validation rules — dict of field_name -> list of rules
invoice_validator = OutputValidator({
    "invoice_number": [RegexRule("invoice_number", r"^INV-\d{4,10}$")],
    "vendor_name": [RegexRule("vendor_name", r".{2,}")],
    "total_amount": [RangeRule("total_amount", min_value=0.01, max_value=10_000_000)],
    "due_date": [FormatRule("due_date", "iso_date")],
})

# Output reviewer with schema + rules + retry
reviewer = OutputReviewer(
    output_type=InvoiceExtraction,
    validator=invoice_validator,
    max_retries=3,
)

# QoS guard — compose individual checkers with thresholds
from fireflyframework_agentic.validation.qos import (
    QoSGuard, ConfidenceScorer, ConsistencyChecker, GroundingChecker,
)

qos = QoSGuard(
    confidence_scorer=ConfidenceScorer(evaluator_agent),
    consistency_checker=ConsistencyChecker(extractor_agent, num_runs=2),
    grounding_checker=GroundingChecker(),
    min_confidence=0.8,
    min_consistency=0.6,
    min_grounding=0.8,
)

Part III — Orchestration & Operations


Chapter 10: Pipeline

So far we have individual agents (classify, digitise, extract), tools (OCR, vendor lookup), reasoning patterns (Plan-and-Execute, Reflexion), validation rules, and memory. Each piece works in isolation — but a real IDP system needs to wire them together into a single, reliable flow: classify → digitise → extract → validate → assemble.

The Pipeline module does exactly that. It models your processing flow as a Directed Acyclic Graph (DAG) where nodes are processing steps and edges define data flow. The engine schedules nodes by topological level — nodes at the same level run concurrently — and handles retries, timeouts, and conditional execution automatically.

Pipeline Execution Architecture

The following diagram shows how the pipeline engine executes a DAG:

graph LR
    subgraph "Pipeline Engine"
        B["PipelineBuilder"] --> DAG["DAG\n(topological sort)"]
        DAG --> L0["Level 0\n(no dependencies)"]
        DAG --> L1["Level 1\n(depends on L0)"]
        DAG --> L2["Level 2\n(depends on L1)"]
    end

    subgraph "Step Executors"
        AS["AgentStep\n(runs FireflyAgent)"]
        RS["ReasoningStep\n(runs pattern + agent)"]
        CS["CallableStep\n(runs async function)"]
        FO["FanOutStep\n(splits input)"]
        FI["FanInStep\n(merges outputs)"]
    end

    subgraph "Context & Results"
        PC["PipelineContext\n(inputs, metadata, memory)"]
        PR["PipelineResult\n(outputs, trace, duration)"]
    end

    L0 --> AS
    L1 --> RS
    L2 --> CS
    L0 & L1 & L2 -.->|concurrent within level| PC
    PC --> PR

Core Concepts

A pipeline is a Directed Acyclic Graph (DAG) where:

  • Nodes are processing steps (call an agent, run a reasoning pattern, execute a function).
  • Edges define data flow and execution order.
  • The engine schedules nodes by topological level — nodes at the same level run concurrently.
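Scheduling by topological level can be illustrated with Kahn's algorithm, grouping nodes by the round in which their last dependency completes. The sketch below is a generic version of that technique with a hypothetical function name; it is not the engine's code.

```python
def topological_levels(nodes: list[str], edges: list[tuple[str, str]]) -> list[list[str]]:
    """Group nodes into levels; each level depends only on earlier levels."""
    indegree = {node: 0 for node in nodes}
    children = {node: [] for node in nodes}
    for src, dst in edges:
        children[src].append(dst)
        indegree[dst] += 1

    level = sorted(node for node, degree in indegree.items() if degree == 0)
    levels, seen = [], 0
    while level:
        levels.append(level)
        seen += len(level)
        next_level = []
        for node in level:
            for child in children[node]:
                indegree[child] -= 1
                if indegree[child] == 0:  # all dependencies are now scheduled
                    next_level.append(child)
        level = sorted(next_level)

    if seen != len(indegree):
        raise ValueError("graph contains a cycle")
    return levels


levels = topological_levels(
    ["split", "ocr_1", "ocr_2", "merge"],
    [("split", "ocr_1"), ("split", "ocr_2"), ("ocr_1", "merge"), ("ocr_2", "merge")],
)
```

Nodes inside one level have no edges between them, which is what makes it safe to run them concurrently.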

The Pipeline Builder

The fluent PipelineBuilder is the recommended way to construct pipelines:

from fireflyframework_agentic.pipeline.builder import PipelineBuilder
from fireflyframework_agentic.pipeline.steps import AgentStep, CallableStep, ReasoningStep

engine = (
    PipelineBuilder("invoice-pipeline")
    .add_node("classify", AgentStep(classifier_agent))
    .add_node("extract", AgentStep(extractor_agent))
    .add_node("validate", CallableStep(validate_fn))
    .chain("classify", "extract", "validate")
    .build()
)

result = await engine.run(inputs="<invoice text>")

Step Executors

The built-in executors (all implementing StepExecutor) cover most scenarios:

  • AgentStep — Runs a FireflyAgent with the input as prompt.
  • ReasoningStep — Runs a reasoning pattern through an agent.
  • CallableStep — Wraps any async function (context, inputs) -> output.
  • FanOutStep — Splits input into a list for parallel downstream processing.
  • FanInStep — Merges outputs from multiple upstream nodes.
  • BranchStep — Routes to one of several downstream paths by a router function.
  • BatchLLMStep — Runs an agent over a batch of inputs concurrently.
  • EmbeddingStep — Embeds text via a BaseEmbedder (see Embeddings & Vector Stores).
  • RetrievalStep — Retrieves nearest neighbours from a vector store: RetrievalStep(store, *, embedder=None, top_k=5, input_key="input").

Parallel Execution (Fan-Out / Fan-In)

Process multiple items concurrently:

from fireflyframework_agentic.pipeline.steps import FanOutStep, FanInStep

engine = (
    PipelineBuilder("parallel-ocr")
    .add_node("split", FanOutStep(lambda doc: doc.pages))
    .add_node("ocr_1", AgentStep(ocr_agent))
    .add_node("ocr_2", AgentStep(ocr_agent))
    .add_node("merge", FanInStep())
    .add_edge("split", "ocr_1")
    .add_edge("split", "ocr_2")
    .add_edge("ocr_1", "merge", input_key="page_1")
    .add_edge("ocr_2", "merge", input_key="page_2")
    .build()
)

Conditional Execution

Gate nodes with a condition function. If the condition returns False, the node is skipped:

from fireflyframework_agentic.pipeline.dag import DAGNode

dag.add_node(DAGNode(
    node_id="ocr",
    step=AgentStep(ocr_agent),
    condition=lambda ctx: ctx.metadata.get("needs_ocr", False),
))

Retries and Timeouts

Configure per-node resilience:

engine = (
    PipelineBuilder("robust-pipeline")
    .add_node("extract", AgentStep(extractor_agent), retry_max=3, timeout_seconds=30.0)
    .build()
)

Retries use linear backoff. On exhaustion, the node fails and the pipeline reports success=False.
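
To make the retry behaviour concrete, here is a minimal stdlib sketch of per-call timeouts combined with linear backoff. It is not the engine's implementation: `run_with_retries` and `base_delay` are hypothetical names, and the sketch assumes `retry_max` counts retries after the first attempt, which the text above does not specify.

```python
import asyncio


async def run_with_retries(fn, *, retry_max: int, timeout_seconds: float,
                           base_delay: float = 0.01):
    """Call fn() once, then retry up to retry_max times with linear backoff."""
    last_error = None
    for attempt in range(1, retry_max + 2):
        try:
            # Each attempt gets its own timeout budget.
            return await asyncio.wait_for(fn(), timeout=timeout_seconds)
        except Exception as exc:  # includes asyncio.TimeoutError
            last_error = exc
            if attempt <= retry_max:
                # Linear backoff: the delay grows by base_delay per attempt.
                await asyncio.sleep(base_delay * attempt)
    raise last_error


calls = {"count": 0}


async def flaky():
    """Fails twice, then succeeds (simulates a transient outage)."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"


outcome = asyncio.run(run_with_retries(flaky, retry_max=3, timeout_seconds=1.0))
print(outcome, calls["count"])
```

Once the retries are exhausted the last exception is re-raised, which corresponds to the node failing in the description above.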

Pipeline Context

PipelineContext is the shared data bus flowing through the DAG:

from fireflyframework_agentic.pipeline.context import PipelineContext
from fireflyframework_agentic.memory import MemoryManager

memory = MemoryManager(working_scope_id="invoice-run-42")

ctx = PipelineContext(
    inputs=document_bytes,
    metadata={"source": "email", "tenant": "acme-corp"},
    memory=memory,
)
result = await engine.run(context=ctx)

It carries: inputs, metadata, correlation_id, and results from completed upstream nodes. When a MemoryManager is attached, AgentStep and ReasoningStep automatically propagate memory to agents and patterns.

Pipeline Result

PipelineResult aggregates all outcomes:

result = await engine.run(context=ctx)

if result.success:
    print(result.final_output)
    print(f"Completed in {result.total_duration_ms:.0f} ms")
else:
    print(f"Failed nodes: {result.failed_nodes}")

# Inspect individual nodes
for node_id, node_result in result.outputs.items():
    print(f" {node_id}: {'ok' if node_result.success else 'FAILED'}")

# Execution trace for observability
for entry in result.execution_trace:
    print(f" [{entry.node_id}] {entry.status} ({entry.duration_ms:.0f} ms)")

Manual DAG Construction

For full control, build the DAG directly:

from fireflyframework_agentic.pipeline.dag import DAG, DAGNode, DAGEdge
from fireflyframework_agentic.pipeline.engine import PipelineEngine

dag = DAG("my-pipeline")
dag.add_node(DAGNode(node_id="step_a", step=my_step))
dag.add_node(DAGNode(node_id="step_b", step=other_step))
dag.add_edge(DAGEdge(source="step_a", target="step_b"))

engine = PipelineEngine(dag)
result = await engine.run(inputs="hello")
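
The essential property of a DAG is that a node runs only after everything upstream of it has finished. The sketch below illustrates that ordering with the standard library's `graphlib`. It is independent of the framework and says nothing about how `PipelineEngine` actually schedules nodes; the node names are borrowed from the IDP example.

```python
from graphlib import TopologicalSorter

# Map each node to the set of nodes it depends on (its upstream nodes).
dependencies = {
    "classify": set(),
    "digitise": {"classify"},
    "extract": {"digitise"},
    "validate": {"extract"},
    "assemble": {"validate"},
}

# static_order() yields nodes so that every dependency comes first;
# it raises graphlib.CycleError if the graph contains a cycle.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```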

Embeddings & Vector Stores

EmbeddingStep and RetrievalStep build on two reusable framework modules. The embeddings package ships BaseEmbedder/EmbedderRegistry with 8 provider backends (OpenAI, Azure, Cohere, Google, Mistral, Voyage, Bedrock, Ollama). The vectorstores package ships BaseVectorStore with backends including InMemoryVectorStore, ChromaVectorStore, PineconeVectorStore, QdrantVectorStore, PgVectorVectorStore, and SqliteVecVectorStore, plus a scoping layer (ScopedVectorStore, TenantScopedVectorStore, scope_namespace, parse_scope_namespace) for multi-tenant isolation:

from fireflyframework_agentic.vectorstores import InMemoryVectorStore
from fireflyframework_agentic.pipeline import RetrievalStep

store = InMemoryVectorStore()          # async upsert / search / search_text / delete
retrieve = RetrievalStep(store, top_k=5, input_key="input")

The framework ships these as building blocks; it does not bundle a turnkey RAG/corpus agent. BaseEmbedder, EmbedderRegistry, BaseVectorStore, and InMemoryVectorStore are re-exported from the top-level fireflyframework_agentic package.

State-Based Pipelines, Checkpointing & Audit Logs

Beyond the port-based DAG above, PipelineBuilder has a state-based mode: pass a Pydantic state= model and nodes become async (state) -> dict | None | Pause | Send | list[Send] functions over a typed shared state. Branching is a single .branch(source, router) call, and state reducers (append, extend, merge_dict, replace) control how each node's returned dict is merged into the state.
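
The reducer idea can be sketched without the framework. In the example below, the four functions only borrow the reducer names mentioned above; their signatures, the `REDUCERS` mapping, and `apply_update` are assumptions made for illustration, and a plain dict stands in for the Pydantic state model.

```python
from typing import Any, Callable, Optional


def append(old: list, new: Any) -> list:
    return [*old, new]            # add a single item


def extend(old: list, new: list) -> list:
    return [*old, *new]           # add several items


def merge_dict(old: dict, new: dict) -> dict:
    return {**old, **new}         # shallow merge, new keys win


def replace(old: Any, new: Any) -> Any:
    return new                    # overwrite the previous value


# Hypothetical per-field reducer assignment.
REDUCERS: dict[str, Callable[[Any, Any], Any]] = {
    "pages": extend,
    "errors": append,
    "fields": merge_dict,
    "status": replace,
}


def apply_update(state: dict, update: Optional[dict]) -> dict:
    """Merge a node's returned dict into the state; None means no change."""
    if not update:
        return state
    merged = dict(state)
    for key, value in update.items():
        merged[key] = REDUCERS.get(key, replace)(state.get(key), value)
    return merged


state = {"pages": ["p1"], "errors": [], "fields": {"vendor": "Acme"}, "status": "running"}
state = apply_update(
    state, {"pages": ["p2"], "fields": {"total": 500.0}, "status": "done"}
)
print(state)
```

Each node returns only the keys it changed, and the reducer attached to each key decides whether the new value is appended, merged, or replaces the old one.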

Two control signals shape the flow: Pause suspends a run (resume later), and Send dispatches dynamic fan-out work. For durability, pass a Checkpointer (FileCheckpointer(root=...) writes CheckpointRecords) so a failed run can resume or start mid-pipeline. An AuditLog records every node transition — choose FileAuditLog, LoggingAuditLog, OtelAuditLog, or QueryableAuditLog (each entry is an AuditEntry):

from fireflyframework_agentic.pipeline import (
    PipelineBuilder, FileCheckpointer, OtelAuditLog,
)

engine = PipelineBuilder(
    "stateful-idp",
    state=IdpState,                       # a pydantic.BaseModel
    checkpointer=FileCheckpointer(root=".checkpoints"),
    audit_log=OtelAuditLog(),
).build()

IDP Tie-In: Wiring the Complete Pipeline

Here's our IDP pipeline as a DAG with all five phases:

from fireflyframework_agentic.pipeline.builder import PipelineBuilder
from fireflyframework_agentic.pipeline.steps import AgentStep, ReasoningStep, CallableStep
from fireflyframework_agentic.pipeline.context import PipelineContext
from fireflyframework_agentic.reasoning import PlanAndExecutePattern
from fireflyframework_agentic.memory import MemoryManager

# Step functions for non-agent nodes
async def validate_step(context, inputs):
    extracted = inputs.get("input", {})
    report = invoice_validator.validate(extracted)
    context.metadata["validation_report"] = report
    return {"extracted": extracted, "valid": report.valid, "errors": report.errors}

async def assemble_step(context, inputs):
    data = inputs.get("input", {})
    return {
        "document_type": context.metadata.get("doc_type", "unknown"),
        "extracted_fields": data.get("extracted", {}),
        "validation_passed": data.get("valid", False),
    }

# Build the DAG
idp_pipeline = (
    PipelineBuilder("idp-pipeline")
    .add_node("classify", AgentStep(classifier_agent))
    .add_node("digitise", AgentStep(ocr_agent), retry_max=2, timeout_seconds=60)
    .add_node(
        "extract",
        ReasoningStep(PlanAndExecutePattern(max_steps=15), extractor_agent),
    )
    .add_node("validate", CallableStep(validate_step))
    .add_node("assemble", CallableStep(assemble_step))
    .chain("classify", "digitise", "extract", "validate", "assemble")
    .build()
)

# Execute
memory = MemoryManager(working_scope_id="idp-session")
ctx = PipelineContext(
    inputs=document_bytes,
    metadata={"source": "email-inbox", "tenant_id": "acme-corp"},
    memory=memory,
)
result = await idp_pipeline.run(context=ctx)

if result.success:
    print(f"Document type: {result.final_output['document_type']}")
    print(f"Extracted fields: {result.final_output['extracted_fields']}")
else:
    print(f"Pipeline failed at: {result.failed_nodes}")

Chapter 11: Observability

Your pipeline is running in production, processing thousands of invoices a day. Then latency spikes. Or accuracy drops. Or a customer reports a missing field. Without observability you're flying blind — you have no idea which agent is slow, which tool is failing, or how many tokens you're burning.

The Observability module wraps OpenTelemetry and gives you three primitives out of the box: tracing (distributed spans across agents, tools, and pipeline steps), metrics (counters and histograms for tokens, latency, cost, errors, and reasoning depth), and events (structured logs for significant occurrences). When observability is enabled, the framework instruments agent runs automatically — you get spans for free.

Tracing

FireflyTracer wraps the OpenTelemetry Tracer and adds GenAI-specific attributes. It exposes purpose-built context managers — agent_span(agent_name, *, model=..., **attrs), tool_span(...), reasoning_span(...) — plus a generic custom_span(name, **attrs), an event(name, **attrs) annotation helper, and a static set_error(span, error):

from fireflyframework_agentic.observability import FireflyTracer

tracer = FireflyTracer(service_name="idp-service")

with tracer.agent_span("classifier", model="openai:gpt-4o") as span:
    result = await classifier_agent.run("Classify this document")
    span.set_attribute("tokens.total", 150)

# Or a generic span with arbitrary attributes:
with tracer.custom_span("agent.run", phase="classify") as span:
    ...

The @traced Decorator

Automatically create a span around any function:

from fireflyframework_agentic.observability import traced

@traced(name="classify_document")
async def classify_document(text: str) -> dict:
    return await classifier_agent.run(text)

Metrics

FireflyMetrics records GenAI-specific OpenTelemetry instruments via purpose-built methods (each takes keyword args like agent=, model=, operation=, pattern= — not a generic labels= dict):

from fireflyframework_agentic.observability import FireflyMetrics

metrics = FireflyMetrics(service_name="idp-service")

metrics.record_tokens(150, agent="classifier", model="openai:gpt-4o")
metrics.record_prompt_tokens(100, agent="classifier", model="openai:gpt-4o")
metrics.record_completion_tokens(50, agent="classifier", model="openai:gpt-4o")
metrics.record_latency(142.5, operation="classify", agent="classifier")
metrics.record_cost(0.0021, agent="classifier", model="openai:gpt-4o")
metrics.record_error(operation="classify", agent="classifier", error_type="Timeout")
metrics.record_reasoning_depth(4, pattern="react")

The @metered Decorator

The @metered decorator records latency for the wrapped call and records an error if it raises. Its first parameter is operation; pass it positionally or as operation=:

from fireflyframework_agentic.observability import metered

@metered("extraction")
async def extract_fields(text: str) -> dict:
    return await extractor_agent.run(text)
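
For intuition, a decorator with this behaviour can be approximated in a few lines of standard-library Python. This is only a sketch of the pattern, not the framework's `@metered`: `metered_sketch` and the in-memory `recorded` list are hypothetical stand-ins for the real decorator and metrics backend.

```python
import asyncio
import functools
import time

# (operation, latency_ms, error_type) tuples; stand-in for a metrics backend.
recorded = []


def metered_sketch(operation: str):
    """Record latency for every call, plus the error type when the call raises."""
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            start = time.perf_counter()
            error_type = None
            try:
                return await fn(*args, **kwargs)
            except Exception as exc:
                error_type = type(exc).__name__
                raise  # never swallow the caller's exception
            finally:
                latency_ms = (time.perf_counter() - start) * 1000
                recorded.append((operation, latency_ms, error_type))
        return wrapper
    return decorator


@metered_sketch("extraction")
async def extract(text: str) -> dict:
    return {"length": len(text)}


result = asyncio.run(extract("Invoice #INV-001"))
print(result, recorded[0][0])
```

Doing the bookkeeping in `finally` means latency is recorded on both the success and the failure path.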

Events

FireflyEvents emits structured events (logged as JSON-serialisable dicts) via typed methods — there is no generic emit():

from fireflyframework_agentic.observability import FireflyEvents

events = FireflyEvents()
events.agent_started("classifier", model="openai:gpt-4o")
events.agent_completed("classifier", tokens=150, latency_ms=250)
events.tool_executed("vendor_lookup", success=True, latency_ms=12)
events.reasoning_step("react", step=1, step_type="thought")
events.agent_error("classifier", error="timeout")

Exporter Configuration

The framework emits spans and metrics purely through the OpenTelemetry API; it does not configure the OTel SDK or any exporters itself. The host application owns OTel SDK and exporter setup — wire up your TracerProvider, MeterProvider, and the exporters (OTLP collector, console, etc.) however your deployment requires, and the framework's telemetry flows through the globally configured providers automatically.

Configuration via environment variables:

export FIREFLY_AGENTIC_OBSERVABILITY_ENABLED=true
export FIREFLY_AGENTIC_LOG_LEVEL=DEBUG

Usage Tracking & Cost Estimation

The framework automatically tracks token usage and estimates cost for every agent run, reasoning step, and pipeline execution. UsageTracker accumulates UsageRecord objects with input/output tokens, cost, latency, and model details.

from fireflyframework_agentic.observability import default_usage_tracker

# After running agents, inspect accumulated usage
summary = default_usage_tracker.get_summary()
print(f"Total tokens: {summary.total_tokens}")
print(f"Total cost: ${summary.total_cost_usd:.4f}")
print(f"Requests: {summary.total_requests}")

# Filter by agent or pipeline correlation ID
agent_summary = default_usage_tracker.get_summary_for_agent("extractor")
pipeline_summary = default_usage_tracker.get_summary_for_correlation("run-123")

Cost is computed by a resolver chain (observability/cost_resolvers.py). The default chain DEFAULT_RESOLVERS tries provider_reported_cost (uses cost the provider already reported) then genai_prices_cost (prices via the bundled genai-prices data). The entry point is resolve_cost:

from fireflyframework_agentic.observability import resolve_cost, CostContext

cost = resolve_cost(
    CostContext(model="openai:gpt-4o", input_tokens=1000, output_tokens=500)
)
# Returns the USD cost, or None when no resolver can price the model.

When config.cost_strict=True, an unpriceable model raises UnknownModelCostError instead of returning None. UsageTracker applies this chain automatically for every recorded run.
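
The resolver-chain idea (try each resolver in order, take the first non-None answer, optionally raise when none applies) can be sketched as follows. Everything here is hypothetical: `Usage` stands in for CostContext, the price table holds made-up numbers rather than real pricing, and `LookupError` stands in for UnknownModelCostError.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Usage:  # hypothetical stand-in for CostContext
    model: str
    input_tokens: int
    output_tokens: int
    provider_cost_usd: Optional[float] = None


# Made-up prices in USD per million (input, output) tokens.
PRICES = {"example:model-a": (2.0, 8.0)}


def provider_reported(usage: Usage) -> Optional[float]:
    # Prefer a cost the provider already reported, if any.
    return usage.provider_cost_usd


def price_table(usage: Usage) -> Optional[float]:
    rates = PRICES.get(usage.model)
    if rates is None:
        return None
    in_rate, out_rate = rates
    return (usage.input_tokens * in_rate + usage.output_tokens * out_rate) / 1_000_000


CHAIN = (provider_reported, price_table)


def resolve(usage: Usage, resolvers=CHAIN, *, strict: bool = False) -> Optional[float]:
    for resolver in resolvers:
        cost = resolver(usage)
        if cost is not None:
            return cost  # first resolver with an answer wins
    if strict:
        raise LookupError(f"no resolver could price {usage.model!r}")
    return None


priced = resolve(Usage("example:model-a", input_tokens=1000, output_tokens=500))
unpriced = resolve(Usage("unknown:model", input_tokens=1, output_tokens=1))
print(priced, unpriced)
```

Ordering matters: putting the provider-reported resolver first means an authoritative figure is never overridden by a table estimate.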

Budget Enforcement

Set a budget limit in USD; a warning is logged when accumulated cost exceeds it:

export FIREFLY_AGENTIC_BUDGET_LIMIT_USD=10.00

Pipeline results include aggregated usage automatically:

result = await engine.run(context=ctx)
if result.usage:
    print(f"Pipeline cost: ${result.usage.total_cost_usd:.4f}")

Automatic Integration

When an agent is invoked, the framework automatically creates trace spans, records metrics, emits events, and tracks usage/cost. You don't need to instrument agent code manually unless you want additional detail.

IDP Tie-In: Instrumenting the Pipeline

from fireflyframework_agentic.observability import FireflyTracer, traced
from fireflyframework_agentic.pipeline.context import PipelineContext

tracer = FireflyTracer(service_name="idp-service")

@traced(name="idp.process_document")
async def process_document(document_bytes: bytes) -> dict:
    with tracer.custom_span("idp.run") as span:
        ctx = PipelineContext(inputs=document_bytes)
        result = await idp_pipeline.run(context=ctx)
        span.set_attribute("idp.success", result.success)
        # Per-agent token, latency, and cost metrics are recorded automatically
        # when observability is enabled; inspect them via default_usage_tracker.
        return result.final_output

Chapter 12: Explainability

In regulated industries — finance, healthcare, legal — "the model said so" is not an acceptable answer. Auditors, compliance officers, and customers need to see why the agent classified a document as an invoice, why it chose one vendor name over another, and what alternatives it considered. The Explainability module provides four building blocks: a trace recorder that captures every decision, an explanation generator that turns raw records into natural-language narratives, an append-only audit trail, and a report builder that compiles everything into markdown or JSON.

Trace Recorder

TraceRecorder captures decision records during execution. The method is record(category, *, agent="", detail=None, input_summary="", output_summary=""), and recorded items are exposed via the .records property:

from fireflyframework_agentic.explainability import TraceRecorder

recorder = TraceRecorder()
recorder.record(
    "reasoning_step",
    agent="extractor",
    detail={"chosen": "regex_match", "alternatives": ["llm_extraction", "template_match"]},
    input_summary="raw OCR text",
    output_summary="invoice_number=INV-2026-001",
)

Each DecisionRecord has these fields: timestamp, category, agent, detail, input_summary, and output_summary.

Explanation Generator

Transforms raw decision records into a natural-language narrative:

from fireflyframework_agentic.explainability import ExplanationGenerator

generator = ExplanationGenerator()
explanation = generator.generate(recorder.records)
print(explanation)
# Multi-line "Decision Explanation" narrative walking each record chronologically.

Audit Trail

An append-only log. Each entry captures an actor, action, resource, and outcome:

from fireflyframework_agentic.explainability import AuditTrail

trail = AuditTrail()
trail.append("extractor", "field_extraction", resource="invoice_number", outcome="success")

# Inspect or export
print(len(trail))            # number of entries
print(trail.entries)         # list[AuditEntry]
print(trail.export_json())   # JSON string

Report Builder

Compile records into a structured ExplainabilityReport, then render it:

from fireflyframework_agentic.explainability import ReportBuilder

builder = ReportBuilder(title="Invoice Extraction Report")
report = builder.build(recorder.records)   # -> ExplainabilityReport

# Render via static helpers
markdown = ReportBuilder.to_markdown(report)   # for documentation/review
json_data = ReportBuilder.to_json(report)      # for programmatic consumption

IDP Tie-In: Audit Trail for Invoice Extraction

In our IDP pipeline, we record why each field was extracted the way it was:

from fireflyframework_agentic.explainability import (
    TraceRecorder, ExplanationGenerator, AuditTrail, ReportBuilder,
)

recorder = TraceRecorder()

# During extraction, record decisions
recorder.record(
    "field_extraction",
    agent="field_extractor",
    detail={"chosen": "INV-2026-001", "alternatives": ["INV-2026-01", "2026-001"]},
    output_summary="Matched the INV-NNNN pattern with highest confidence.",
)
recorder.record(
    "field_extraction",
    agent="field_extractor",
    detail={"chosen": "1234.56", "alternatives": ["1,234.56", "$1234.56"]},
    output_summary="Normalised currency format to numeric value.",
)

# Generate explanation
generator = ExplanationGenerator()
explanation = generator.generate(recorder.records)

# Audit trail — append (actor, action) entries
trail = AuditTrail()
trail.append("field_extractor", "extract_invoice_number", resource="INV-2026-001")
trail.append("field_extractor", "extract_amount", resource="1234.56")
print(trail.export_json())

# Build and render the report
report = ReportBuilder(title="Invoice Extraction Report").build(recorder.records)
print(ReportBuilder.to_markdown(report))

Part IV — Experimentation


Chapter 13: Experiments

You've built a working extraction agent — but is GPT-4o the right model? Would Claude give better results? What about lowering the temperature from 0.5 to 0.1? Answering these questions by hand (run each variant, eyeball the output, repeat) doesn't scale. The Experiments module gives you a structured way to define variants, run them against the same inputs, collect metrics (latency, token usage, output length), and compare results — all in a few lines of code.

Defining an Experiment

An Experiment holds variants plus a dataset of test inputs. Each Variant carries name, model, temperature, and a parameters dict:

from fireflyframework_agentic.experiments import Experiment, Variant

experiment = Experiment(
    name="extraction_model_comparison",
    hypothesis="Claude 3.5 Sonnet beats GPT-4o on invoice extraction.",
    variants=[
        Variant(name="gpt4o", model="openai:gpt-4o"),
        Variant(name="claude", model="anthropic:claude-3-5-sonnet"),
    ],
    dataset=[
        "Extract fields from: Invoice #INV-001, Acme Corp, $500, 2026-01-15",
        "Extract fields from: Invoice #INV-002, Globex, $1,200, 2026-02-28",
    ],
)

Running an Experiment

ExperimentRunner.run(experiment, agent_factory, *, context=None) runs every variant against the experiment's dataset. The second positional argument is an agent_factory callable (variant) -> agent that builds an agent configured for that variant. It returns a list[VariantResult]:

from fireflyframework_agentic.experiments import ExperimentRunner
from fireflyframework_agentic.agents import FireflyAgent

def make_agent(variant):
    return FireflyAgent(
        name=f"extractor-{variant.name}",
        model=variant.model,
        instructions="You are an invoice data extraction specialist.",
    )

runner = ExperimentRunner()
results = await runner.run(experiment, make_agent)

Tracking Results

ExperimentTracker stores VariantResults in memory with optional JSON persistence (pass storage_path=). ExperimentRunner already records each result into its tracker; you can also record manually and export:

from fireflyframework_agentic.experiments import ExperimentTracker

tracker = ExperimentTracker(storage_path="./experiment_results.json")
for result in results:
    tracker.record(result)

# Query and export
subset = tracker.get_by_experiment("extraction_model_comparison")
print(tracker.export_json())

Comparing Variants

from fireflyframework_agentic.experiments import VariantComparator

comparator = VariantComparator()
metrics = comparator.compare(results)
print(comparator.summary(results))
# "gpt4o: avg_latency=1.2s, avg_output_len=145 | claude: avg_latency=0.9s, ..."

IDP Tie-In: Finding the Best Extraction Model

experiment = Experiment(
    name="idp_extraction_ab_test",
    hypothesis="Lower temperature improves IDP invoice extraction accuracy.",
    variants=[
        Variant(name="gpt4o", model="openai:gpt-4o", temperature=0.1),
        Variant(name="gpt4o_warm", model="openai:gpt-4o", temperature=0.5),
        Variant(name="claude", model="anthropic:claude-3-5-sonnet"),
    ],
    dataset=test_invoices,
)

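def make_agent(variant):
    # Only variant.model is applied here; if the runner does not apply
    # variant.temperature itself, pass it through to the agent so the
    # temperature variants actually differ.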
    return FireflyAgent(
        name=f"extractor-{variant.name}",
        model=variant.model,
        instructions="You are an invoice data extraction specialist.",
    )

results = await ExperimentRunner().run(experiment, make_agent)
print(VariantComparator().summary(results))

Chapter 14: Lab

The Experiments module helps you compare variants — but sometimes you just need a sandbox to poke at an agent interactively, run quick benchmarks, compare side-by-side outputs, or evaluate accuracy against a labelled dataset. The Lab module is that sandbox. It provides interactive sessions (REPL-like), benchmarking helpers, model comparison tables, evaluation datasets, and pluggable scorers — everything you need to iterate on agent quality before going to production.

Interactive Sessions

Test an agent conversationally:

from fireflyframework_agentic.lab import LabSession

session = LabSession(name="extraction-dev", agent=extractor_agent)
response = await session.interact("Extract fields from: Invoice #INV-001...")
print(response)

# Review history
for entry in session.history:
    print(f"[{entry.timestamp}] {entry.prompt} → {entry.response}")

Benchmarking

Measure agent performance across a set of prompts:

from fireflyframework_agentic.lab import Benchmark

bench = Benchmark(inputs=[
    "Extract from: Invoice #INV-001, Acme Corp, $500",
    "Extract from: Invoice #INV-002, Globex, $1,200",
    "Extract from: Invoice #INV-003, Initech, $3,456.78",
])
result = await bench.run(extractor_agent)
print(f"Avg latency: {result.avg_latency_ms:.1f} ms")
print(f"P95 latency: {result.p95_latency_ms:.1f} ms")
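
If you want to sanity-check latency figures by hand, the average and a 95th percentile can be computed from raw samples as below. The nearest-rank percentile method and the sample latencies are assumptions for illustration; the Benchmark helper may compute its percentile differently (for example, with interpolation).

```python
import math
import statistics


def percentile_nearest_rank(values: list[float], pct: float) -> float:
    """Smallest sample such that at least pct percent of samples are <= it."""
    ordered = sorted(values)
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]


# Made-up latency samples in milliseconds, including two slow outliers.
latencies_ms = [120.0, 95.0, 310.0, 101.0, 99.0, 130.0, 98.0, 105.0, 97.0, 640.0]

avg = statistics.fmean(latencies_ms)
p95 = percentile_nearest_rank(latencies_ms, 95)
print(f"avg={avg:.1f} ms, p95={p95:.1f} ms")
```

The gap between the two numbers is the point of reporting both: a couple of slow outliers barely move the typical request but dominate the tail.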

Side-by-Side Comparison

Compare multiple agents on the same prompts:

from fireflyframework_agentic.lab import ModelComparison

comparison = ModelComparison(prompts=[
    "Extract from: Invoice #INV-001, Acme Corp, $500",
])
entries = await comparison.compare({
    "gpt4o": extractor_gpt4o,
    "claude": extractor_claude,
})
for entry in entries:
    for agent_name, response in entry.responses.items():
        print(f" {agent_name}: {response}")

Datasets

Manage test inputs and expected outputs:

from fireflyframework_agentic.lab import EvalDataset, EvalCase

dataset = EvalDataset(cases=[
    EvalCase(
        input="Extract from: Invoice #INV-001, Acme Corp, $500, 2026-01-15",
        expected_output='{"invoice_number": "INV-001", "vendor_name": "Acme Corp", "total_amount": 500.0}',
    ),
])

# Or load from a JSON file
dataset = EvalDataset.from_json("test_data/invoices.json")

Evaluators

Run an agent against a dataset with a pluggable scorer:

import json

from fireflyframework_agentic.lab import EvalOrchestrator

# Custom scorer: fraction of expected fields whose values match exactly
def extraction_scorer(expected: str, actual: str) -> float:
    try:
        exp = json.loads(expected)
        act = json.loads(actual)
        matching = sum(1 for k in exp if exp.get(k) == act.get(k))
        return matching / len(exp) if exp else 0.0
    except (json.JSONDecodeError, AttributeError):
        # Malformed JSON or a non-object payload scores zero
        return 0.0

orchestrator = EvalOrchestrator(scorer=extraction_scorer)
report = await orchestrator.evaluate(extractor_agent, dataset)
print(f"Accuracy: {report.avg_score:.1%} across {report.total_cases} cases")

IDP Tie-In: Benchmarking Extraction Accuracy

# Load a dataset of real invoices with expected outputs
dataset = EvalDataset.from_json("test_data/invoices.json")

# Benchmark with custom extraction scorer
orchestrator = EvalOrchestrator(scorer=extraction_scorer)
report = await orchestrator.evaluate(extractor_agent, dataset)
print(f"Extraction accuracy: {report.avg_score:.1%}")

Part V — Advanced


Chapter 15: Template Agents

By now you've written several agents from scratch — classifier, extractor, OCR. Each time you had to think about the system prompt, output type, and registration. But many agent patterns are universal: summarise text, classify into categories, extract structured data, hold a conversation, route to sub-agents. The framework ships five template agents as factory functions that encode best practices for each pattern. You provide the domain-specific bits (categories, schemas, personality), and the factory handles prompt engineering, output typing, and registry registration.

Summarizer

from fireflyframework_agentic.agents.templates import create_summarizer_agent

agent = create_summarizer_agent(
    max_length="short",       # concise | short | medium | detailed
    style="technical",        # professional | casual | technical | academic
    output_format="bullets",  # paragraph | bullets | numbered
    model="openai:gpt-4o",
)
result = await agent.run("Long invoice description text here...")

Classifier

Returns a structured ClassificationResult with category, confidence, and reasoning:

from fireflyframework_agentic.agents.templates import create_classifier_agent

agent = create_classifier_agent(
    categories=["invoice", "receipt", "contract", "form"],
    descriptions={
        "invoice": "Bills requesting payment for goods or services",
        "receipt": "Proof of payment or purchase",
        "contract": "Legal agreements between parties",
        "form": "Fillable forms and applications",
    },
    model="openai:gpt-4o",
)
result = await agent.run("Invoice from Acme Corp, Amount Due: $1,234.56")
# result.output → ClassificationResult(category="invoice", confidence=0.95, ...)

Extractor

Extracts structured data into a user-provided Pydantic model:

from pydantic import BaseModel
from fireflyframework_agentic.agents.templates import create_extractor_agent

class Invoice(BaseModel):
    vendor: str
    amount: float
    date: str
    invoice_number: str | None = None

agent = create_extractor_agent(
    Invoice,
    field_descriptions={
        "vendor": "The company that issued the invoice",
        "amount": "Total monetary amount",
    },
    model="openai:gpt-4o",
)
result = await agent.run("Invoice from Acme Corp, $1,234.56, 2026-01-15")
# result.output → Invoice(vendor="Acme Corp", amount=1234.56, ...)

Conversational Agent

Memory-enabled multi-turn assistant:

from fireflyframework_agentic.agents.templates import create_conversational_agent
from fireflyframework_agentic.memory import MemoryManager

memory = MemoryManager(max_conversation_tokens=32_000)
agent = create_conversational_agent(
    personality="friendly and concise",
    domain="accounts payable",
    memory=memory,
    model="openai:gpt-4o",
)

cid = memory.new_conversation()
result = await agent.run("I need help with invoice INV-001.", conversation_id=cid)
result = await agent.run("What's the payment status?", conversation_id=cid)

Router Agent

Intent-based routing to child agents:

from fireflyframework_agentic.agents.templates import create_router_agent

agent = create_router_agent(
    agent_map={
        "invoice_processor": "Handles invoice extraction and validation",
        "receipt_processor": "Handles receipt scanning and categorisation",
        "support": "General questions about the IDP system",
    },
    fallback_agent="support",
    model="openai:gpt-4o",
)
result = await agent.run("Process this invoice from Acme Corp")
# result.output → RoutingDecision(target_agent="invoice_processor", confidence=0.92, ...)

Common Parameters

All template factories accept:

  • name — Agent name for the registry (sensible defaults provided).
  • model — LLM model string; falls back to framework default.
  • extra_instructions — Text appended to the system prompt.
  • tools — Additional tools to attach.
  • auto_register — Set to False to skip registry registration.

IDP Tie-In: Using Templates Instead of Hand-Rolled Agents

Replace our manual agents with templates for cleaner code:

from fireflyframework_agentic.agents.templates import (
    create_classifier_agent,
    create_extractor_agent,
)

# Phase 1: Use the built-in classifier template
classifier_agent = create_classifier_agent(
    categories=["invoice", "receipt", "contract", "form"],
    name="document_classifier",
    model="openai:gpt-4o",
)

# Phase 3: Use the built-in extractor template
extractor_agent = create_extractor_agent(
    InvoiceExtraction,
    field_descriptions={
        "invoice_number": "Format: INV-NNNN",
        "vendor_name": "Company that issued the invoice",
        "total_amount": "Total amount due",
        "due_date": "Payment due date in ISO format",
    },
    name="field_extractor",
    model="openai:gpt-4o",
)

Chapter 16: Multi-Agent Delegation

Not every document is an invoice. Your IDP system might receive receipts, contracts, and forms — each requiring a specialised agent with different prompts, tools, and validation rules. Instead of building one mega-agent that tries to do everything, you build specialised agents and let a delegation router decide which one handles each request. The router picks an agent based on a strategy (round-robin for load balancing, capability-based for expertise matching), delegates the work, and optionally forks memory so the sub-agent gets its own working-memory scope.

Delegation Architecture

graph LR
    REQ["Incoming Request"] --> ROUTER["DelegationRouter"]

    subgraph Strategy
        RR["RoundRobinStrategy\n(load balance)"]
        CAP["CapabilityStrategy\n(match by tag)"]
    end

    ROUTER --> RR
    ROUTER --> CAP

    subgraph Agent Pool
        A1["invoice_extractor"]
        A2["receipt_extractor"]
        A3["contract_extractor"]
    end

    RR --> A1 & A2 & A3
    CAP -->|tag match| A1

    subgraph Memory
        MEM["MemoryManager"]
        FORK["fork()"]
        CHILD["Child Scope"]
    end

    ROUTER -.->|auto fork| FORK
    FORK --> CHILD
    CHILD --> A1

Delegation Router

from fireflyframework_agentic.agents.delegation import DelegationRouter, RoundRobinStrategy

router = DelegationRouter(
    agents=[agent_a, agent_b, agent_c],
    strategy=RoundRobinStrategy(),
)
result = await router.route("Process this document.")

Round Robin Strategy

Distributes requests evenly across a pool of agents. Useful for load balancing when all agents have equivalent capabilities:

from fireflyframework_agentic.agents.delegation import DelegationRouter, RoundRobinStrategy

strategy = RoundRobinStrategy()
router = DelegationRouter([agent_1, agent_2, agent_3], strategy)
# Request 1 → agent_1, Request 2 → agent_2, Request 3 → agent_3, Request 4 → agent_1...
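
The rotation itself is simple enough to sketch with the standard library. This is an illustration of round-robin selection in general, not the internals of RoundRobinStrategy; plain strings stand in for agent objects.

```python
from itertools import cycle

agents = ["agent_1", "agent_2", "agent_3"]  # names stand in for agent objects
rotation = cycle(agents)                    # endless iterator over the pool

# Four consecutive requests: the fourth wraps around to the first agent.
assignments = [next(rotation) for _ in range(4)]
print(assignments)
```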

Capability-Based Strategy

Selects the first agent whose tags include a required capability:

from fireflyframework_agentic.agents.delegation import CapabilityStrategy, DelegationRouter

strategy = CapabilityStrategy(required_tag="invoice_extraction")
router = DelegationRouter([invoice_agent, receipt_agent], strategy)
result = await router.route("Extract invoice data.")
# → Routed to invoice_agent (which has the "invoice_extraction" tag)
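
"First agent whose tags include the required capability" boils down to a linear scan. The sketch below shows that selection rule in isolation; `AgentStub` and `select_by_tag` are hypothetical names, and how real agents declare their tags is not shown here.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class AgentStub:
    """Minimal stand-in for an agent that carries capability tags."""
    name: str
    tags: list[str] = field(default_factory=list)


def select_by_tag(agents: list[AgentStub], required_tag: str) -> Optional[AgentStub]:
    # Return the first match in pool order, or None when nothing matches.
    return next((a for a in agents if required_tag in a.tags), None)


pool = [
    AgentStub("receipt_agent", tags=["receipt_extraction"]),
    AgentStub("invoice_agent", tags=["invoice_extraction"]),
]

selected = select_by_tag(pool, "invoice_extraction")
missing = select_by_tag(pool, "contract_extraction")
print(selected.name, missing)
```

Because the first match wins, pool order acts as an implicit priority when several agents share a tag.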

Other Strategies

The framework ships 7 delegation strategies, all importable from fireflyframework_agentic.agents.delegation (and re-exported from fireflyframework_agentic.agents):

  • RoundRobinStrategy — even load balancing.
  • CapabilityStrategy — match by required tag.
  • ContentBasedStrategy — route by keywords in the request content.
  • CostAwareStrategy — prefer the lowest-cost capable agent.
  • ChainStrategy — run a fixed chain of agents in order.
  • FallbackStrategy — try agents in order until one succeeds.
  • WeightedStrategy — weighted random selection.

Memory with Delegation

When a MemoryManager is attached, delegated agents receive a forked memory scope:

from fireflyframework_agentic.memory import MemoryManager

memory = MemoryManager(working_scope_id="main")
router = DelegationRouter([agent_a, agent_b], RoundRobinStrategy(), memory=memory)
result = await router.route("Process this.")
# The delegated agent gets its own working memory scope

IDP Tie-In: Routing Documents to Specialised Agents

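from fireflyframework_agentic.agents.delegation import DelegationRouter, CapabilityStrategy
from fireflyframework_agentic.agents.templates import create_extractor_agent

# Specialised agents for different document types.
# InvoiceSchema, ReceiptSchema and ContractSchema are your own Pydantic models.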
invoice_agent = create_extractor_agent(InvoiceSchema, name="invoice_extractor")
receipt_agent = create_extractor_agent(ReceiptSchema, name="receipt_extractor")
contract_agent = create_extractor_agent(ContractSchema, name="contract_extractor")

# Route based on capability
strategy = CapabilityStrategy(required_tag="invoice")
router = DelegationRouter(
    agents=[invoice_agent, receipt_agent, contract_agent],
    strategy=strategy,
)

Chapter 17: Plugin System

As your application grows, you'll want to share agents, tools, and reasoning patterns across projects — or let third-party teams contribute their own. The Plugin module uses Python's standard entry-point mechanism so that any installed package can register components that are discovered and loaded automatically at startup. No manual imports, no central configuration file — just install the package and go.

Discovering Plugins

from fireflyframework_agentic.plugin import PluginDiscovery

result = PluginDiscovery.discover_all()
print(f"Loaded {len(result.successful)} plugins, {len(result.failed)} failed")

Creating a Plugin

In your package's pyproject.toml, declare entry points under the framework's groups:

[project.entry-points."fireflyframework_agentic.agents"]
my_agent = "my_package.agents:MySpecialAgent"

[project.entry-points."fireflyframework_agentic.tools"]
my_tool = "my_package.tools:MyCustomTool"

[project.entry-points."fireflyframework_agentic.reasoning_patterns"]
my_pattern = "my_package.reasoning:MyCustomPattern"

The three entry-point groups are:

  • fireflyframework_agentic.agents — Custom agents.
  • fireflyframework_agentic.tools — Custom tools.
  • fireflyframework_agentic.reasoning_patterns — Custom reasoning patterns.

On discovery, the framework loads each entry point and registers it in the appropriate registry.

Configuration

# Enable auto-discovery on startup (default: True)
# FIREFLY_AGENTIC_PLUGIN_AUTO_DISCOVER=true

IDP Tie-In: Packaging IDP as a Plugin

You can package the entire IDP pipeline as a plugin that self-registers when installed:

# In idp_plugin/pyproject.toml
[project.entry-points."fireflyframework_agentic.agents"]
document_classifier = "idp_plugin.agents:classifier_agent"
field_extractor = "idp_plugin.agents:extractor_agent"
ocr_agent = "idp_plugin.agents:ocr_agent"

After uv add idp-plugin, calling PluginDiscovery.discover_all() registers all three agents automatically.


Chapter 18: Putting It All Together

You've learned every module in fireflyframework-agentic, each in isolation. Now it's time to see how they all fit together in a single, production-grade application. The diagram below shows the full system architecture — every layer, every connection:

Full System Architecture

graph TB
    subgraph "Caller"
        APP["Host application\n(in-process)"]
    end

    subgraph "Orchestration Layer"
        PIPE["Pipeline Engine\n(DAG scheduler)"]
        DELEG["Delegation Router"]
    end

    subgraph "Intelligence Layer"
        REASON["Reasoning Patterns\n(ReAct, CoT, P&E, ...)"]
        VALID["Validation & QoS"]
        REVIEW["OutputReviewer"]
    end

    subgraph "Agent Layer"
        FA["FireflyAgent"]
        REG["AgentRegistry"]
        TPL["Template Agents"]
    end

    subgraph "Agent Support"
        TOOLS["Tools + ToolKit\n(guards, builtins, registry)"]
        PROMPTS["Prompts\n(Jinja2, versioned, composed)"]
        MEM["Memory\n(conversation + working)"]
        CONTENT["Content Processing\n(chunk, compress, batch)"]
    end

    subgraph "Ops Layer"
        OBS["Observability\n(traces, metrics, events)"]
        EXPL["Explainability\n(audit trail, reports)"]
        EXP["Experiments\n(A/B testing)"]
        LAB["Lab\n(benchmarks, eval)"]
    end

    subgraph "Foundation"
        PAI["Pydantic AI\n(model calls, streaming)"]
        CFG["FireflyAgenticConfig\n(env-driven settings)"]
        PLUG["Plugin System\n(entry-point discovery)"]
    end

    APP --> PIPE & DELEG
    PIPE --> FA
    DELEG --> FA
    FA --> REASON
    REASON --> FA
    FA --> VALID
    VALID --> REVIEW
    REVIEW --> FA
    FA --> PAI
    FA --> TOOLS
    FA --> PROMPTS
    FA --> MEM
    FA --> CONTENT
    FA --> REG
    TPL --> FA
    OBS -.-> FA & PIPE & REASON
    EXPL -.-> FA
    EXP -.-> FA
    LAB -.-> FA
    PLUG -.-> REG & TOOLS
    CFG -.-> FA & PIPE & MEM & OBS

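Let's assemble the complete IDP pipeline using everything we've learned. The listings below form the full implementation; the OCR call and the vendor lookup are stubs that you replace with real integrations before going to production.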

Project Structure

idp-service/
├── pyproject.toml
├── .env
├── prompts/
│   ├── classification.jinja2
│   └── extraction.jinja2
├── src/
│   └── idp_service/
│       ├── __init__.py
│       ├── agents.py       # Agent definitions
│       ├── tools.py        # Tool definitions
│       ├── pipeline.py     # Pipeline wiring
│       ├── validation.py   # Validation rules
│       └── main.py         # In-process entry point
└── tests/
    └── test_pipeline.py

Configuration (.env)

FIREFLY_AGENTIC_DEFAULT_MODEL=openai:gpt-4o
FIREFLY_AGENTIC_DEFAULT_TEMPERATURE=0.1
FIREFLY_AGENTIC_MAX_RETRIES=3
FIREFLY_AGENTIC_OBSERVABILITY_ENABLED=true
FIREFLY_AGENTIC_MEMORY_BACKEND=file
FIREFLY_AGENTIC_MEMORY_FILE_DIR=.firefly_memory
FIREFLY_AGENTIC_DEFAULT_CHUNK_SIZE=4000
FIREFLY_AGENTIC_VALIDATION_ENABLED=true

Agents (agents.py)

from pydantic import BaseModel, Field
from fireflyframework_agentic.agents.templates import (
    create_classifier_agent,
    create_extractor_agent,
    create_summarizer_agent,
)
from fireflyframework_agentic.agents import FireflyAgent

# Output schema for extraction
class InvoiceData(BaseModel):
    invoice_number: str
    vendor_name: str
    total_amount: float = Field(ge=0)
    due_date: str
    line_items: list[dict] = Field(default_factory=list)

# Phase 1: Document classifier
classifier_agent = create_classifier_agent(
    categories=["invoice", "receipt", "contract", "form", "other"],
    name="document_classifier",
    model="openai:gpt-4o",
)

# Phase 2: OCR/digitisation agent
ocr_agent = FireflyAgent(
    name="ocr_agent",
    model="openai:gpt-4o",
    instructions="Extract all text from this document. Preserve layout and structure.",
)

# Phase 2.5: Summariser for compression
summary_agent = create_summarizer_agent(
    name="doc_summariser",
    max_length="medium",
    style="technical",
    model="openai:gpt-4o",
)

# Phase 3: Field extractor — with tools from tools.py attached
from .tools import extraction_kit

extractor_agent = create_extractor_agent(
    InvoiceData,
    field_descriptions={
        "invoice_number": "Format: INV-NNNN or similar",
        "vendor_name": "Company that issued the invoice",
        "total_amount": "Total monetary amount due",
        "due_date": "Payment due date (ISO 8601)",
        "line_items": "List of line items with description, quantity, unit_price",
    },
    name="field_extractor",
    model="openai:gpt-4o",
    tools=extraction_kit.as_pydantic_tools(),  # Bridge Firefly tools → Pydantic AI
)

Tools (tools.py)

Tools live in their own module. Each @firefly_tool call creates a BaseTool, registers it in the global ToolRegistry, and returns the instance. The ToolKit bundles them for agent injection via as_pydantic_tools():

from fireflyframework_agentic.tools import firefly_tool, guarded, retryable, ToolKit
from fireflyframework_agentic.tools.guards import RateLimitGuard, ValidationGuard
from fireflyframework_agentic.tools.builtins import CalculatorTool

# OCR tool — rate-limited because the upstream API is metered
@retryable(max_retries=2, backoff=1.0)
@guarded(RateLimitGuard(max_calls=100, period_seconds=60))
@firefly_tool(name="ocr_extract", description="Extract text from a document image via OCR")
async def ocr_extract(image_data: str) -> str:
    """In production, call an OCR service like AWS Textract or Google Vision."""
    return "Invoice #INV-2026-001\nVendor: Acme Corp\nAmount: $1,234.56\nDate: 2026-01-15"

# Vendor lookup — validates that vendor_name is present
@guarded(ValidationGuard(required_keys=["vendor_name"]))
@firefly_tool(name="vendor_lookup", description="Look up vendor details from the ERP system")
async def vendor_lookup(vendor_name: str) -> str:
    vendors = {
        "Acme Corp": '{"id": "V-001", "tax_id": "US-12345", "payment_terms": "NET30"}',
        "Globex": '{"id": "V-002", "tax_id": "US-67890", "payment_terms": "NET60"}',
    }
    return vendors.get(vendor_name, '{"error": "Vendor not found"}')

# Calculator — a built-in tool, no decorator needed
calculator = CalculatorTool()

# Bundle the tools the extractor agent needs.
# as_pydantic_tools() bridges them into the Pydantic AI tool format.
extraction_kit = ToolKit(
    "extraction-tools",
    [ocr_extract, vendor_lookup, calculator],
    description="Tools available during invoice field extraction",
)
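To show what a limit such as max_calls=100 per period_seconds=60 means in practice, here is a sliding-window limiter built from the standard library. It illustrates the general technique only; whether RateLimitGuard uses a sliding window, a fixed window, or a token bucket is not stated here, and SlidingWindowLimiter is a hypothetical class.

```python
import time
from collections import deque


class SlidingWindowLimiter:
    """Allow at most max_calls within any trailing period_seconds window."""

    def __init__(self, max_calls, period_seconds, clock=time.monotonic):
        self.max_calls = max_calls
        self.period_seconds = period_seconds
        self._clock = clock  # injectable so the example can control time
        self._calls = deque()

    def allow(self):
        now = self._clock()
        # Drop timestamps that have aged out of the window.
        while self._calls and now - self._calls[0] >= self.period_seconds:
            self._calls.popleft()
        if len(self._calls) >= self.max_calls:
            return False
        self._calls.append(now)
        return True


fake_now = [0.0]
limiter = SlidingWindowLimiter(max_calls=2, period_seconds=60, clock=lambda: fake_now[0])
decisions = [limiter.allow(), limiter.allow(), limiter.allow()]
fake_now[0] = 61.0  # advance past the window
decisions.append(limiter.allow())
print(decisions)  # [True, True, False, True]
```

Injecting the clock keeps the example deterministic; in real use the default time.monotonic is the safer choice because it never jumps backwards.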

Validation (validation.py)

from fireflyframework_agentic.validation.rules import (
    OutputValidator, RegexRule, FormatRule, RangeRule,
)
from fireflyframework_agentic.validation.qos import (
    QoSGuard, ConfidenceScorer, ConsistencyChecker, GroundingChecker,
)
from fireflyframework_agentic.validation import OutputReviewer
from .agents import InvoiceData, extractor_agent

invoice_validator = OutputValidator({
    # Optional "-NNN" suffix so IDs like INV-2026-001 (as returned by the OCR stub) pass
    "invoice_number": [RegexRule("invoice_number", r"^INV-\d{4,10}(-\d{1,6})?$")],
    "vendor_name": [RegexRule("vendor_name", r".{2,}")],
    "total_amount": [RangeRule("total_amount", min_value=0.01, max_value=10_000_000)],
    "due_date": [FormatRule("due_date", "iso_date")],
})

reviewer = OutputReviewer(
    output_type=InvoiceData,
    validator=invoice_validator,
    max_retries=3,
)

qos_guard = QoSGuard(
    confidence_scorer=ConfidenceScorer(extractor_agent),
    consistency_checker=ConsistencyChecker(extractor_agent, num_runs=2),
    grounding_checker=GroundingChecker(),
    min_confidence=0.8,
    min_consistency=0.6,
    min_grounding=0.8,
)
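As a rough picture of what field rules like these check, the sketch below applies comparable tests with the standard library. It does not reproduce OutputValidator or its rule classes; check_invoice is hypothetical, and the pattern and bounds are illustrative values chosen in the same spirit as the rules above.

```python
import re
from datetime import date

INVOICE_RE = re.compile(r"INV-\d{4,10}")  # illustrative pattern


def check_invoice(record):
    """Return a list of human-readable errors; an empty list means valid."""
    errors = []
    if not INVOICE_RE.fullmatch(str(record.get("invoice_number", ""))):
        errors.append("invoice_number: does not match the expected pattern")
    if len(str(record.get("vendor_name", ""))) < 2:
        errors.append("vendor_name: must be at least 2 characters")
    amount = record.get("total_amount")
    if not isinstance(amount, (int, float)) or not 0.01 <= amount <= 10_000_000:
        errors.append("total_amount: must be between 0.01 and 10,000,000")
    try:
        date.fromisoformat(str(record.get("due_date", "")))
    except ValueError:
        errors.append("due_date: must be an ISO 8601 date")
    return errors


good = {"invoice_number": "INV-20260001", "vendor_name": "Acme Corp",
        "total_amount": 1234.56, "due_date": "2026-02-14"}
bad = {"invoice_number": "2026-001", "vendor_name": "A",
       "total_amount": 0, "due_date": "14/02/2026"}
print(check_invoice(good))      # []
print(len(check_invoice(bad)))  # 4
```

Returning every failure at once, rather than stopping at the first, gives a self-correction step the full list of problems to fix in a single pass.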

Pipeline (pipeline.py)

from fireflyframework_agentic.pipeline.builder import PipelineBuilder
from fireflyframework_agentic.pipeline.steps import AgentStep, ReasoningStep, CallableStep
from fireflyframework_agentic.pipeline.context import PipelineContext
from fireflyframework_agentic.reasoning import PlanAndExecutePattern, ReflexionPattern
from fireflyframework_agentic.content.chunking import TextChunker, BatchProcessor
from fireflyframework_agentic.content.compression import ContextCompressor, MapReduceStrategy
from fireflyframework_agentic.memory import MemoryManager
from fireflyframework_agentic.observability import traced
from .agents import classifier_agent, ocr_agent, summary_agent, extractor_agent
from .validation import invoice_validator, reviewer

async def validate_step(context, inputs):
    extracted = inputs.get("input", {})
    report = invoice_validator.validate(extracted)
    context.metadata["validation_report"] = report
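    if not report.valid:
        # Self-correct with Reflexion, then re-validate rather than assuming the fix worked
        reflexion = ReflexionPattern(max_steps=2)
        corrected = await reflexion.execute(
            extractor_agent,
            f"Fix these errors: {report.errors}. Original data: {extracted}",
        )
        recheck = invoice_validator.validate(corrected.output)
        context.metadata["validation_report"] = recheck
        return {"extracted": corrected.output, "valid": recheck.valid, "errors": recheck.errors}
    return {"extracted": extracted, "valid": True, "errors": []}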

async def assemble_step(context, inputs):
    data = inputs.get("input", {})
    return {
        "document_type": context.metadata.get("doc_type", "unknown"),
        "language": context.metadata.get("language", "unknown"),
        "extracted_fields": data.get("extracted", {}),
        "validation_passed": data.get("valid", False),
    }

idp_pipeline = (
    PipelineBuilder("idp-pipeline")
    .add_node("classify", AgentStep(classifier_agent))
    .add_node("digitise", AgentStep(ocr_agent), retry_max=2, timeout_seconds=60)
    .add_node(
        "extract",
        ReasoningStep(PlanAndExecutePattern(max_steps=15, allow_replan=True), extractor_agent),
    )
    .add_node("validate", CallableStep(validate_step))
    .add_node("assemble", CallableStep(assemble_step))
    .chain("classify", "digitise", "extract", "validate", "assemble")
    .build()
)

@traced(name="idp.process")
async def process_document(document_bytes: bytes, metadata: dict | None = None) -> dict:
    memory = MemoryManager(working_scope_id="idp-session")
    ctx = PipelineContext(
        inputs=document_bytes,
        metadata=metadata or {},
        memory=memory,
    )
    result = await idp_pipeline.run(context=ctx)
    return result.final_output if result.success else {"error": result.failed_nodes}
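The callable steps above read their predecessor's output from inputs["input"] and share state through context.metadata. The sketch below imitates that data flow for a linear chain using only asyncio. It is a mental model, not the pipeline engine: StubContext and run_chain are hypothetical, and retries, timeouts, and DAG scheduling are omitted.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class StubContext:
    """Hypothetical stand-in for a pipeline context with shared metadata."""

    metadata: dict = field(default_factory=dict)


async def extract(context, inputs):
    context.metadata["doc_type"] = "invoice"
    return {"invoice_number": "INV-20260001", "raw": inputs["input"]}


async def validate(context, inputs):
    extracted = inputs.get("input", {})
    return {"extracted": extracted, "valid": bool(extracted.get("invoice_number")), "errors": []}


async def assemble(context, inputs):
    data = inputs.get("input", {})
    return {
        "document_type": context.metadata.get("doc_type", "unknown"),
        "extracted_fields": data.get("extracted", {}),
        "validation_passed": data.get("valid", False),
    }


async def run_chain(steps, initial_input, context):
    """Feed each step's output to the next one under the "input" key."""
    current = initial_input
    for step in steps:
        current = await step(context, {"input": current})
    return current


final = asyncio.run(run_chain([extract, validate, assemble], "scanned text", StubContext()))
print(final["document_type"], final["validation_passed"])  # invoice True
```

Note that document_type only resolves to something other than "unknown" because an earlier step wrote doc_type into the shared metadata; the same applies to the real assemble_step.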

Entry Point (main.py)

fireflyframework-agentic is a pure in-process library: it serves no port and consumes no broker. Your host service owns serving and calls process_document directly. The host also owns OTel SDK and exporter configuration; the framework emits spans and metrics through the OpenTelemetry API, so they flow through whatever providers the host has set up:

import asyncio

from .pipeline import process_document


async def main(document_bytes: bytes, filename: str) -> dict:
    return await process_document(
        document_bytes,
        metadata={"filename": filename, "source": "host-service"},
    )


if __name__ == "__main__":
    with open("invoice.pdf", "rb") as fh:
        print(asyncio.run(main(fh.read(), "invoice.pdf")))

To expose this over HTTP or wire it to a message broker, embed process_document in your host service's framework of choice — the agent library stays in-process.
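Whatever host framework you choose, the adapter usually does three things: reject bad input early, bound the call with a timeout, and map the outcome to a transport-level status. A framework-neutral sketch under those assumptions follows; handle_request is hypothetical, the status codes are illustrative, and process_document_stub stands in for the real process_document so the example runs on its own.

```python
import asyncio


async def process_document_stub(document_bytes, metadata=None):
    """Stand-in for process_document so this sketch is self-contained."""
    await asyncio.sleep(0)
    return {"size": len(document_bytes), "metadata": metadata or {}}


async def handle_request(body, filename, timeout_seconds=30.0):
    """Return (status_code, payload) for one uploaded document."""
    if not body:
        return 400, {"error": "empty document"}
    try:
        result = await asyncio.wait_for(
            process_document_stub(body, {"filename": filename, "source": "host-service"}),
            timeout=timeout_seconds,
        )
    except asyncio.TimeoutError:
        return 504, {"error": "processing timed out"}
    return 200, result


status, payload = asyncio.run(handle_request(b"%PDF-1.7", "invoice.pdf"))
print(status, payload["size"])  # 200 8
```

Keeping the adapter as a plain coroutine that returns a status and a payload makes it easy to call from an HTTP route or a queue consumer without changing the pipeline code.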

Production Checklist

Before deploying to production, verify:

  • [ ] Configuration — All FIREFLY_AGENTIC_* environment variables are set.
  • [ ] Model access — API keys for your LLM provider are configured.
  • [ ] Observability — observability_enabled is on, and your host service has configured the OTel SDK/exporters so framework spans and metrics flow to your backend.
  • [ ] Memory persistence — memory_backend is set to "file" (or a custom backend) for durability.
  • [ ] Validation — OutputValidator rules match your business requirements.
  • [ ] QoS thresholds — min_confidence and num_runs (ConsistencyChecker) are tuned for your use case.
  • [ ] Retry limits — Pipeline nodes have appropriate retry_max and timeout_seconds.
  • [ ] Experiments — You've A/B tested your prompt and model variants.
  • [ ] Audit trail — Explainability is enabled for regulated workloads.
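The configuration items in this checklist can be turned into a small preflight check that runs before the service starts. A minimal sketch: the variable names come from the .env listing earlier in this chapter, but treating exactly these four as mandatory is an assumption, and missing_settings is a hypothetical helper.

```python
import os

# Assumption: these are the settings your deployment treats as mandatory.
REQUIRED_VARS = (
    "FIREFLY_AGENTIC_DEFAULT_MODEL",
    "FIREFLY_AGENTIC_OBSERVABILITY_ENABLED",
    "FIREFLY_AGENTIC_MEMORY_BACKEND",
    "FIREFLY_AGENTIC_VALIDATION_ENABLED",
)


def missing_settings(environ=None):
    """Return the required variable names that are unset or blank."""
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_VARS if not env.get(name, "").strip()]


if __name__ == "__main__":
    missing = missing_settings()
    if missing:
        raise SystemExit(f"Missing configuration: {', '.join(missing)}")
    print("All required settings are present.")
```

Passing the environment as a parameter keeps the check testable without mutating os.environ.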

Next Steps

Congratulations — you now know every module in fireflyframework-agentic. Here are some paths to explore further:

  • Dive deeper — Each chapter links to a detailed module guide in docs/.
  • Read the source — The framework is fully typed and well-documented in code.
  • Run the tests — uv run pytest runs 1,300+ tests across ~128 files that exercise every module.
  • Build your own — Extend AbstractReasoningPattern, implement MemoryStore, create custom tools, or write a plugin.

Module Reference