The Firefly Agentic Framework — Complete Tutorial¶
Copyright 2026 Firefly Software Foundation. Licensed under the Apache License 2.0.
From Zero to Expert — This tutorial walks you through every module in fireflyframework-agentic by building a real project from scratch: an Intelligent Document Processing (IDP) pipeline that classifies, digitises, extracts, validates, and assembles data from invoices.
Each chapter introduces a concept, explains why it exists, shows how it works with architecture diagrams, and immediately applies it to the IDP pipeline. By Chapter 18 you will have a production-grade GenAI application that uses agents, tools, prompts, reasoning patterns, content processing, memory, validation, pipelines, observability, explainability, experiments, multi-agent delegation, template agents, and a plugin system — all wired together.
Table of Contents¶
Part I — Foundation
1. Introduction — What, who, why, design principles, running example
2. Installation & Project Setup — UV, extras, FireflyAgenticConfig, model providers
3. Your First Agent — FireflyAgent, @firefly_agent, registry, context, lifecycle (diagram)
4. Tools — @firefly_tool, ToolBuilder, guards, composition, built-ins, ToolKit, attaching tools to agents (diagram)
5. Prompts — Jinja2 templates, versioning, composition, validation, file loading
Part II — Intelligence
6. Reasoning Patterns — ReAct, CoT, Plan-and-Execute, Reflexion, Tree of Thoughts, Goal Decomposition, custom patterns (diagram)
7. Content Processing — Chunking, batch processing, compression, sliding window
8. Memory — Conversation memory, working memory, backends, MemoryManager, forking, integration points (diagram)
9. Validation & Quality of Service — Rules, OutputValidator, QoS checks, OutputReviewer
Part III — Orchestration & Operations
10. Pipeline — DAG engine, step executors, fan-out/fan-in, retries, PipelineContext (diagram)
11. Observability — Tracing, metrics, events, OpenTelemetry exporters
12. Explainability — Decisions, explanations, audit trail, reports
Part IV — Experimentation
13. Experiments — A/B testing, variant comparison, tracking
14. Lab — Interactive sessions, benchmarks, model comparison, eval datasets
Part V — Advanced
15. Template Agents — Summariser, classifier, extractor, conversational, router
16. Multi-Agent Delegation — Delegation router, strategies, memory forking (diagram)
17. Plugin System — Entry-point discovery, packaging agents/tools/patterns
18. Putting It All Together — Full IDP implementation, project structure, production checklist (full system diagram)
Part I — Foundation¶
Chapter 1: Introduction¶
What Is fireflyframework-agentic?¶
fireflyframework-agentic is a GenAI metaframework — it sits on top of Pydantic AI and provides the structure, patterns, and production-grade plumbing that Pydantic AI deliberately leaves to the application developer.
Think of Pydantic AI as the engine and fireflyframework-agentic as the car around it: the steering, brakes, GPS, dashboard, and everything else you need to actually drive to your destination.
Who Is It For?¶
- Backend engineers building GenAI features into existing applications.
- ML/AI engineers who want structured reasoning, validation, and observability out of the box.
The Four Design Principles¶
The framework is guided by four principles that show up in every module:
- Protocol-driven contracts — Public APIs are Python Protocol classes or abstract base classes. You can swap or extend any component without touching framework internals.
- Convention over configuration — Sensible defaults everywhere. One FireflyAgenticConfig object (backed by Pydantic Settings) centralises every knob and reads from environment variables automatically.
- Layered composition — Modules are organised into layers (Core, Agent, Intelligence, Experimentation, Orchestration). Higher layers depend on lower layers, never the reverse.
- Optional dependencies — Heavy libraries (embedding providers, vector store clients, storage backends) are declared as extras. The core framework imports them lazily so you only install what you use.
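To make the first principle concrete, here is a minimal sketch of a protocol-driven contract in plain Python. The names `SketchTool`, `EchoTool`, and `run_tool` are hypothetical and are not part of the framework; the sketch only illustrates how a structural `Protocol` lets callers accept any conforming implementation.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class SketchTool(Protocol):
    """Hypothetical contract: anything with a name and an execute() method."""

    name: str

    def execute(self, **kwargs: object) -> str: ...


class EchoTool:
    """Satisfies SketchTool structurally, without inheriting from it."""

    name = "echo"

    def execute(self, **kwargs: object) -> str:
        # Render the keyword arguments in a stable (sorted) order.
        return " ".join(f"{key}={value}" for key, value in sorted(kwargs.items()))


def run_tool(tool: SketchTool, **kwargs: object) -> str:
    # Callers depend only on the protocol, so implementations can be swapped.
    return tool.execute(**kwargs)
```

Because the check is structural, a replacement component only has to provide the same attributes and methods; it never needs to import or subclass the original.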
The Running Example: Intelligent Document Processing¶
Throughout this tutorial we build an IDP pipeline that processes invoices in five phases: classification, digitisation, extraction, validation, and assembly.
Every chapter teaches a framework concept and immediately applies it to a phase of this pipeline. By Chapter 18 you will have the complete, production-ready system.
Chapter 2: Installation & Project Setup¶
Prerequisites¶
- Python 3.13 or later.
- UV (recommended for dependency management): https://docs.astral.sh/uv/
Creating a New Project¶
Adding the base package with UV (uv add fireflyframework-agentic, with no extras) installs the core framework
with its minimal dependencies: pydantic-ai, pydantic, pydantic-settings, jinja2, and opentelemetry-api/sdk.
Installing Extras¶
The framework provides optional extras for additional capabilities:
# Embedding providers (e.g. OpenAI / Azure)
uv add "fireflyframework-agentic[openai-embeddings]"
# Vector store backends
uv add "fireflyframework-agentic[vectorstores-chroma]"
# Memory persistence backends
uv add "fireflyframework-agentic[postgres]"
# Everything
uv add "fireflyframework-agentic[all]"
For our IDP project we will eventually use several of these, so install everything with the all extra: uv add "fireflyframework-agentic[all]".
Configuration with FireflyAgenticConfig¶
All framework settings live in a single Pydantic Settings class that reads from
environment variables prefixed with FIREFLY_AGENTIC_:
from fireflyframework_agentic import FireflyAgenticConfig, get_config
# get_config() returns a thread-safe singleton
config = get_config()
print(config.default_model) # "openai:gpt-4o"
print(config.default_temperature) # None (no temperature forced; provider default)
print(config.max_retries) # 3
Override any setting via environment variables or a .env file:
# .env
# --- Provider API keys (read by Pydantic AI automatically) ---
OPENAI_API_KEY=sk-...
# ANTHROPIC_API_KEY=sk-ant-...
# GEMINI_API_KEY=...
# GROQ_API_KEY=gsk_...
# DEEPSEEK_API_KEY=...
# --- Framework settings ---
FIREFLY_AGENTIC_DEFAULT_MODEL=openai:gpt-4o
FIREFLY_AGENTIC_DEFAULT_TEMPERATURE=0.3
FIREFLY_AGENTIC_LOG_LEVEL=DEBUG
FIREFLY_AGENTIC_OBSERVABILITY_ENABLED=true
Here are the most commonly used configuration fields:
- default_model — LLM model string (e.g. "openai:gpt-4o", "anthropic:claude-3-5-sonnet").
- default_temperature — Default sampling temperature. None (default) forces no temperature, so each provider uses its own default (some models, e.g. OpenAI o1/o3, reject an explicit temperature). When set, it's merged into an agent's settings only if the caller omits one.
- max_retries — Default retry count for agent runs.
- observability_enabled — Toggle OpenTelemetry instrumentation.
- prompt_templates_dir — Directory for Jinja2 prompt files.
- default_chunk_size / default_chunk_overlap — Content chunking defaults.
- max_context_tokens — Maximum context window (default 128,000).
- validation_enabled — Enable/disable output validation.
- cost_tracking_enabled — Enable/disable usage and cost tracking.
- budget_limit_usd — Hard budget limit in USD (a warning is logged when exceeded).
- cost_strict — When True, cost resolution raises UnknownModelCostError instead of returning None for models with no known pricing (default False).
- memory_backend — "in_memory", "file", "postgres", or "mongodb".
- memory_max_conversation_tokens — Token budget per conversation.
- encryption_enabled / encryption_key — Enable AES-256-GCM encryption of memory at rest.
The singleton is created once and cached for the process lifetime. Call reset_config()
in tests to force re-creation.
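The cached-singleton behaviour described above can be pictured with a small standard-library sketch. This is not how FireflyAgenticConfig is implemented (that class is backed by Pydantic Settings); `SketchConfig`, `get_sketch_config`, and `reset_sketch_config` are hypothetical names, and the variable name `FIREFLY_AGENTIC_MAX_RETRIES` is an assumption derived from the prefix plus the field name.

```python
import os
from dataclasses import dataclass
from functools import lru_cache

ENV_PREFIX = "FIREFLY_AGENTIC_"  # prefix taken from the text above


@dataclass(frozen=True)
class SketchConfig:
    """Hypothetical stand-in holding two of the fields listed above."""

    default_model: str = "openai:gpt-4o"
    max_retries: int = 3


@lru_cache(maxsize=1)
def get_sketch_config() -> SketchConfig:
    # Read each field from PREFIX + FIELD_NAME, falling back to the default.
    defaults = SketchConfig()
    return SketchConfig(
        default_model=os.environ.get(ENV_PREFIX + "DEFAULT_MODEL", defaults.default_model),
        max_retries=int(os.environ.get(ENV_PREFIX + "MAX_RETRIES", defaults.max_retries)),
    )


def reset_sketch_config() -> None:
    # Drop the cached instance so the next call re-reads the environment.
    get_sketch_config.cache_clear()
```

The sketch shows why tests need a reset hook: once the object is cached, later changes to the environment are invisible until the cache is cleared.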
Model Providers & Authentication¶
Before you can run an agent against a real LLM, you need credentials for your model provider. fireflyframework-agentic delegates model communication entirely to Pydantic AI, which supports multiple providers out of the box.
Approach 1: Environment Variables (Recommended)¶
The simplest method — set the appropriate API key as an environment variable and use the
"provider:model_name" string format:
| Provider | Env Variable | Model String Example |
|---|---|---|
| OpenAI | OPENAI_API_KEY | "openai:gpt-4o" |
| Anthropic | ANTHROPIC_API_KEY | "anthropic:claude-sonnet-4-20250514" |
| Google Gemini | GEMINI_API_KEY | "google:gemini-2.0-flash" |
| Groq | GROQ_API_KEY | "groq:llama-3.3-70b" |
| DeepSeek | DEEPSEEK_API_KEY | "deepseek:deepseek-chat" |
| Mistral | MISTRAL_API_KEY | "mistral:mistral-large-latest" |
| AWS Bedrock | AWS_* credentials | "bedrock:anthropic.claude-3-5-sonnet-latest" |
| Ollama (local) | (none required) | "ollama:llama3.2" |
Pydantic AI reads these variables automatically — you do not need to pass them to the framework. Just set the key and use the model string:
from fireflyframework_agentic.agents import FireflyAgent
# Uses OPENAI_API_KEY from the environment
agent = FireflyAgent(name="my-agent", model="openai:gpt-4o")
To switch providers, change the model string and API key — no code changes required:
# .env — switch to Anthropic
ANTHROPIC_API_KEY=sk-ant-...
FIREFLY_AGENTIC_DEFAULT_MODEL=anthropic:claude-sonnet-4-20250514
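The "provider:model_name" format is easy to reason about once you see how such a string splits. The helper below is a hypothetical illustration, not a framework or Pydantic AI function. It splits on the first colon only, which matters because some model identifiers (for example Bedrock IDs ending in ":0") contain colons themselves.

```python
from typing import NamedTuple


class ModelRef(NamedTuple):
    provider: str
    model_name: str


def parse_model_string(value: str) -> ModelRef:
    """Split 'provider:model_name' on the first colon only."""
    provider, separator, model_name = value.partition(":")
    if not separator or not provider or not model_name:
        raise ValueError(f"expected 'provider:model_name', got {value!r}")
    return ModelRef(provider, model_name)
```

For example, `parse_model_string("openai:gpt-4o")` yields the provider `openai` and the model name `gpt-4o`, while a bare `"gpt-4o"` is rejected because it carries no provider prefix.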
Approach 2: Programmatic Model Objects¶
For scenarios that require explicit credential management — Azure OpenAI, AWS Bedrock,
custom endpoints, or when you don't want to use environment variables — construct a
Pydantic AI Model object and pass it directly to FireflyAgent:
from pydantic_ai.models.openai import OpenAIChatModel
from pydantic_ai.providers.openai import OpenAIProvider
from fireflyframework_agentic.agents import FireflyAgent
# Explicit API key (e.g. loaded from a vault)
model = OpenAIChatModel(
    "gpt-4o",
    provider=OpenAIProvider(api_key="sk-..."),
)
agent = FireflyAgent(name="my-agent", model=model)
Azure OpenAI:
from pydantic_ai.models.openai import OpenAIChatModel
from pydantic_ai.providers.azure import AzureProvider
model = OpenAIChatModel(
    "my-gpt4o-deployment",
    provider=AzureProvider(
        azure_endpoint="https://my-resource.openai.azure.com",
        api_version="2025-03-01-preview",
        api_key="...",  # or use DefaultAzureCredential
    ),
)
agent = FireflyAgent(name="azure-agent", model=model)
Anthropic with explicit key:
from pydantic_ai.models.anthropic import AnthropicModel
from pydantic_ai.providers.anthropic import AnthropicProvider
model = AnthropicModel(
    "claude-sonnet-4-20250514",
    provider=AnthropicProvider(api_key="sk-ant-..."),
)
agent = FireflyAgent(name="claude-agent", model=model)
AWS Bedrock:
from pydantic_ai.models.bedrock import BedrockConverseModel
model = BedrockConverseModel(
    "anthropic.claude-3-5-sonnet-20241022-v2:0",
    region_name="us-east-1",
)
agent = FireflyAgent(name="bedrock-agent", model=model)
The framework's observability layer automatically detects Bedrock-hosted
models and resolves them to the correct model family for cost tracking
(Anthropic pricing), prompt caching (Anthropic cache configuration), and
rate-limit retry (Bedrock ThrottlingException detection).
OpenAI-compatible endpoints (e.g. Ollama, vLLM, LiteLLM):
from pydantic_ai.models.openai import OpenAIChatModel
from pydantic_ai.providers.openai import OpenAIProvider
model = OpenAIChatModel(
    "llama3.2",
    provider=OpenAIProvider(
        base_url="http://localhost:11434/v1",
        api_key="ollama",  # Ollama doesn't require a real key
    ),
)
agent = FireflyAgent(name="local-agent", model=model)
Which Approach Should I Use?¶
- Environment variables for standard cloud providers (OpenAI, Anthropic, Google, Groq, DeepSeek). This is the simplest path and works well in most deployments.
- Programmatic Model objects for Azure OpenAI, AWS Bedrock, self-hosted models, OpenAI-compatible servers, or when API keys are loaded from a secrets manager at runtime.
Both approaches work identically with every framework feature — tools, reasoning
patterns, pipelines, cost tracking, prompt caching,
and all other modules. The framework's model_utils module normalizes model
identity from both strings and Model objects, so observability and resilience
features work uniformly across all providers.
IDP Tie-In¶
For our IDP project, we will use environment variables. Make sure your .env contains the
API key for whichever provider you choose:
# .env
OPENAI_API_KEY=sk-...
FIREFLY_AGENTIC_DEFAULT_MODEL=openai:gpt-4o
FIREFLY_AGENTIC_DEFAULT_TEMPERATURE=0.3
FIREFLY_AGENTIC_OBSERVABILITY_ENABLED=true
Chapter 3: Your First Agent¶
Every GenAI application starts with a single question: "How do I talk to the model?"
In raw Pydantic AI you create an Agent, give it a system prompt, and call run().
That works great for scripts — but the moment you need to register agents by name,
share them across pipelines, delegation, and reasoning patterns, attach lifecycle hooks,
or plug them into a larger system, you need a thin coordination layer on top.
That is exactly what FireflyAgent is. It wraps a Pydantic AI Agent and adds three
things the framework relies on: a global registry (so any module can look up an
agent by name), lifecycle management (init → warmup → shutdown), and metadata
(tags, descriptions, correlation IDs). The underlying Pydantic AI agent does all the
heavy lifting — model calls, tool dispatch, streaming — while FireflyAgent provides
the scaffolding that makes it a team player in a larger system.
Agent System Architecture¶
The following diagram shows how FireflyAgent sits on top of Pydantic AI and connects
to the rest of the framework:
graph TB
subgraph Application Code
DEC["@firefly_agent decorator"]
CLS["FireflyAgent class"]
end
subgraph Agent Layer
FA["FireflyAgent"]
PAI["pydantic_ai.Agent\n(model calls, tool dispatch, streaming)"]
REG["AgentRegistry\n(singleton name → agent map)"]
CTX["AgentContext\n(correlation_id, metadata, trace)"]
LC["AgentLifecycle\n(init → warmup → shutdown)"]
end
subgraph Consumers
PIPE["Pipelines"]
DELEG["Delegation Router"]
REASON["Reasoning Patterns"]
end
DEC -->|creates + registers| FA
CLS -->|creates| FA
FA -->|wraps| PAI
FA -->|registers in| REG
FA -->|carries| CTX
FA -->|hooks| LC
REG -->|lookup by name| PIPE
REG -->|lookup by name| DELEG
REG -->|lookup by name| REASON
Creating an Agent with the Decorator¶
The fastest way to create and register an agent is with the @firefly_agent decorator.
You write a function that returns the system prompt, and the framework takes care of
creating the agent, wiring the prompt, and registering it — all in one step:
from fireflyframework_agentic.agents import firefly_agent
# The decorator creates a FireflyAgent, uses this function as the dynamic
# instructions provider, and registers the agent in the global AgentRegistry.
@firefly_agent(name="greeter", model="openai:gpt-4o")
def greeter_instructions(ctx):
    # This function is called at the start of every run.
    # You can inspect `ctx` to customise the prompt per-request.
    return "You are a friendly greeter. Say hello and ask how you can help."
What happens behind the scenes:
- A FireflyAgent named "greeter" is created with model "openai:gpt-4o".
- The decorated function becomes the agent's dynamic instructions provider; it is called at the start of every run and can use the context to customise the system prompt.
- The agent is automatically registered in the global AgentRegistry, so any module (pipelines, delegation routers) can look it up by name.
Creating an Agent with the Class¶
When you need full control — custom output types, explicit tool lists, or you prefer
not to use decorators — instantiate FireflyAgent directly and register it yourself:
from fireflyframework_agentic.agents import FireflyAgent
from fireflyframework_agentic.agents.registry import agent_registry
# Create the agent with a static instructions string and a typed output.
# The `output_type` tells Pydantic AI to validate the LLM's response as a dict.
classifier = FireflyAgent(
    name="document_classifier",
    model="openai:gpt-4o",
    instructions=(
        "You are a document classification expert. "
        "Given a document, determine its type (invoice, receipt, contract, form), "
        "language, and page count. Return JSON with keys: doc_type, language, page_count."
    ),
    output_type=dict,
)
# Register it so other parts of the framework (pipelines, delegation) can find it.
agent_registry.register(classifier)
Running an Agent¶
Every agent supports three execution modes — pick the one that fits your context:
# Async — the standard choice for production services.
result = await classifier.run("Classify this document: Invoice from Acme Corp...")
print(result.output) # {"doc_type": "invoice", "language": "en", "page_count": 1}
# Synchronous — handy for scripts, notebooks, and quick experiments.
result = classifier.run_sync("Classify this document: ...")
print(result.output)
# Streaming — for real-time UI feedback where you want tokens as they arrive.
# streaming_mode is "buffered" (default, chunked) or "incremental" (token-by-token).
async with classifier.run_stream(
    "Classify this document: ...", streaming_mode="incremental"
) as stream:
    async for chunk in stream:
        print(chunk, end="", flush=True)
The Agent Registry¶
The AgentRegistry is a process-wide singleton that maps agent names to FireflyAgent
instances. This is the glue that lets any module — delegation routers, pipelines,
reasoning patterns — discover and invoke agents without
importing them directly:
from fireflyframework_agentic.agents.registry import agent_registry
# Retrieve an agent by name — returns the FireflyAgent or raises KeyError.
agent = agent_registry.get("document_classifier")
# Check existence before retrieval.
if agent_registry.has("document_classifier"):
    print("Agent is registered")
# List all registered agents (useful for admin dashboards / health checks).
for info in agent_registry.list_agents():
    print(f"{info.name} ({info.model})")
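To see why a name-keyed registry decouples producers from consumers, here is a minimal sketch that mirrors the calls used above (register, get, has, list_agents). `SketchRegistry` and `AgentInfo` are hypothetical names; the real AgentRegistry stores FireflyAgent instances and may behave differently, for example when a name is registered twice.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class AgentInfo:
    """Hypothetical stand-in for a registered agent."""

    name: str
    model: str


class SketchRegistry:
    """A name -> agent map; consumers look agents up by name only."""

    def __init__(self) -> None:
        self._agents: dict[str, AgentInfo] = {}

    def register(self, agent: AgentInfo) -> None:
        # This sketch simply overwrites an existing entry with the same name.
        self._agents[agent.name] = agent

    def get(self, name: str) -> AgentInfo:
        # A plain dict lookup raises KeyError for unknown names.
        return self._agents[name]

    def has(self, name: str) -> bool:
        return name in self._agents

    def list_agents(self) -> list[AgentInfo]:
        return list(self._agents.values())
```

A pipeline step or delegation router written against this interface needs only the agent's name, so the module that defines the agent never has to be imported by the module that uses it.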
Agent Context¶
When a request arrives, you often need to carry metadata along the entire execution
path — who sent the request, which experiment is active, what trace ID to log.
AgentContext is that bag of request-scoped data:
from fireflyframework_agentic.agents.context import AgentContext
# Create a context with a correlation ID and arbitrary metadata.
# This context will be available in instructions providers and tool functions.
ctx = AgentContext(
    correlation_id="req-abc-123",
    metadata={"tenant": "acme-corp", "source": "email-inbox"},
)
The context is available inside dynamic instructions providers and tool functions, allowing you to tailor behaviour per-request without global state.
Lifecycle Management¶
Real-world agents often depend on external resources — database connections, model
caches, file handles. AgentLifecycle gives you three hooks to manage them cleanly:
init (one-time setup), warmup (pre-heat caches), and shutdown (release
resources):
from fireflyframework_agentic.agents.lifecycle import AgentLifecycle
lifecycle = AgentLifecycle()
# Each hook accepts a sync or async callable.
lifecycle.on_init(lambda: print("Loading resources..."))
lifecycle.on_warmup(lambda: print("Warming up model cache..."))
lifecycle.on_shutdown(lambda: print("Releasing resources..."))
# In your application startup/shutdown sequence:
await lifecycle.run_init()
await lifecycle.run_warmup()
# ... application serves requests ...
await lifecycle.run_shutdown()
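The note that each hook accepts a sync or async callable is worth unpacking. One common way to support both is to call the hook and await the result only if it is awaitable. The sketch below assumes that approach; `SketchLifecycle` and `demo` are hypothetical names and this is not AgentLifecycle's actual code.

```python
import asyncio
import inspect
from typing import Any, Callable


class SketchLifecycle:
    """Runs registered hooks per phase, accepting sync or async callables."""

    def __init__(self) -> None:
        self._hooks: dict[str, list[Callable[[], Any]]] = {
            "init": [], "warmup": [], "shutdown": [],
        }

    def on(self, phase: str, hook: Callable[[], Any]) -> None:
        self._hooks[phase].append(hook)

    async def run(self, phase: str) -> None:
        for hook in self._hooks[phase]:
            result = hook()
            # Async hooks return an awaitable; sync hooks have already run.
            if inspect.isawaitable(result):
                await result


async def demo() -> list[str]:
    events: list[str] = []
    lifecycle = SketchLifecycle()

    async def warm_cache() -> None:
        await asyncio.sleep(0)  # stand-in for real asynchronous work
        events.append("warmup")

    lifecycle.on("init", lambda: events.append("init"))
    lifecycle.on("warmup", warm_cache)
    lifecycle.on("shutdown", lambda: events.append("shutdown"))
    for phase in ("init", "warmup", "shutdown"):
        await lifecycle.run(phase)
    return events
```

Running `asyncio.run(demo())` executes the three phases in order, mixing a coroutine hook with two plain lambdas.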
The Middleware Stack¶
Every FireflyAgent runs each call (run, run_sync, run_stream,
run_with_reasoning) through a MiddlewareChain — a before/after pipeline whose hooks
receive a MiddlewareContext (agent name, prompt, method, model, deps). Two middlewares
are auto-wired: LoggingMiddleware is always added, and ObservabilityMiddleware is
added when config.observability_enabled is true. You can attach more from
fireflyframework_agentic.agents:
- PromptGuardMiddleware / OutputGuardMiddleware — block prompt-injection / leaky output.
- CostGuardMiddleware — raises BudgetExceededError when a budget is exceeded.
- CacheMiddleware (ResultCache) and PromptCacheMiddleware (CacheStatistics).
- ValidationMiddleware — validate structured output.
- RetryMiddleware — retry on failure (not auto-wired; rate-limit retry is built into run() itself).
- ExplainabilityMiddleware — record decisions to a TraceRecorder.
from fireflyframework_agentic.agents import FireflyAgent, PromptGuardMiddleware
agent = FireflyAgent(name="assistant", model="openai:gpt-4o")
agent.middleware.add(PromptGuardMiddleware())
For provider resilience, FallbackModelWrapper / run_with_fallback let an agent fall
back to a secondary model, and CircuitBreakerMiddleware (Chapter 11) trips on repeated
failures.
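A before/after pipeline of this kind can be sketched in a few lines. The classes below are hypothetical stand-ins, not the framework's MiddlewareChain or MiddlewareContext, and the choice to run after-hooks in reverse order is an assumption made for the illustration.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable


@dataclass
class SketchContext:
    """Hypothetical stand-in for MiddlewareContext."""

    agent_name: str
    prompt: str
    log: list[str] = field(default_factory=list)


class TagMiddleware:
    """Records when its hooks fire so the ordering is visible."""

    def __init__(self, tag: str) -> None:
        self.tag = tag

    async def before(self, ctx: SketchContext) -> None:
        ctx.log.append(f"before:{self.tag}")

    async def after(self, ctx: SketchContext, result: str) -> str:
        ctx.log.append(f"after:{self.tag}")
        return result


class SketchChain:
    def __init__(self) -> None:
        self._middlewares: list[TagMiddleware] = []

    def add(self, middleware: TagMiddleware) -> None:
        self._middlewares.append(middleware)

    async def run(
        self, ctx: SketchContext, call: Callable[[SketchContext], Awaitable[str]]
    ) -> str:
        for middleware in self._middlewares:
            await middleware.before(ctx)
        result = await call(ctx)
        # Assumption: after-hooks unwind in reverse order, like nested wrappers.
        for middleware in reversed(self._middlewares):
            result = await middleware.after(ctx, result)
        return result
```

In this arrangement a guard's before-hook can reject a prompt by raising before the model is ever called, and an after-hook can inspect or replace the result on the way out.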
IDP Tie-In: The Document Classifier Agent¶
Time to build the first piece of our IDP pipeline. The document classifier receives a raw document (text or scanned image) and outputs structured metadata — document type, language, page count, and orientation. Every subsequent stage depends on this output.
from fireflyframework_agentic.agents import FireflyAgent
# The classifier is our pipeline's entry point — it decides how the document
# will be routed through digitisation, extraction, and validation.
classifier_agent = FireflyAgent(
    name="document_classifier",
    model="openai:gpt-4o",
    instructions=(
        "You are a document classification expert. Given a document (text or image), "
        "determine its type, language, page count, and orientation. "
        "Return JSON: {doc_type, language, page_count, orientation}."
    ),
    output_type=dict,  # Pydantic AI will validate the LLM output as a dict
)
For multimodal input (e.g. scanned images), the framework provides typed content wrappers that ensure the LLM receives the content in the right format:
from fireflyframework_agentic.types import ImageUrl
# Pass a list of mixed content — text instructions + an image.
# The framework converts ImageUrl into the provider-specific multimodal format.
result = await classifier_agent.run([
    "Classify this document:",
    ImageUrl(url="data:image/png;base64,<base64-data>"),
])
Other supported types include AudioUrl, DocumentUrl, and VideoUrl.
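The `<base64-data>` placeholder in the ImageUrl example stands for the Base64-encoded bytes of the image. A minimal sketch of building such a data URL with the standard library follows; `to_data_url` is a hypothetical helper name, not a framework function.

```python
import base64


def to_data_url(data: bytes, mime_type: str = "image/png") -> str:
    """Encode raw bytes as a 'data:<mime>;base64,<payload>' URL."""
    encoded = base64.b64encode(data).decode("ascii")
    return f"data:{mime_type};base64,{encoded}"
```

In practice you would read the scanned page from disk (for example with `Path("scan.png").read_bytes()`, where the file name is only an illustration) and pass the resulting string as the `url` argument.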
Chapter 4: Tools¶
LLMs are powerful reasoners, but they cannot check a database, call an API, or read a file on their own. Tools bridge that gap: they are functions the model can call during a conversation to fetch data, trigger side-effects, or run computations.
Pydantic AI already supports tool functions, but fireflyframework-agentic wraps them with
a richer layer: a protocol-based type system (ToolProtocol → BaseTool),
guards that enforce validation, rate-limiting, and sandboxing
before a tool executes (human-in-the-loop approval is separate — it pauses the run
rather than rejecting it), composition primitives (sequential, fallback, conditional),
a global registry for discovery, and a ToolKit that can convert framework tools
into Pydantic AI tools for injection into any agent.
Tool System Architecture¶
The following diagram shows the full tool stack — from how you create tools at the top, through the guard chain, to how they reach an agent at the bottom:
graph TB
subgraph "Tool Creation"
DEC["@firefly_tool decorator"]
BLD["ToolBuilder (fluent API)"]
BLT["Built-in Tools\n(Http, FileSystem, DateTime, JSON, ...)"]
end
subgraph "Tool Protocol Layer"
TP["ToolProtocol"]
BT["BaseTool\n(name, description, execute)"]
end
subgraph "Guard Chain"
CG["CompositeGuard"]
VG["ValidationGuard"]
RG["RateLimitGuard"]
SG["SandboxGuard"]
end
subgraph "Composition"
SEQ["SequentialComposer"]
FB["FallbackComposer"]
COND["ConditionalComposer"]
end
subgraph "Registration & Bridging"
TR["ToolRegistry\n(global catalog)"]
TK["ToolKit\n(group + as_pydantic_tools)"]
end
subgraph "Agent Integration"
FA["FireflyAgent(tools=[...])"]
PAI["pydantic_ai.Agent\n(calls tools during LLM run)"]
end
DEC --> BT
BLD --> BT
BLT --> BT
BT --> TR
BT -.->|"guarded"| CG
CG --> VG
CG --> RG
CG --> SG
BT -.->|"compose"| SEQ
BT -.->|"compose"| FB
BT -.->|"compose"| COND
TR --> TK
TK -->|"as_pydantic_tools"| FA
FA -->|"tools list"| PAI
Creating a Tool with the Decorator¶
The fastest path — one decorator does everything:
from fireflyframework_agentic.tools import firefly_tool
# @firefly_tool creates a BaseTool, registers it in the global ToolRegistry,
# and makes it discoverable by name for ToolKit grouping and agent bridging.
@firefly_tool(name="lookup_vendor", description="Look up vendor details by name")
async def lookup_vendor(vendor_name: str) -> str:
    # In production, this would query a database or API.
    vendors = {"Acme Corp": "ID-001", "Globex": "ID-002"}
    return vendors.get(vendor_name, "Unknown vendor")
Creating a Tool with the Builder¶
When you need more control — or want to build tools programmatically at runtime —
use the fluent ToolBuilder:
from fireflyframework_agentic.tools import ToolBuilder
async def fetch_exchange_rate(currency: str) -> float:
    """Simulated exchange rate lookup."""
    rates = {"USD": 1.0, "EUR": 0.85, "GBP": 0.73}
    return rates.get(currency, 0.0)
# The builder pattern lets you set each property explicitly.
# Call .build() to produce the final BaseTool instance.
exchange_tool = (
    ToolBuilder("exchange_rate")
    .description("Get exchange rate for a currency code")
    .parameter("currency", str, description="Three-letter currency code")
    .handler(fetch_exchange_rate)
    .build()
)
Tool Guards¶
In production, you rarely want a tool to run unconditionally. Guards are decorators
that wrap a tool's execution with hard, synchronous policy checks — input validation,
rate-limiting, or filesystem sandboxing. They run before the handler (and optionally
after), and they stack via CompositeGuard. (Human-in-the-loop approval, which pauses
the run rather than rejecting it, is handled separately — see below.)
Validation Guard¶
Ensures that all required parameters are present before the tool handler runs:
from fireflyframework_agentic.tools import firefly_tool, guarded
from fireflyframework_agentic.tools.guards import ValidationGuard
# The guard checks that all listed keys are present in kwargs.
@guarded(ValidationGuard(required_keys=["vendor_name"]))
@firefly_tool(name="lookup_vendor", description="Look up vendor")
async def lookup_vendor(vendor_name: str) -> str:
    ...
Rate Limit Guard¶
Prevents a tool from being called too frequently — essential for expensive or metered external APIs:
from fireflyframework_agentic.tools.guards import RateLimitGuard
# Rate limiter: at most 10 calls per 60-second window.
@guarded(RateLimitGuard(max_calls=10, period_seconds=60))
@firefly_tool(name="web_search", description="Search the web")
async def web_search(query: str) -> str:
    ...
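One common way to enforce "at most N calls per period" is a sliding window over recent call timestamps. The sketch below illustrates that idea only; it is not RateLimitGuard's implementation, and `SlidingWindowLimiter` with its injectable `clock` parameter is a hypothetical name chosen so the behaviour can be tested without waiting.

```python
import time
from collections import deque
from typing import Callable


class SlidingWindowLimiter:
    """Allow at most max_calls within any period_seconds-long window."""

    def __init__(
        self,
        max_calls: int,
        period_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_calls = max_calls
        self.period_seconds = period_seconds
        self._clock = clock
        self._calls: deque = deque()  # timestamps of accepted calls

    def allow(self) -> bool:
        now = self._clock()
        # Forget calls that have aged out of the window.
        while self._calls and now - self._calls[0] >= self.period_seconds:
            self._calls.popleft()
        if len(self._calls) >= self.max_calls:
            return False
        self._calls.append(now)
        return True
```

Rejected calls are not recorded, so a burst of denied attempts does not extend the time the caller has to wait.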
Sandbox Guard¶
Restricts tool arguments via allow/deny regex patterns — useful for preventing path traversal, secret leakage, or access to dangerous locations:
from fireflyframework_agentic.tools.guards import SandboxGuard
# Allow paths under /tmp/uploads, deny path traversal (..) and .env files.
@guarded(SandboxGuard(
    allowed_patterns=[r"^/tmp/uploads"],
    denied_patterns=[r"\.\.", r"\.env"],
))
@firefly_tool(name="read_file", description="Read a file")
async def read_file(path: str) -> str:
    ...
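The allow/deny logic can be pictured with a small regex check. This sketch assumes that a denied match always wins and that patterns are applied with `re.search`; both are assumptions for illustration, and `is_allowed` is a hypothetical helper rather than part of SandboxGuard.

```python
import re


def is_allowed(value: str, allowed_patterns: list, denied_patterns: list) -> bool:
    """Return True if value matches no denied pattern and some allowed one."""
    # Deny wins: any denied match rejects the value outright.
    if any(re.search(pattern, value) for pattern in denied_patterns):
        return False
    # With no allow-list, anything not denied passes; otherwise one must match.
    return not allowed_patterns or any(
        re.search(pattern, value) for pattern in allowed_patterns
    )
```

Note that a prefix pattern such as `^/tmp/uploads` also matches `/tmp/uploads_evil/x`. Ending the pattern with a slash (`^/tmp/uploads/`) closes that gap.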
Human-in-the-loop approval¶
Human approval is not a guard — it pauses the run rather than failing it. Mark a tool
with requires_approval=True; the agent run then pauses before the tool executes and the
caller resumes with an approval decision. See
Human-in-the-Loop Tool Approval for the full flow.
from fireflyframework_agentic.agents import is_deferred
from fireflyframework_agentic.tools import DeferredToolResults, firefly_tool

@firefly_tool(name="delete_record", description="Delete a database record", requires_approval=True)
async def delete_record(record_id: str) -> str:
    ...

# `agent` is an existing FireflyAgent that has delete_record attached.
result = await agent.run("Delete record 42.")
if is_deferred(result):  # paused for sign-off
    # Map each pending call to a decision: True, ToolApproved, or ToolDenied.
    approvals = {c.tool_call_id: True for c in result.output.approvals}
    result = await agent.run(
        message_history=result.all_messages(),
        deferred_tool_results=DeferredToolResults(approvals=approvals),
    )
Composing Guards¶
Chain multiple guards with CompositeGuard — all must pass (AND semantics, evaluated
in order, first failure short-circuits):
from fireflyframework_agentic.tools.guards import CompositeGuard, ValidationGuard, RateLimitGuard
guard = CompositeGuard(guards=[
    ValidationGuard(required_keys=["query"]),
    RateLimitGuard(max_calls=5, period_seconds=30),
])
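The AND semantics with short-circuiting can be sketched with plain functions. Here a guard is modelled as a callable that returns an error message or None; that representation, along with the names `require_keys`, `max_length`, and `composite`, is hypothetical and chosen only to show the evaluation order.

```python
from typing import Callable, Optional

GuardFn = Callable[[dict], Optional[str]]  # error message, or None to pass


def require_keys(*keys: str) -> GuardFn:
    def check(kwargs: dict) -> Optional[str]:
        missing = [key for key in keys if key not in kwargs]
        return f"missing keys: {missing}" if missing else None
    return check


def max_length(key: str, limit: int) -> GuardFn:
    def check(kwargs: dict) -> Optional[str]:
        too_long = len(str(kwargs.get(key, ""))) > limit
        return f"{key} too long" if too_long else None
    return check


def composite(*guards: GuardFn) -> GuardFn:
    def check(kwargs: dict) -> Optional[str]:
        for guard in guards:
            error = guard(kwargs)
            if error is not None:
                return error  # first failure short-circuits the rest
        return None  # every guard passed
    return check
```

Ordering matters with this design: placing cheap checks such as key validation first means a stateful check such as a rate limiter is never consulted for a call that would have been rejected anyway.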
Retryable Tools¶
The @retryable decorator wraps a tool's execute method with exponential-backoff
retry logic — useful for tools that call flaky external APIs:
from fireflyframework_agentic.tools import firefly_tool, retryable
@retryable(max_retries=3, backoff=1.0)
@firefly_tool(name="call_erp", description="Fetch data from the ERP API")
async def call_erp(query: str) -> str:
    # On failure, retries up to 3 times with 1s → 2s → 4s backoff
    ...
@retryable stacks with @guarded — guards run first, then retries wrap the handler:
@retryable(max_retries=2)
@guarded(RateLimitGuard(max_calls=10, period_seconds=60))
@firefly_tool(name="web_search", description="Search the web")
async def web_search(query: str) -> str:
    ...
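The 1s → 2s → 4s schedule mentioned above is the standard exponential-backoff formula, delay = backoff × 2^attempt. The function below is a generic sketch of that technique, not the @retryable implementation; `retry_async` and its injectable `sleep` parameter are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_async(
    func: Callable[[], Awaitable[T]],
    max_retries: int = 3,
    backoff: float = 1.0,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> T:
    """Call func, retrying on any exception with exponential backoff."""
    attempt = 0
    while True:
        try:
            return await func()
        except Exception:
            if attempt >= max_retries:
                raise  # retries exhausted: surface the last error
            # Delays grow as backoff * 1, backoff * 2, backoff * 4, ...
            await sleep(backoff * (2 ** attempt))
            attempt += 1
```

With `max_retries=3` the function is invoked at most four times in total: the initial call plus three retries. Production code would usually narrow the caught exception types and add jitter to the delay.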
Tool Composition¶
Combine tools into higher-level operations:
from fireflyframework_agentic.tools import SequentialComposer, FallbackComposer, ConditionalComposer
# Sequential: output of one becomes the `input` kwarg to the next.
# First positional arg is the composed tool's name.
pipeline = SequentialComposer(
    "fetch-parse-validate",
    [fetch_tool, parse_tool, validate_tool],
    description="Fetch, parse, then validate",
)
# Fallback: try tools in order until one succeeds; raise if all fail.
resilient = FallbackComposer(
    "resilient-api",
    [primary_api, secondary_api, cache_lookup],
)
# Conditional: a router_fn receives kwargs and returns a key into tool_map.
def select_parser(**kwargs) -> str:
    return "json" if kwargs.get("format") == "json" else "text"

router = ConditionalComposer(
    "format-router",
    router_fn=select_parser,
    tool_map={"json": json_parser, "text": text_parser},
)
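The three composition strategies reduce to a few lines each when tools are modelled as plain synchronous callables. The sketch below makes that simplification (framework tools are async objects) and uses hypothetical function names; it follows the comments above in passing each sequential result to the next tool as the `input` keyword argument.

```python
from typing import Any, Callable

ToolFn = Callable[..., Any]


def sequential(*tools: ToolFn) -> ToolFn:
    def run(**kwargs: Any) -> Any:
        result = tools[0](**kwargs)
        for tool in tools[1:]:
            result = tool(input=result)  # output feeds the next tool
        return result
    return run


def fallback(*tools: ToolFn) -> ToolFn:
    def run(**kwargs: Any) -> Any:
        last_error = None
        for tool in tools:
            try:
                return tool(**kwargs)  # first success wins
            except Exception as exc:
                last_error = exc
        raise RuntimeError(f"all {len(tools)} tools failed") from last_error
    return run


def conditional(router_fn: Callable[..., str], tool_map: dict) -> ToolFn:
    def run(**kwargs: Any) -> Any:
        # The router inspects the arguments and picks one tool by key.
        return tool_map[router_fn(**kwargs)](**kwargs)
    return run
```

Because each combinator returns another callable with the same shape, the results nest freely: a fallback can be one stage of a sequence, and a conditional can choose between two sequences.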
Built-In Tools¶
The framework ships with nine ready-to-use tools in tools/builtins/:
from fireflyframework_agentic.tools.builtins import (
    DateTimeTool,    # Current date/time, timezone conversion
    JsonTool,        # Parse, validate, extract, format JSON
    TextTool,        # Word count, regex extract, truncate, replace
    CalculatorTool,  # Safe math via AST parsing (no eval)
)
# Each tool follows ToolProtocol and can be registered and guarded
datetime_tool = DateTimeTool(default_timezone="America/New_York")
result = await datetime_tool.execute(action="now")
calculator = CalculatorTool()
result = await calculator.execute(expression="sqrt(144) + pi")
The full list: HttpTool, FileSystemTool, SearchTool, DatabaseTool, ShellTool, DateTimeTool, JsonTool, TextTool, CalculatorTool.
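The phrase "safe math via AST parsing (no eval)" describes a well-known technique: parse the expression into a syntax tree and evaluate only an explicit allow-list of node types. The following is a generic sketch of that technique, not CalculatorTool's source; the supported operators, names, and functions are assumptions chosen for the example.

```python
import ast
import math
import operator

_BINARY = {
    ast.Add: operator.add, ast.Sub: operator.sub,
    ast.Mult: operator.mul, ast.Div: operator.truediv,
}
_UNARY = {ast.USub: operator.neg, ast.UAdd: operator.pos}
_NAMES = {"pi": math.pi, "e": math.e}
_FUNCS = {"sqrt": math.sqrt, "abs": abs}


def safe_eval(expression: str) -> float:
    """Evaluate arithmetic using only allow-listed AST node types."""

    def walk(node: ast.AST) -> float:
        if (isinstance(node, ast.Constant)
                and isinstance(node.value, (int, float))
                and not isinstance(node.value, bool)):
            return node.value
        if isinstance(node, ast.BinOp) and type(node.op) in _BINARY:
            return _BINARY[type(node.op)](walk(node.left), walk(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY:
            return _UNARY[type(node.op)](walk(node.operand))
        if isinstance(node, ast.Name) and node.id in _NAMES:
            return _NAMES[node.id]
        if (isinstance(node, ast.Call) and isinstance(node.func, ast.Name)
                and node.func.id in _FUNCS and not node.keywords):
            return _FUNCS[node.func.id](*(walk(arg) for arg in node.args))
        # Anything else (attribute access, imports, lambdas...) is rejected.
        raise ValueError(f"unsupported expression element: {type(node).__name__}")

    return walk(ast.parse(expression, mode="eval").body)
```

Unlike `eval`, nothing outside the allow-list can execute: an input such as `__import__('os')` parses as a call to an unknown function and is rejected with ValueError. Exponentiation is deliberately left out of this sketch because unbounded powers can consume large amounts of CPU and memory.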
The Tool Registry¶
from fireflyframework_agentic.tools.registry import tool_registry
tool_registry.register(my_tool)
tool = tool_registry.get("my_tool")
all_tools = tool_registry.list_tools()
ToolKit¶
A ToolKit groups related tools and can convert them to Pydantic AI tools for
direct injection into an agent:
from fireflyframework_agentic.tools import ToolKit
from fireflyframework_agentic.tools.builtins import DateTimeTool, CalculatorTool
datetime_tool = DateTimeTool()
calculator = CalculatorTool()
kit = ToolKit("utility-tools", [datetime_tool, calculator], description="Common utilities")
# Register all tools in the toolkit at once
from fireflyframework_agentic.tools import tool_registry
kit.register_all(tool_registry)
Attaching Tools to Agents¶
The Firefly tool system (ToolRegistry, @firefly_tool, ToolKit) is a separate layer
from the Pydantic AI tool system baked into each agent. Here is how they connect:
Approach 1: Pass Pydantic AI tool functions directly to FireflyAgent
The tools parameter on FireflyAgent accepts any objects that Pydantic AI
recognises as tools — plain functions, pydantic_ai.Tool objects, etc.:
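from pydantic_ai import RunContext
from fireflyframework_agentic.agents import FireflyAgent

# Annotate the first parameter as RunContext so Pydantic AI treats it as the
# run context rather than as an argument the model must supply.
async def lookup_vendor(ctx: RunContext, vendor_name: str) -> str:
    return f"Details for {vendor_name}"

agent = FireflyAgent(
    name="extractor",
    model="openai:gpt-4o",
    tools=[lookup_vendor],  # Pydantic AI tool functions
)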
Approach 2: Register tools after creation with decorator proxies
FireflyAgent exposes .tool() and .tool_plain() decorator proxies that delegate
to the underlying Pydantic AI agent:
agent = FireflyAgent(name="assistant", model="openai:gpt-4o")
@agent.tool_plain
async def calculate(expression: str) -> str:
    """Evaluate a math expression."""
    return str(eval(expression))  # simplified example; never eval() untrusted input

@agent.tool
async def get_user(ctx, user_id: str) -> str:
    """Look up a user by ID (receives RunContext)."""
    return f"User {user_id}"
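The eval() call in calculate is a simplification. As a hedged sketch of what a safer tool body could look like, the standard-library ast module can restrict input to plain arithmetic. The names safe_eval, _BINARY_OPS, and _UNARY_OPS are illustrative and not part of the framework.

```python
import ast
import operator

# Whitelisted operators; anything outside these tables is rejected.
_BINARY_OPS = {
    ast.Add: operator.add,
    ast.Sub: operator.sub,
    ast.Mult: operator.mul,
    ast.Div: operator.truediv,
}
_UNARY_OPS = {ast.UAdd: operator.pos, ast.USub: operator.neg}


def safe_eval(expression: str) -> float:
    """Evaluate a purely arithmetic expression without calling eval()."""

    def _eval(node: ast.AST) -> float:
        if isinstance(node, ast.Constant):
            value = node.value
            # bool is a subclass of int, so exclude it explicitly.
            if isinstance(value, (int, float)) and not isinstance(value, bool):
                return value
        if isinstance(node, ast.BinOp) and type(node.op) in _BINARY_OPS:
            return _BINARY_OPS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.UnaryOp) and type(node.op) in _UNARY_OPS:
            return _UNARY_OPS[type(node.op)](_eval(node.operand))
        raise ValueError(f"Unsupported expression element: {type(node).__name__}")

    tree = ast.parse(expression, mode="eval")
    return _eval(tree.body)
```

Function calls, attribute access, and names all raise ValueError here, so an input such as `__import__('os')` is rejected instead of executed.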
Approach 3: Bridge Firefly tools via ToolKit.as_pydantic_tools()
Firefly BaseTool instances (created with @firefly_tool, ToolBuilder, or built-ins)
live in the ToolRegistry. To feed them into an agent, convert via as_pydantic_tools():
from fireflyframework_agentic.tools import ToolKit
from fireflyframework_agentic.tools.builtins import DateTimeTool, JsonTool
from fireflyframework_agentic.agents import FireflyAgent
kit = ToolKit("utilities", [DateTimeTool(), JsonTool()])
agent = FireflyAgent(
name="helper",
model="openai:gpt-4o",
tools=kit.as_pydantic_tools(), # Convert to Pydantic AI tools
)
Key distinction:
ToolRegistry is a framework-level catalog for discovery and metadata. An agent only calls tools that are in its own Pydantic AI tools list. Use ToolKit.as_pydantic_tools() or agent.tool() to bridge between the two.
IDP Tie-In: OCR and Vendor Lookup Tools¶
For our IDP pipeline, we need tools the extraction agent can call. We define them
with @firefly_tool, group them into a ToolKit, and attach them to the agent
via as_pydantic_tools(). This is the pattern you will see end-to-end in
Chapter 6 (reasoning patterns) and Chapter 18 (full IDP application).
Step 1 — Define the tools:
from fireflyframework_agentic.tools import firefly_tool, guarded
from fireflyframework_agentic.tools.guards import RateLimitGuard
@guarded(RateLimitGuard(max_calls=100, period_seconds=60))
@firefly_tool(name="ocr_extract", description="Extract text from a document image via OCR")
async def ocr_extract(image_data: str) -> str:
    """In production, call an OCR service like AWS Textract or Google Vision."""
    return "Invoice #INV-2026-001\nVendor: Acme Corp\nAmount: $1,234.56\nDate: 2026-01-15"

@firefly_tool(name="vendor_lookup", description="Look up vendor details from the ERP system")
async def vendor_lookup(vendor_name: str) -> str:
    vendors = {
        "Acme Corp": '{"id": "V-001", "tax_id": "US-12345", "payment_terms": "NET30"}',
        "Globex": '{"id": "V-002", "tax_id": "US-67890", "payment_terms": "NET60"}',
    }
    return vendors.get(vendor_name, '{"error": "Vendor not found"}')
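The tutorial does not say which algorithm RateLimitGuard uses for max_calls and period_seconds. One common way to enforce such a limit is a sliding window over call timestamps. The sketch below is a standalone illustration of that idea; SlidingWindowLimiter and allow() are hypothetical names, not the framework's API.

```python
import time
from collections import deque
from typing import Callable


class SlidingWindowLimiter:
    """Allow at most max_calls within any period_seconds window (illustrative)."""

    def __init__(
        self,
        max_calls: int,
        period_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_calls = max_calls
        self.period_seconds = period_seconds
        self._clock = clock          # injectable so the limiter is easy to test
        self._calls = deque()        # timestamps of accepted calls, oldest first

    def allow(self) -> bool:
        now = self._clock()
        # Drop timestamps that have left the window.
        while self._calls and now - self._calls[0] >= self.period_seconds:
            self._calls.popleft()
        if len(self._calls) >= self.max_calls:
            return False
        self._calls.append(now)
        return True
```

A guard built on this idea would call allow() before each tool invocation and reject or delay the call when it returns False.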
Step 2 — Group into a ToolKit and attach to the extraction agent:
from fireflyframework_agentic.tools import ToolKit
from fireflyframework_agentic.tools.builtins import CalculatorTool
from fireflyframework_agentic.agents import FireflyAgent
extraction_kit = ToolKit(
"idp-extraction",
[ocr_extract, vendor_lookup, CalculatorTool()],
description="Tools for IDP invoice extraction",
)
extractor_agent = FireflyAgent(
name="extractor",
model="openai:gpt-4o",
instructions="You are an invoice data extraction specialist.",
tools=extraction_kit.as_pydantic_tools(), # Bridge Firefly tools → Pydantic AI
)
What happens next: In Chapter 6 we pass extractor_agent (with its tools already attached) to reasoning patterns like Plan-and-Execute and Reflexion. The pattern calls agent.run() internally; the tools are available because they were bound here. Chapter 18 shows the complete production module (idp/tools.py) with retries, guards, and the full ToolKit.
Chapter 5: Prompts¶
If tools are an agent's hands, prompts are its brain. The exact wording of a system prompt can mean the difference between extracting 60% of invoice fields and 98%. In production you need to version prompts (so you can A/B test), compose them (system + context + task), validate them (catch missing variables before runtime), and load them from files (so non-engineers can edit them).
The Prompts module provides all of this through a Jinja2-based template engine. Every template is a first-class object with a name, a version, declared variables, and a render method — not just a raw string.
Creating a Prompt Template¶
A PromptTemplate takes three positional arguments — name, system_template, and
user_template — plus keyword-only version, description, required_variables, and
metadata. Both templates are Jinja2 source:
from fireflyframework_agentic.prompts import PromptTemplate
extraction_prompt = PromptTemplate(
"invoice_extraction",
# system_template
"You are an invoice data extraction specialist.\n"
"Always return valid JSON matching the requested schema.",
# user_template
"Extract the following fields from the document text below:\n"
"- invoice_number\n"
"- vendor_name\n"
"- total_amount (numeric)\n"
"- due_date (ISO format)\n"
"- line_items (list of {description, quantity, unit_price})\n\n"
"Document text:\n{{ document_text }}",
version="1.0.0",
required_variables=["document_text"],
)
# render() returns a Prompt object with .system and .user fields (NOT a string).
prompt = extraction_prompt.render(document_text="Invoice #INV-001 from Acme Corp...")
print(prompt.system) # rendered system_template
print(prompt.user) # rendered user_template
Templates use Jinja2 syntax — {{ variable }}, {% if %}, {% for %}, filters, and
macros all work. required_variables is validated at render time: rendering without a
required variable raises PromptValidationError.
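To make the render-time check concrete, here is a minimal stand-in that uses a regular expression instead of Jinja2 and handles only simple `{{ name }}` placeholders. MissingVariableError and render_checked are hypothetical names used for illustration; the framework raises its own PromptValidationError.

```python
import re

# Matches {{ name }} with optional inner whitespace. Not a Jinja2 parser.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


class MissingVariableError(ValueError):
    """Local stand-in for the framework's PromptValidationError."""


def render_checked(template: str, required: list[str], **variables: str) -> str:
    # Fail before rendering if any declared variable was not supplied.
    missing = [name for name in required if name not in variables]
    if missing:
        raise MissingVariableError(f"Missing required variables: {missing}")
    # Replace each placeholder with its value; undeclared names render as "".
    return _PLACEHOLDER.sub(lambda m: str(variables.get(m.group(1), "")), template)
```

The point of declaring required_variables is visible here: the error names the missing variable up front instead of letting an empty string slip silently into the prompt.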
Versioning¶
The PromptRegistry supports multiple versions of the same template. This is crucial
for A/B testing different prompt strategies:
from fireflyframework_agentic.prompts import PromptRegistry
registry = PromptRegistry()
registry.register(extraction_prompt_v1)
registry.register(extraction_prompt_v2)
# Get the latest version
latest = registry.get("invoice_extraction")
# Get a specific version
v1 = registry.get("invoice_extraction", "1.0.0")
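The text does not spell out how the registry decides which version is "latest". If you assume semantic-version ordering, the comparison has to be numeric rather than lexical. The helpers below are a hypothetical sketch of that assumption, not the registry's implementation.

```python
def parse_version(version: str) -> tuple[int, ...]:
    """Turn "1.10.0" into (1, 10, 0) so versions compare numerically."""
    return tuple(int(part) for part in version.split("."))


def latest_version(versions: list[str]) -> str:
    """Pick the highest version by numeric comparison of its components."""
    return max(versions, key=parse_version)
```

latest_version(["1.0.0", "2.0.0", "10.0.0"]) returns "10.0.0", whereas a plain max() over the strings would return "2.0.0".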
Composition¶
Templates can be composed using three strategies:
Sequential Composition¶
Render templates in order and join them — useful for building system + context + task prompts.
The system parts are joined together and the user parts are joined together, and
render() returns a single Prompt:
from fireflyframework_agentic.prompts.composer import SequentialComposer
# By default, templates are joined with "\n\n". Override with the keyword-only `separator=`.
composer = SequentialComposer(
[system_prompt, context_prompt, task_prompt],
separator="\n\n",
)
prompt = composer.render(document_text="Invoice #INV-001...") # -> Prompt(.system, .user)
Conditional Composition¶
Select a template based on a runtime condition. The condition_fn receives the
render kwargs and returns a string key that maps into template_map:
from fireflyframework_agentic.prompts.composer import ConditionalComposer
# The condition function inspects the kwargs and returns a template key.
# Both args are positional: (condition_fn, template_map).
composer = ConditionalComposer(
lambda **kwargs: "invoice" if kwargs.get("doc_type") == "invoice" else "generic",
{
"invoice": invoice_prompt,
"generic": generic_prompt,
},
)
prompt = composer.render(doc_type="invoice", document_text="...") # -> Prompt
Merge Composition¶
Render templates and merge with a custom function — full control over how pieces combine:
from fireflyframework_agentic.prompts.composer import MergeComposer
# The merge_fn is applied separately to the system parts and to the user parts.
# Both args are positional: (templates, merge_fn).
composer = MergeComposer(
[header, body, footer],
lambda parts: "\n---\n".join(parts),
)
prompt = composer.render(document_text="...") # -> Prompt
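The phrase "applied separately to the system parts and to the user parts" is easier to see in code. The sketch below uses a local Prompt stand-in and two hypothetical helpers, merge_prompts and sequential. Skipping empty halves is an assumption of this sketch, not documented framework behaviour.

```python
from typing import Callable, NamedTuple


class Prompt(NamedTuple):
    """Local stand-in for the rendered Prompt object (.system, .user)."""
    system: str
    user: str


def merge_prompts(parts: list[Prompt], merge_fn: Callable[[list[str]], str]) -> Prompt:
    # Apply merge_fn to the system halves and the user halves independently.
    systems = [p.system for p in parts if p.system]
    users = [p.user for p in parts if p.user]
    return Prompt(system=merge_fn(systems), user=merge_fn(users))


def sequential(parts: list[Prompt], separator: str = "\n\n") -> Prompt:
    # Sequential composition is the special case where merge_fn is a join.
    return merge_prompts(parts, separator.join)
```

Seen this way, sequential composition is merge composition with a fixed join, and conditional composition is a lookup that happens before any rendering.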
Validation¶
The PromptValidator checks rendered prompts against configurable constraints —
token limits and required sections — catching problems before they reach the LLM:
from fireflyframework_agentic.prompts import PromptValidator
# Validate that the rendered prompt fits within 4,000 tokens
# and contains the required "valid JSON" section.
validator = PromptValidator(max_tokens=4000, required_sections=["valid JSON"])
prompt = extraction_prompt.render(document_text="Invoice #INV-001 from Acme Corp...")
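# validate() takes a string. In extraction_prompt the phrase "valid JSON" lives in the
# system template, so validate the combined text rather than prompt.user alone.
result = validator.validate(f"{prompt.system}\n\n{prompt.user}")
if not result.valid:
    print(f"Prompt issues: {result.errors}")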
Using Prompts with Agents¶
So far we've created templates, versioned them, and composed them — but none of that is useful until the rendered text reaches an agent. Here is how the two systems connect.
Direct rendering → agent.run() — The simplest path. Render a template and pass
the rendered user text as the prompt. (render() returns a Prompt; an agent's
run() takes the user text — the system prompt is usually set as the agent's
instructions.)
from fireflyframework_agentic.agents import FireflyAgent
from fireflyframework_agentic.prompts import PromptTemplate
extraction_prompt = PromptTemplate(
"invoice_extraction",
"You are a precise invoice data extraction assistant.", # system_template
"Extract invoice_number, vendor_name, total_amount, due_date from:\n\n"
"{{ document_text }}\n\nReturn valid JSON.", # user_template
)
# Use the template's system text as the agent's instructions.
prompt = extraction_prompt.render(document_text=ocr_output)
agent = FireflyAgent(
name="extractor", model="openai:gpt-4o", instructions=prompt.system, output_type=dict
)
result = await agent.run(prompt.user)
print(result.output) # {"invoice_number": "INV-001", ...}
Composed prompts → agent.run() — Use a composer when you need to assemble
multiple templates (system instructions + context + task) into one prompt:
from fireflyframework_agentic.prompts.composer import SequentialComposer
# Each template provides system + user halves; here the work lives in the user half.
system = PromptTemplate("system", "You are a precise data extraction assistant.", "")
context = PromptTemplate("context", "", "Document type: {{ doc_type }}")
task = PromptTemplate("task", "", "Extract fields from:\n{{ document_text }}")
composer = SequentialComposer([system, context, task])
prompt = composer.render(doc_type="invoice", document_text=ocr_output)
result = await agent.run(prompt.user)
Reasoning patterns use prompts internally — Every reasoning pattern (Chapter 6)
has named prompt slots backed by PromptTemplate instances. When a ReAct pattern
calls agent.run(), it first renders its "thought" template, passes the result
to the agent, and records the output in the trace. You can override any slot:
from fireflyframework_agentic.reasoning import ReActPattern
# Override the built-in thought prompt with your own template.
custom = PromptTemplate("my:thought", "You are a careful reasoner.", "Think about: {{ context }}")
pattern = ReActPattern(prompts={"thought": custom})
See Chapter 6 → Configurable Prompts for the full list of prompt slots per pattern.
Loading from Files¶
For large prompts or team workflows, store templates as files:
PromptLoader exposes three static factory methods — from_string(),
from_file(), and from_directory():
from fireflyframework_agentic.prompts import PromptLoader
# From an inline string (name, system_template, user_template).
template = PromptLoader.from_string(
"invoice_extraction",
"You are an invoice parser.",
"Extract fields from:\n{{ document_text }}",
)
# From a YAML file (keys map to PromptTemplate fields; name defaults to the file stem).
template = PromptLoader.from_file("prompts/invoice_extraction.yaml")
# Load an entire directory (defaults to the *.j2 glob).
all_templates = PromptLoader.from_directory("prompts/")
When a name is not supplied, the loader infers it from the file stem (e.g.
invoice_extraction.yaml becomes "invoice_extraction").
IDP Tie-In: Versioned Extraction Prompts¶
For our IDP pipeline, we create versioned prompts that can be A/B tested later:
from fireflyframework_agentic.prompts import PromptTemplate, PromptRegistry
prompt_registry = PromptRegistry()
# Version 1: Simple extraction
extraction_v1 = PromptTemplate(
"idp_extraction",
"You are an invoice parser. Return valid JSON.", # system_template
"Extract these fields from the invoice:\n" # user_template
"- invoice_number, vendor_name, total_amount, due_date, line_items\n\n"
"Text: {{ document_text }}",
version="1.0.0",
required_variables=["document_text"],
)
# Version 2: More structured with examples
extraction_v2 = PromptTemplate(
"idp_extraction",
"You are an expert invoice parser. Extract structured data and return valid JSON.",
"Required fields:\n"
" invoice_number: string (format: INV-NNNN)\n"
" vendor_name: string\n"
" total_amount: float\n"
" due_date: string (ISO 8601)\n"
" line_items: list of {description: str, quantity: int, unit_price: float}\n\n"
"Example output:\n"
'{"invoice_number": "INV-0001", "vendor_name": "Example", "total_amount": 100.0, '
'"due_date": "2026-01-01", "line_items": [{"description": "Widget", "quantity": 2, '
'"unit_price": 50.0}]}\n\n'
"Document text:\n{{ document_text }}",
version="2.0.0",
required_variables=["document_text"],
)
prompt_registry.register(extraction_v1)
prompt_registry.register(extraction_v2)
# In production, select the version based on experiment configuration
template = prompt_registry.get("idp_extraction") # Returns v2 (latest)
prompt = template.render(document_text=ocr_output) # -> Prompt(.system, .user)
Part II — Intelligence¶
Chapter 6: Reasoning Patterns¶
Here's a hard truth about LLMs: if you throw a complex question at an agent in a single prompt, you're gambling. Sometimes the model nails it; sometimes it hallucinates half the answer. Reasoning patterns reduce that risk by giving the agent a structured way to think before it answers: loops of thinking, acting, observing, and reflecting, with each step recorded so you can see exactly what happened.
The framework ships six patterns out of the box (ReAct, Chain of Thought, Plan-and-Execute, Reflexion, Tree of Thoughts, Goal Decomposition). Each one is a different strategy for the same problem: how do you turn "figure this out" into a repeatable, observable, debuggable process?
The Architecture¶
All patterns share the same core engine. AbstractReasoningPattern implements the
Template Method design pattern: the base class runs the outer loop (step counting,
trace recording, max-steps enforcement, optional output review), and each concrete
pattern overrides five hooks that define its behaviour:
_reason(state) → Generate a thought ("what do I think?")
_act(state) → Perform an action ("what should I do?")
_observe(state, act) → Process the action's result ("what happened?")
_should_continue() → Decide whether to loop again ("am I done?")
_extract_output() → Produce the final answer ("what's the result?")
Here is how all the pieces fit together:
graph TD
subgraph AbstractReasoningPattern
EX["execute(agent, input)"] --> R["_reason()"]
R --> STOP{"_should_stop?"}
STOP -->|no| A["_act()"]
A --> O["_observe()"]
O --> CONT{"_should_continue?"}
CONT -->|yes| R
CONT -->|no| OUT["_extract_output()"]
STOP -->|yes| OUT
OUT --> REV{"reviewer?"}
REV -->|yes| REVIEW["OutputReviewer.review()"]
REV -->|no| RESULT["ReasoningResult"]
REVIEW --> RESULT
end
subgraph Six Patterns
REACT["ReAct"]
COT["Chain of Thought"]
PAE["Plan-and-Execute"]
REF["Reflexion"]
TOT["Tree of Thoughts"]
GD["Goal Decomposition"]
end
REACT -->|extends| EX
COT -->|extends| EX
PAE -->|extends| EX
REF -->|extends| EX
TOT -->|extends| EX
GD -->|extends| EX
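The control flow in the diagram can be sketched in plain Python. This is a simplified illustration of the Template Method idea, not the framework's AbstractReasoningPattern: SketchPattern, SumPattern, and the state["final"] flag are hypothetical, and the reviewer step is omitted.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any


class SketchPattern(ABC):
    """Illustrative base class: owns the loop, subclasses supply the hooks."""

    def __init__(self, max_steps: int = 5) -> None:
        self.max_steps = max_steps

    async def execute(self, task: str) -> dict[str, Any]:
        state: dict[str, Any] = {"input": task, "trace": []}
        for _ in range(self.max_steps):          # max-steps enforcement
            thought = await self._reason(state)
            state["trace"].append(("thought", thought))
            if state.get("final"):               # early stop after reasoning
                break
            action = await self._act(state)
            state["trace"].append(("action", action))
            await self._observe(state, action)
            if not await self._should_continue(state):
                break
        return {"output": await self._extract_output(state), "trace": state["trace"]}

    @abstractmethod
    async def _reason(self, state): ...
    @abstractmethod
    async def _act(self, state): ...
    @abstractmethod
    async def _observe(self, state, action): ...
    @abstractmethod
    async def _should_continue(self, state): ...
    @abstractmethod
    async def _extract_output(self, state): ...


class SumPattern(SketchPattern):
    """Toy pattern: adds one number from the input per iteration."""

    async def _reason(self, state):
        remaining = state.setdefault("remaining", [int(x) for x in state["input"].split()])
        state["final"] = not remaining
        return f"{len(remaining)} numbers left"

    async def _act(self, state):
        return state["remaining"].pop(0)

    async def _observe(self, state, action):
        state["total"] = state.get("total", 0) + action

    async def _should_continue(self, state):
        return bool(state["remaining"])

    async def _extract_output(self, state):
        return state.get("total", 0)
```

The base class never knows what a "thought" or an "action" is; it only sequences the hooks and records the trace, which is why every pattern gets step counting and tracing for free.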
How Reasoning Patterns Use Agent Tools¶
An important thing to understand: reasoning patterns don't manage tools. They
manage thinking. When a pattern needs to interact with the outside world (search a
database, call an API, do math), it delegates to agent.run() — and the agent's
configured Pydantic AI tools handle the rest.
This means you must attach tools to the agent before passing it to a pattern. Here is the canonical flow, using the framework's tool system end-to-end:
# ── Step 1: Define tools with @firefly_tool (see Chapter 4) ──────────────
# These are automatically registered in the global ToolRegistry.
from fireflyframework_agentic.tools import firefly_tool, ToolKit
@firefly_tool(name="vendor_lookup", description="Look up vendor in the ERP system")
async def vendor_lookup(vendor_name: str) -> str:
    return '{"id": "V-001", "tax_id": "US-12345"}'

@firefly_tool(name="calculate", description="Evaluate a math expression safely")
async def calculate(expression: str) -> str:
    # Simplified: eval() is NOT safe on untrusted input. A production tool needs
    # a restricted evaluator to honour the "safely" in its description.
    return str(eval(expression))
# ── Step 2: Group tools in a ToolKit and bridge to Pydantic AI ───────────
# ToolKit.as_pydantic_tools() converts framework BaseTool instances into
# pydantic_ai.Tool objects that the LLM can call during agent.run().
extraction_tools = ToolKit(
"extraction-tools",
[vendor_lookup, calculate],
description="Tools available during invoice extraction",
)
# ── Step 3: Create an agent WITH the bridged tools ──────────────────────
from fireflyframework_agentic.agents import FireflyAgent
extractor = FireflyAgent(
name="extractor",
model="openai:gpt-4o",
tools=extraction_tools.as_pydantic_tools(), # Bridge: Firefly → Pydantic AI
)
# ── Step 4: Pass the tool-equipped agent to a reasoning pattern ─────────
# The pattern calls extractor.run() internally, which triggers Pydantic AI's
# tool dispatch — the LLM decides when to call vendor_lookup or calculate.
from fireflyframework_agentic.reasoning import ReActPattern
react = ReActPattern(max_steps=5)
result = await react.execute(extractor, "What is the total with tax for vendor Acme Corp?")
The runtime flow is: Pattern → agent.run(prompt) → Pydantic AI → LLM decides to call tools → tool results → back to pattern loop.
You can also skip the framework tool system and pass plain Pydantic AI tool functions
directly to FireflyAgent(tools=[...]) — see Chapter 4 → Attaching Tools to Agents
for all three approaches.
Architecture: Why Tools Are Explicitly Bound to Agents¶
You may wonder: "If @firefly_tool already registers a tool in the global
ToolRegistry, why doesn't every agent automatically see every tool?"
This is by design. The framework has two separate layers with different purposes:
- ToolRegistry (discovery layer): A global catalog of all tools in the system. Used for admin dashboards, documentation generation, plugin discovery, and pipeline wiring. Think of it as a phone book: it lists everything that exists.
- Agent tool binding (execution layer): The specific set of tools an agent can invoke during an LLM run. Only tools explicitly passed via FireflyAgent(tools=[]) or agent.tool() are available to the model.
Why explicit binding?
- Security: An agent that processes user input should not have access to ShellTool or delete_record. Implicit auto-injection of all registered tools would create a dangerous attack surface.
- Predictability: When you read an agent definition, you can see exactly which tools it can call. No surprises from a plugin that registered a tool at import time.
- Cost control: Each tool in a Pydantic AI agent's tool list adds its name, description, and schema to every model request. Injecting 50 tools when an agent only needs 3 wastes tokens and confuses the model.
- Principle of least privilege: Each agent gets the minimum set of tools required for its task, not the maximum set available.
Use ToolKit to curate which subset of registered tools each agent receives:
from fireflyframework_agentic.tools import ToolKit
from fireflyframework_agentic.tools.registry import tool_registry
# Curate: only extraction-related tools for this agent
extraction_kit = ToolKit("extraction", [
tool_registry.get("vendor_lookup"),
tool_registry.get("calculate"),
])
extractor = FireflyAgent(
name="extractor",
model="openai:gpt-4o",
tools=extraction_kit.as_pydantic_tools(),
)
Memory in Reasoning Patterns¶
Reasoning patterns can also access the framework's memory system. When you pass a
MemoryManager via the memory keyword argument, it becomes available in the
pattern's internal state as state["memory"]. This lets pattern hooks read and write
working memory during iterations:
from fireflyframework_agentic.reasoning import PlanAndExecutePattern
from fireflyframework_agentic.memory import MemoryManager
memory = MemoryManager(working_scope_id="extraction-session")
# Store a fact that the pattern's hooks can read
memory.set_fact("doc_type", "invoice")
# The memory kwarg is forwarded into state["memory"] inside the pattern
pattern = PlanAndExecutePattern(max_steps=15)
result = await pattern.execute(
extractor_agent,
"Extract invoice fields from the OCR text.",
memory=memory, # Available as state["memory"] in _reason(), _act(), etc.
)
In pipelines, this happens automatically — ReasoningStep passes
PipelineContext.memory to the pattern (see Chapter 10).
Structured Output Models¶
Patterns use typed Pydantic models instead of raw text. This is a big deal: instead of parsing free-form strings for magic words like "FINISH", the model returns structured objects with explicit fields. No more fragile regex parsing:
from fireflyframework_agentic.reasoning.models import (
ReasoningThought,
ReasoningAction,
ReasoningPlan,
PlanStepDef,
StepStatus,
ReflectionVerdict,
BranchEvaluation,
GoalDecompositionResult,
GoalPhase,
)
# A structured thought — is_final replaces magic phrases like "FINISH"
thought = ReasoningThought(
content="The invoice number is INV-2026-001",
is_final=True,
final_answer="INV-2026-001",
confidence=0.95,
)
# A structured plan with step tracking
plan = ReasoningPlan(
goal="Extract all invoice fields",
steps=[
PlanStepDef(id="s1", description="Find invoice number"),
PlanStepDef(id="s2", description="Find vendor name", dependencies=["s1"]),
PlanStepDef(id="s3", description="Find amount and date", dependencies=["s1"]),
],
)
# Each step has a status: pending → running → completed/failed/skipped
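The dependencies field implies an execution order: s2 and s3 can only start once s1 has completed. The tutorial does not describe how the framework schedules steps, so the following is only a sketch of one way to derive that order with the standard-library graphlib module. execution_batches is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# Step id -> ids it depends on, mirroring the plan above.
dependencies = {"s1": [], "s2": ["s1"], "s3": ["s1"]}


def execution_batches(deps: dict[str, list[str]]) -> list[list[str]]:
    """Group steps into batches whose dependencies are already complete."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()                      # raises CycleError on circular plans
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())
        batches.append(ready)
        sorter.done(*ready)               # mark the batch complete
    return batches
```

For the plan above this yields [["s1"], ["s2", "s3"]]: the two steps in the second batch are independent of each other and could run concurrently.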
The Six Built-In Patterns¶
1. ReAct (Reasoning + Acting)¶
Interleaves thinking and tool use. Each iteration produces a ReasoningThought; when
is_final=True, the loop stops.
from fireflyframework_agentic.reasoning import ReActPattern
react = ReActPattern(max_steps=5)
result = await react.execute(my_agent, "What is the total on invoice INV-001?")
print(result.output)
print(f"Steps taken: {len(result.trace.steps)}")
When to use: Tasks that need external information (tool calls, searches, API lookups) combined with reasoning.
2. Chain of Thought¶
The agent reasons step by step through a problem. Each step is a ReasoningThought.
from fireflyframework_agentic.reasoning import ChainOfThoughtPattern
cot = ChainOfThoughtPattern(max_steps=10)
result = await cot.execute(my_agent, "Calculate the VAT on $1,234.56 at 20%.")
When to use: Math, logic puzzles, multi-step deductions where the agent needs to show its work.
3. Plan-and-Execute¶
The agent first generates a ReasoningPlan with PlanStepDef items, then executes
each step with status tracking (pending → running → completed/failed). Supports
replanning when steps fail.
from fireflyframework_agentic.reasoning import PlanAndExecutePattern
planner = PlanAndExecutePattern(max_steps=15, allow_replan=True)
result = await planner.execute(
extractor_agent,
"Extract all fields from this invoice:\n" + ocr_text,
)
When to use: Complex, multi-step tasks where you want visibility into progress and the ability to recover from failures.
4. Reflexion¶
Adds a self-evaluation loop. After generating output, the agent critiques itself via
a ReflectionVerdict. If is_satisfactory=False, the issues and suggestions are fed
back into a retry prompt.
from fireflyframework_agentic.reasoning import ReflexionPattern
reflexion = ReflexionPattern(max_steps=3)
result = await reflexion.execute(my_agent, "Write a unit test for binary search.")
When to use: Tasks where quality matters and the agent can meaningfully self-correct (writing code, generating reports, data extraction).
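The critique-and-retry cycle can be sketched without any LLM by passing the generator and the critic in as plain functions. Verdict is a local stand-in for ReflectionVerdict, and reflexion_loop, generate, and critique are hypothetical names; the real pattern drives both roles through agent.run().

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class Verdict:
    """Local stand-in for ReflectionVerdict."""
    is_satisfactory: bool
    issues: list[str] = field(default_factory=list)


def reflexion_loop(
    generate: Callable[[str], str],
    critique: Callable[[str], Verdict],
    task: str,
    max_steps: int = 3,
) -> str:
    prompt = task
    draft = ""
    for _ in range(max_steps):
        draft = generate(prompt)
        verdict = critique(draft)
        if verdict.is_satisfactory:
            break
        # Feed the critic's issues back into the retry prompt.
        issues = "; ".join(verdict.issues)
        prompt = f"{task}\nPrevious attempt: {draft}\nFix these issues: {issues}"
    return draft
```

Note that max_steps bounds the number of attempts: if the critic is never satisfied, the loop returns the last draft rather than running forever.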
5. Tree of Thoughts¶
Explores multiple reasoning branches and evaluates each with a BranchEvaluation
(score + reasoning). The highest-scoring branch is selected.
from fireflyframework_agentic.reasoning import TreeOfThoughtsPattern
tot = TreeOfThoughtsPattern(branching_factor=3, max_depth=3)
result = await tot.execute(my_agent, "Design an API for a task management system.")
When to use: Open-ended problems with multiple valid approaches where you want to explore and compare alternatives.
6. Goal Decomposition¶
Breaks a high-level goal into structured GoalPhase objects, each with concrete tasks.
from fireflyframework_agentic.reasoning import GoalDecompositionPattern
decomposer = GoalDecompositionPattern(max_steps=20)
result = await decomposer.execute(my_agent, "Build an IDP pipeline for invoice processing.")
When to use: Strategic planning, project decomposition, breaking down large goals into actionable phases.
Configurable Prompts¶
Every pattern uses PromptTemplate instances for its LLM calls. You can override any
prompt by passing a prompts dict:
from fireflyframework_agentic.prompts.template import PromptTemplate
from fireflyframework_agentic.reasoning import ReActPattern
custom_thought = PromptTemplate(
"my:react:thought",
"You are a careful reasoner.", # system_template
"Think carefully about: {{ context }}", # user_template
required_variables=["context"],
)
pattern = ReActPattern(prompts={"thought": custom_thought})
Each pattern has named prompt slots:
- ReAct: "thought"
- Chain of Thought: "step"
- Plan-and-Execute: "plan", "execute_step", "replan"
- Reflexion: "critique", "retry"
- Tree of Thoughts: "branch", "evaluate"
- Goal Decomposition: "decompose", "plan_phase"
All built-in prompts are registered in the global prompt_registry under the
reasoning: namespace (e.g. "reasoning:react:thought").
Reasoning Pipeline¶
Chain patterns sequentially — the output of one becomes the input to the next:
from fireflyframework_agentic.reasoning import ReasoningPipeline, PlanAndExecutePattern, ReActPattern
pipeline = ReasoningPipeline([
PlanAndExecutePattern(max_steps=15),
ReActPattern(max_steps=10),
])
result = await pipeline.execute(my_agent, "Analyse the invoice and verify the totals.")
Reasoning Trace¶
Every pattern records its work in a ReasoningTrace. The trace contains typed steps:
- ThoughtStep — The agent's reasoning.
- ActionStep — A tool invocation.
- ObservationStep — The result of an action.
- ReflectionStep — Self-evaluation.
- PlanStep — A plan or sub-goal.
Creating a Custom Pattern¶
Extend AbstractReasoningPattern and override the template methods:
from fireflyframework_agentic.reasoning.base import AbstractReasoningPattern
class VerifyAndCorrectPattern(AbstractReasoningPattern):
    def __init__(self, *, max_steps: int = 5):
        super().__init__("verify_and_correct", max_steps=max_steps)

    async def _reason(self, state):
        # Generate a thought about the current state
        ...

    async def _act(self, state):
        # Perform verification action
        ...

    async def _observe(self, state, action):
        # Process verification result
        ...

    async def _should_continue(self, state):
        # Continue until verification passes or max steps reached
        ...

    async def _extract_output(self, state):
        # Return the verified/corrected output
        ...
Register it to make it available framework-wide:
from fireflyframework_agentic.reasoning.registry import reasoning_registry
reasoning_registry.register("verify_and_correct", VerifyAndCorrectPattern)
IDP Tie-In: Extraction with Plan-and-Execute + Reflexion¶
Now let's put reasoning patterns to work in our invoice pipeline. In Chapter 4
we defined IDP tools (ocr_extract, vendor_lookup) and grouped them into
extraction_kit. We also created extractor_agent with those tools attached
via extraction_kit.as_pydantic_tools(). Here we pass that agent — tools and
all — to reasoning patterns.
The extraction phase is the hardest part — we need to find invoice numbers, vendor names, amounts, and line items from messy OCR text. A single-shot prompt might miss something, so we use Plan-and-Execute to break it into steps, and Reflexion as a safety net when validation catches errors:
from fireflyframework_agentic.reasoning import PlanAndExecutePattern, ReflexionPattern
# ── Recall from Chapter 4 ───────────────────────────────────────────
# extractor_agent = FireflyAgent(
# name="extractor",
# model="openai:gpt-4o",
# tools=extraction_kit.as_pydantic_tools(), # ocr_extract, vendor_lookup, CalculatorTool
# )
# The tools are already bound — reasoning patterns call agent.run() internally,
# so the agent can invoke any of its tools during each reasoning step.
# ─────────────────────────────────────────────────────────────────────
# Phase 3: Systematic extraction.
# Plan-and-Execute generates a plan ("find invoice number", "find vendor", ...)
# and executes each step with status tracking. If a step fails, it can replan.
extraction_pattern = PlanAndExecutePattern(max_steps=15, allow_replan=True)
extraction_result = await extraction_pattern.execute(
extractor_agent, # Tools already attached in Ch4 via ToolKit.as_pydantic_tools()
f"Extract invoice fields from:\n{ocr_text}",
)
# Phase 4: If validation catches errors (e.g. invalid date format), use Reflexion
# to let the agent critique its own output and self-correct.
if not validation_passed:
    reflexion = ReflexionPattern(max_steps=2)
    corrected = await reflexion.execute(
        extractor_agent,
        f"Previous extraction had errors: {errors}. Re-extract from:\n{ocr_text}",
    )
Architecture recap: Reasoning patterns never see tools directly. They receive an agent (which owns its tools) and call agent.run(). This is why tools must be bound to the agent before passing it to a pattern. See the "Attaching Tools to Agents" section in Chapter 4 and the full idp/tools.py module in Chapter 18.
Chapter 7: Content Processing¶
Here's a problem you'll hit fast: your 50-page invoice PDF produces OCR text that is 200,000 tokens long — but your model's context window is 128K, and you're paying per token. You can't just shove the entire document into one call.
The Content module solves this with a three-stage pipeline:
- Chunk: Split oversized content into overlapping pieces that each fit within a token budget. TextChunker handles text (by token, sentence, or paragraph), DocumentSplitter handles multi-page documents, and ImageTiler handles large images.
- Process: Send chunks through an agent concurrently via BatchProcessor. This is where you do per-chunk OCR cleanup, translation, or summarisation.
- Compress: Merge the results back into a single context that fits the downstream agent's window. ContextCompressor supports three strategies: truncation (cheap, lossy), summarisation (LLM-based, preserves meaning), and map-reduce (chunk → summarise each → merge).
Together they let you process documents of any size without exceeding the context window or blowing up your bill; how much detail survives depends on the compression strategy you pick.
Content Processing Pipeline¶
graph LR
subgraph "1. Chunk"
TC["TextChunker"]
DS["DocumentSplitter"]
IT["ImageTiler"]
end
subgraph "2. Process"
BP["BatchProcessor\n(concurrent agent calls)"]
end
subgraph "3. Compress"
CC["ContextCompressor"]
TR["TruncationStrategy"]
SM["SummarizationStrategy"]
MR["MapReduceStrategy"]
end
subgraph "Utilities"
TE["TokenEstimator"]
SW["SlidingWindowManager"]
end
TC --> BP
DS --> BP
IT --> BP
BP --> CC
CC --> TR
CC --> SM
CC --> MR
TE -.-> TC
TE -.-> CC
TE -.-> SW
Text Chunking¶
TextChunker splits text into overlapping chunks using one of three strategies:
from fireflyframework_agentic.content.chunking import TextChunker
chunker = TextChunker(
chunk_size=4000, # Max tokens per chunk
chunk_overlap=200, # Overlap between consecutive chunks
strategy="paragraph", # "token" | "sentence" | "paragraph"
)
chunks = chunker.chunk(long_document_text)
for chunk in chunks:
    print(f"Chunk {chunk.index}/{chunk.total_chunks}: {len(chunk.content)} chars")
    # chunk.metadata contains additional info
Each chunk is a Chunk model with content, index, total_chunks,
overlap_tokens, and an open metadata dict.
Strategy guide:
- "token" — Split by estimated token count. Best for arbitrary text.
- "sentence" — Split at sentence boundaries. Best for natural language.
- "paragraph" — Split at paragraph boundaries (\n\n). Best for structured docs.
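To see how chunk_size and chunk_overlap interact, here is a minimal sketch of the "token" strategy that treats each whitespace-separated word as one token. That tokenisation is a deliberate simplification, and chunk_tokens is a hypothetical helper, not TextChunker itself.

```python
def chunk_tokens(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Split text into chunks of at most chunk_size words sharing chunk_overlap words."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be >= 0 and smaller than chunk_size")
    words = text.split()
    stride = chunk_size - chunk_overlap   # how far each new chunk advances
    chunks = []
    for start in range(0, len(words), stride):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):
            break  # this chunk already reaches the end of the text
    return chunks
```

Each chunk begins stride words after the previous one, so the last chunk_overlap words of one chunk reappear at the start of the next. That repetition is what keeps a sentence or table row that straddles a boundary intact in at least one chunk.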
Document Splitter¶
DocumentSplitter handles multi-document inputs, splitting at page breaks (\f) or
horizontal rules (---):
from fireflyframework_agentic.content.chunking import DocumentSplitter
splitter = DocumentSplitter(min_length=50)
segments = splitter.split(multi_page_text)
Image Tiler¶
For large images that exceed a VLM's pixel budget, ImageTiler computes tile
coordinates:
from fireflyframework_agentic.content.chunking import ImageTiler
tiler = ImageTiler(tile_width=1024, tile_height=1024, overlap=128)
tiles = tiler.compute_tiles(image_width=4096, image_height=6144)
# Each tile is a Chunk with metadata: x, y, width, height, row, col
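The tile arithmetic can be sketched without the framework. The function below is a hypothetical stand-in, not `ImageTiler` itself: it assumes tiles advance by `tile size - overlap` and that edge tiles are clipped to the image bounds, which may not match the library's behaviour.

```python
def compute_tiles(image_width, image_height, tile_width, tile_height, overlap):
    """Return (x, y, width, height, row, col) tuples that cover the image."""
    if overlap >= tile_width or overlap >= tile_height:
        raise ValueError("overlap must be smaller than the tile size")
    step_x = tile_width - overlap
    step_y = tile_height - overlap
    tiles = []
    y, row = 0, 0
    while True:
        x, col = 0, 0
        height = min(tile_height, image_height - y)  # clip the bottom edge
        while True:
            width = min(tile_width, image_width - x)  # clip the right edge
            tiles.append((x, y, width, height, row, col))
            if x + tile_width >= image_width:
                break
            x += step_x
            col += 1
        if y + tile_height >= image_height:
            break
        y += step_y
        row += 1
    return tiles


tiles = compute_tiles(4096, 6144, tile_width=1024, tile_height=1024, overlap=128)
print(len(tiles), tiles[0], tiles[-1])
```

Under these assumptions the 4096 × 6144 image yields a 7-row by 5-column grid, with narrower tiles along the right and bottom edges.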
Batch Processing¶
BatchProcessor sends chunks through an agent concurrently. By default it returns a
list of per-chunk result strings; pass a result_aggregator callable to merge them
into any shape you like:
from fireflyframework_agentic.content.chunking import BatchProcessor
processor = BatchProcessor(concurrency=4)
results = await processor.process(ocr_agent, chunks) # -> list[str]
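As a rough illustration of what bounded concurrency with an optional aggregator looks like, the sketch below uses only `asyncio`. `process_concurrently` and `fake_agent` are hypothetical names; this is not the `BatchProcessor` implementation, only the general pattern it describes.

```python
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable, Sequence


async def process_concurrently(
    worker: Callable[[str], Awaitable[str]],
    items: Sequence[str],
    concurrency: int = 4,
    result_aggregator: Callable[[list[str]], object] | None = None,
):
    """Run worker over items with at most `concurrency` calls in flight."""
    semaphore = asyncio.Semaphore(concurrency)

    async def run_one(item: str) -> str:
        async with semaphore:
            return await worker(item)

    # gather() returns results in input order, whatever the completion order.
    results = list(await asyncio.gather(*(run_one(item) for item in items)))
    return result_aggregator(results) if result_aggregator else results


async def fake_agent(chunk: str) -> str:
    """Stand-in for an agent call; a real one would call an LLM."""
    await asyncio.sleep(0.01)
    return chunk.upper()


print(asyncio.run(process_concurrently(fake_agent, ["a", "b", "c"], concurrency=2)))
```

Passing `result_aggregator="\n".join` would return one merged string instead of a list.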
Context Compression¶
When concatenated chunks still exceed the extraction agent's context window, compress them:
Truncation Strategy¶
Hard-cuts the text at a token limit:
from fireflyframework_agentic.content.compression import ContextCompressor, TruncationStrategy
compressor = ContextCompressor(strategy=TruncationStrategy())
compressed = await compressor.compress(full_text, max_tokens=8000)
Summarization Strategy¶
Uses an LLM agent to intelligently summarise:
from fireflyframework_agentic.content.compression import ContextCompressor, SummarizationStrategy
compressor = ContextCompressor(
strategy=SummarizationStrategy(summary_agent)
)
compressed = await compressor.compress(full_text, max_tokens=8000)
Map-Reduce Strategy¶
Chunks the text, summarises each chunk in parallel, then merges the summaries:
from fireflyframework_agentic.content.compression import ContextCompressor, MapReduceStrategy
compressor = ContextCompressor(
strategy=MapReduceStrategy(summary_agent)
)
compressed = await compressor.compress(full_text, max_tokens=8000)
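The chunk, summarise, merge flow can be sketched in a few lines. This is an illustrative stand-in, not `MapReduceStrategy`: it measures size in characters rather than tokens, and `map_reduce_compress` and `fake_summarise` are hypothetical names.

```python
import asyncio


async def map_reduce_compress(text, summarise, chunk_chars=2000, max_chars=1000):
    """Summarise fixed-size slices in parallel, then merge them under a budget."""
    if len(text) <= max_chars:
        return text
    pieces = [text[i:i + chunk_chars] for i in range(0, len(text), chunk_chars)]
    # Map: summarise every slice concurrently.
    summaries = await asyncio.gather(*(summarise(piece) for piece in pieces))
    merged = "\n".join(summaries)
    # Reduce: one more summarisation pass if the merged text is still too long.
    if len(merged) > max_chars:
        merged = await summarise(merged)
    # Final hard cut as a safety net.
    return merged[:max_chars]


async def fake_summarise(piece: str) -> str:
    """Stand-in summariser that keeps the first 100 characters."""
    return piece[:100]


compressed = asyncio.run(map_reduce_compress("x" * 5000, fake_summarise))
print(len(compressed))
```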
Sliding Window Manager¶
Maintains a sliding window over a stream of messages, keeping total token usage within budget:
from fireflyframework_agentic.content.compression import SlidingWindowManager
window = SlidingWindowManager(max_tokens=8000)
window.add("First OCR page output...")
window.add("Second OCR page output...")
current_context = window.get_context() # Only recent items that fit
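One plausible way to implement such a window is a deque with a running token total. The class below is a sketch under that assumption; `SlidingWindow` and its word-count estimate are illustrative and are not the framework's `SlidingWindowManager`.

```python
from collections import deque


class SlidingWindow:
    """Keep the most recent items whose combined estimated tokens fit a budget."""

    def __init__(self, max_tokens: int) -> None:
        self.max_tokens = max_tokens
        self._items: deque[tuple[str, int]] = deque()
        self._total = 0

    @staticmethod
    def _estimate(text: str) -> int:
        # Word-count heuristic; an assumption, not the framework's estimator.
        return max(1, int(len(text.split()) * 1.33))

    def add(self, text: str) -> None:
        cost = self._estimate(text)
        self._items.append((text, cost))
        self._total += cost
        # Evict from the oldest end until the window fits again,
        # but always keep at least the newest item.
        while self._total > self.max_tokens and len(self._items) > 1:
            _, dropped = self._items.popleft()
            self._total -= dropped

    def get_context(self) -> str:
        return "\n".join(text for text, _ in self._items)
```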
Token Estimator¶
Estimate token counts without an API call:
from fireflyframework_agentic.content.compression import TokenEstimator
estimator = TokenEstimator() # Default ratio: 1.33 tokens per word
tokens = estimator.estimate("This is a test sentence.")
Binary Ingestion (content.binary)¶
The text pipeline above assumes you already have text. For real documents — PDFs, Office
files, images, archives, emails — the content.binary submodule (installed via the
[binary] extra) turns raw caller bytes into one or more normalised BinaryArtifact
rows ready for OCR/extraction. BinaryNormalizer dispatches by sniffed media type and
delegates to pluggable handlers: PdfGuard (PDF sanitisation), ImageNormalizer,
OfficeConverter (build via build_office_converter, backed by GotenbergConverter,
LibreOfficeConverter, or NoOpOfficeConverter), ArchiveUnpacker, and EmailUnpacker.
from fireflyframework_agentic.content.binary import (
BinaryNormalizer, BinaryConfig, build_office_converter, sniff_media_type,
)
config = BinaryConfig() # caps, toggles
normalizer = BinaryNormalizer(config=config, office=build_office_converter(config))
# normalise() is async and never returns empty.
artifacts = await normalizer.normalise(raw_bytes, filename="invoice.pdf")
for art in artifacts:
    print(art.media_type, art.kind, art.page_count)
Handlers are injected so a host can swap implementations (e.g. Gotenberg vs LibreOffice) without touching the framework.
IDP Tie-In: Processing Large Documents¶
In our IDP pipeline, the OCR phase may produce text that exceeds the extraction agent's context window. Here's how we handle it:
from fireflyframework_agentic.content.chunking import TextChunker, BatchProcessor
from fireflyframework_agentic.content.compression import ContextCompressor, MapReduceStrategy
# Step 1: Chunk the raw OCR output
chunker = TextChunker(chunk_size=3000, chunk_overlap=200, strategy="paragraph")
chunks = chunker.chunk(raw_ocr_text)
# Step 2: If needed, process the chunks in parallel through the OCR cleanup agent.
# By default process() returns a list of strings (one per chunk).
processor = BatchProcessor(concurrency=4)
cleaned_chunks = await processor.process(ocr_cleanup_agent, chunks)
# Step 3: Compress for the extraction agent (compress() is async — await it).
full_text = "\n".join(cleaned_chunks)
compressor = ContextCompressor(
strategy=MapReduceStrategy(summary_agent)
)
compressed_text = await compressor.compress(full_text, max_tokens=8000)
Chapter 8: Memory¶
Without memory, every agent.run() call starts from scratch — the agent has no idea
what happened in the previous turn, what the pipeline figured out two steps ago, or
what facts a delegated sub-agent discovered. That's fine for one-shot tasks, but
real applications need context: multi-turn conversations, session state, pipeline
variables. The Memory module gives your agents a brain that persists across calls.
There are two kinds of memory here:
- Conversation memory — the actual chat history ("what was said"), automatically
trimmed to stay within your token budget.
- Working memory — a key-value scratchpad ("what we know") for storing facts like
doc_type = "invoice" that multiple agents or pipeline steps can read.
Architecture¶
graph TD
subgraph MemoryManager
MM["MemoryManager<br/><small>new_conversation · fork<br/>set_fact · get_fact</small>"]
end
subgraph Conversation
CM["ConversationMemory<br/><small>add_turn · get_history<br/>token budget · FIFO eviction</small>"]
end
subgraph Working
WM["WorkingMemory<br/><small>set · get · delete<br/>scoped namespaces</small>"]
end
subgraph Backends
IMS["InMemoryStore<br/><small>dict-backed</small>"]
FS["FileStore<br/><small>JSON file per namespace</small>"]
CS["YourCustomStore<br/><small>implements MemoryStore protocol</small>"]
end
subgraph Consumers
AGT["FireflyAgent<br/><small>auto message_history</small>"]
DR["DelegationRouter<br/><small>auto fork on delegation</small>"]
PIPE["PipelineContext<br/><small>propagated to steps</small>"]
RP["ReasoningPattern<br/><small>state['memory']</small>"]
end
MM --> CM
MM --> WM
WM --> IMS
WM --> FS
WM --> CS
AGT --> MM
DR --> MM
PIPE --> MM
RP --> MM
The system has four layers:
- ConversationMemory — Token-aware, per-conversation chat history. Wraps Pydantic AI's message_history and drops the oldest turns when you exceed budget.
- WorkingMemory — Scoped key-value scratchpad for facts and intermediate state.
- MemoryStore — Pluggable persistence (InMemoryStore, FileStore, or yours).
- MemoryManager — Facade that composes conversation + working memory behind a single API. This is the object you wire into agents, pipelines, and patterns.
Quick Start¶
Here's the simplest way to give an agent a memory — attach a MemoryManager and pass
a conversation_id to each call:
from fireflyframework_agentic.agents import FireflyAgent
from fireflyframework_agentic.memory import MemoryManager
# Create a memory manager with a 32K token budget for conversation history.
# When the history exceeds this, the oldest turns get dropped automatically.
memory = MemoryManager(max_conversation_tokens=32_000)
# Wire the memory into the agent. From now on, every run() call
# can participate in a persistent conversation.
agent = FireflyAgent(name="assistant", model="openai:gpt-4o", memory=memory)
# Start a new conversation — this returns a unique conversation ID.
conv_id = memory.new_conversation()
# Turn 1: the agent has no history yet.
result1 = await agent.run("What is Python?", conversation_id=conv_id)
# Turn 2: under the hood, FireflyAgent loads the message_history from Turn 1
# and passes it to Pydantic AI, so the model sees the full conversation.
result2 = await agent.run("What about its type system?", conversation_id=conv_id)
# result2 knows we were talking about Python — no context lost.
Conversation Memory¶
ConversationMemory manages chat history per conversation ID. It enforces a token
budget by dropping the oldest turns (FIFO):
from fireflyframework_agentic.memory import ConversationMemory
conv_mem = ConversationMemory(max_tokens=16_000)
cid = conv_mem.new_conversation()
# After an agent run, store the turn
conv_mem.add_turn(
cid,
user_prompt="Hello",
assistant_response="Hi there!",
raw_messages=result.new_messages(),
)
# Before the next run, get trimmed history
history = conv_mem.get_message_history(cid)
When FireflyAgent has memory attached, this is all automatic.
Working Memory¶
WorkingMemory is a scoped key-value store for facts and intermediate state:
from fireflyframework_agentic.memory import WorkingMemory
wm = WorkingMemory(scope_id="idp-session-42")
wm.set("doc_type", "invoice")
wm.set("vendor", "Acme Corp")
print(wm.get("doc_type")) # "invoice"
print(wm.to_dict()) # {"doc_type": "invoice", "vendor": "Acme Corp"}
# Render as a context block for prompt injection
print(wm.to_context_string())
# Working Memory:
# - doc_type: invoice
# - vendor: Acme Corp
Multiple WorkingMemory instances can share a backend while maintaining independent
namespaces:
from fireflyframework_agentic.memory import InMemoryStore, WorkingMemory
store = InMemoryStore()
agent_a_mem = WorkingMemory(store=store, scope_id="agent_a")
agent_b_mem = WorkingMemory(store=store, scope_id="agent_b")
agent_a_mem.set("key", "from A")
agent_b_mem.set("key", "from B")
assert agent_a_mem.get("key") == "from A" # Isolated
Storage Backends¶
InMemoryStore¶
Dict-backed, fast, non-persistent. Ideal for testing and short-lived sessions.
FileStore¶
JSON file persistence. Each namespace gets its own file.
SQLiteStore¶
Single-file SQLite persistence — durable and queryable without a separate server:
from fireflyframework_agentic.memory import SQLiteStore
store = SQLiteStore(path=".firefly_memory/memory.db")
For larger deployments the memory_backend config field also accepts "postgres" and
"mongodb" (configured via memory_postgres_url / memory_mongodb_url). Conversation
memory can auto-summarise evicted turns with a summariser built by
create_llm_summarizer(agent).
Custom Backends¶
Implement the MemoryStore protocol:
from fireflyframework_agentic.memory import MemoryStore, MemoryEntry
class RedisStore:
    def save(self, namespace: str, entry: MemoryEntry) -> None: ...
    def load(self, namespace: str) -> list[MemoryEntry]: ...
    def load_by_key(self, namespace: str, key: str) -> MemoryEntry | None: ...
    def delete(self, namespace: str, entry_id: str) -> None: ...
    def clear(self, namespace: str) -> None: ...
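To show what a complete backend with those five methods might look like, here is a self-contained dict-backed sketch. `Entry` is a hypothetical stand-in for `MemoryEntry` (the real model may carry different fields), and `DictStore` is an illustrative name rather than a framework class.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from uuid import uuid4


@dataclass
class Entry:
    """Hypothetical stand-in for MemoryEntry."""
    key: str
    content: object
    entry_id: str = field(default_factory=lambda: uuid4().hex)


class DictStore:
    """Namespace-partitioned store exposing the five protocol methods."""

    def __init__(self) -> None:
        self._data: dict[str, dict[str, Entry]] = {}

    def save(self, namespace: str, entry: Entry) -> None:
        self._data.setdefault(namespace, {})[entry.entry_id] = entry

    def load(self, namespace: str) -> list[Entry]:
        return list(self._data.get(namespace, {}).values())

    def load_by_key(self, namespace: str, key: str) -> Entry | None:
        # Return the latest-inserted entry with this key, if any.
        for entry in reversed(self.load(namespace)):
            if entry.key == key:
                return entry
        return None

    def delete(self, namespace: str, entry_id: str) -> None:
        self._data.get(namespace, {}).pop(entry_id, None)

    def clear(self, namespace: str) -> None:
        self._data.pop(namespace, None)
```

A Redis or database backend would keep the same method shapes and replace the inner dict operations with network calls.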
MemoryManager¶
The MemoryManager facade is the object you attach to agents, delegation routers, and
pipelines:
from fireflyframework_agentic.memory import MemoryManager
mgr = MemoryManager(
max_conversation_tokens=32_000,
working_scope_id="main-session",
)
# Conversation
cid = mgr.new_conversation()
mgr.add_turn(cid, "hello", "hi", raw_messages)
history = mgr.get_message_history(cid)
# Working memory
mgr.set_fact("doc_type", "invoice")
mgr.get_fact("doc_type") # "invoice"
Forking¶
When delegating to a sub-agent or branching a pipeline, fork() creates a child
manager that shares conversation memory but gets independent working memory.
This means a delegated agent can store its own facts without overwriting the parent's:
# Parent memory has facts about the overall session
mgr.set_fact("session_id", "abc-123")
# Fork for a sub-agent — it gets its own working memory scope
# but can still read the same conversation history.
child = mgr.fork(working_scope_id="sub-agent-classify")
child.set_fact("classification", "invoice") # Only visible in child
assert mgr.get_fact("classification") is None # Parent is unaffected
assert child.get_fact("session_id") is None # Child has its own scope
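The fork semantics described above (shared conversation, separate fact scopes) can be modelled with a toy class. `MiniMemory` is purely illustrative and assumes the conversation is shared by reference; it is not how `MemoryManager` is implemented.

```python
class MiniMemory:
    """Toy model of fork(): one shared conversation list, separate fact scopes."""

    def __init__(self, scope_id, conversation=None):
        self.scope_id = scope_id
        # Shared by reference between a parent and all of its forks.
        self.conversation = conversation if conversation is not None else []
        # Private to this scope.
        self._facts = {}

    def set_fact(self, key, value):
        self._facts[key] = value

    def get_fact(self, key):
        return self._facts.get(key)

    def fork(self, working_scope_id):
        # The child reuses the conversation but starts with empty facts.
        return MiniMemory(working_scope_id, conversation=self.conversation)


parent = MiniMemory("main-session")
parent.conversation.append(("user", "hello"))
child = parent.fork("sub-agent-classify")
child.set_fact("classification", "invoice")
print(parent.get_fact("classification"), child.conversation)
```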
Memory Integration Points¶
Memory flows through the framework in four ways. Understanding these is key to building applications where context is never lost:
1. Agent integration — When FireflyAgent has a MemoryManager attached,
calling run(prompt, conversation_id=cid) automatically loads conversation
history as message_history before the call and stores new messages after it.
You never touch message_history manually.
2. Delegation integration — DelegationRouter automatically forks memory
when routing to a sub-agent, so the delegated agent gets its own working-memory
scope while sharing conversation context.
3. Pipeline integration — When you create PipelineContext(memory=mgr),
both AgentStep and ReasoningStep propagate the memory to the agent and
pattern respectively. Facts stored in one step are readable in subsequent steps.
4. Reasoning integration — Pass memory=mgr as a keyword argument to
pattern.execute(agent, input, memory=mgr). The memory object becomes
available as state["memory"] inside all pattern hooks (_reason, _act, etc.),
so custom patterns can read and write working memory during reasoning iterations.
Configuration¶
Memory settings via environment variables:
export FIREFLY_AGENTIC_MEMORY_BACKEND=in_memory
export FIREFLY_AGENTIC_MEMORY_MAX_CONVERSATION_TOKENS=128000
export FIREFLY_AGENTIC_MEMORY_SUMMARIZE_THRESHOLD=10
export FIREFLY_AGENTIC_MEMORY_FILE_DIR=.firefly_memory
IDP Tie-In: Carrying Facts Across Pipeline Steps¶
Now let's see why memory matters for our IDP pipeline. When the classifier figures out that a document is an invoice, the extractor needs to know that — it selects different prompts for invoices vs. receipts. Working memory is the bridge:
from fireflyframework_agentic.memory import MemoryManager
# One memory manager for the entire pipeline run.
# Every step can read and write facts here.
memory = MemoryManager(working_scope_id="idp-run-001")
# Phase 1: The classifier stores what it learned.
# These facts persist across all subsequent pipeline steps.
classification = {"doc_type": "invoice", "language": "en", "page_count": 2}
memory.set_fact("doc_type", classification["doc_type"])
memory.set_fact("language", classification["language"])
memory.set_fact("page_count", classification["page_count"])
# Phase 3: The extractor reads what the classifier discovered.
# This works because both steps share the same MemoryManager.
doc_type = memory.get_fact("doc_type") # "invoice"
# Now we can pick the invoice-specific extraction prompt.
Chapter 9: Validation & Quality of Service¶
You've built an agent that extracts invoice data — but how do you trust the output? LLMs are probabilistic: they can hallucinate an invoice number that doesn't exist, format a date as "January 15th" instead of ISO 8601, or return an amount as a string instead of a number. In production, bad data propagates downstream and causes real damage.
The Validation module gives you a two-layer defence:
- Layer 1 — Structural validation catches deterministic errors. You define rules (regex, range, format, enum, or custom functions) for each field, group them into an OutputValidator, and run them against the extracted dict or Pydantic model. This is fast, cheap (no LLM call), and catches the most common failures: wrong date formats, out-of-range amounts, invalid invoice number patterns.
- Layer 2 — Quality of Service (QoS) catches statistical errors that rules can't see. ConfidenceScorer asks the LLM to self-rate its own output. ConsistencyChecker runs the same prompt multiple times and measures agreement — low agreement suggests hallucination. GroundingChecker verifies that extracted values actually appear in the source document (no LLM needed). QoSGuard composes all three into a single pass/fail gate.
Both layers can be wired into an OutputReviewer that wraps an agent call with
automatic retry: if structural validation or QoS fails, the reviewer feeds the
errors back to the agent and re-runs — up to max_retries times.
Output Validation Rules¶
Rules are composable predicates that check a single field value:
from fireflyframework_agentic.validation.rules import (
RegexRule,
FormatRule,
RangeRule,
EnumRule,
CustomRule,
FieldValidator,
)
# Regex: field value must match a pattern.
# First arg is always the field name.
inv_rule = RegexRule("invoice_number", r"^INV-\d{4,10}$")
# Format: field matches a named format (email, url, date, uuid, iso_date)
date_rule = FormatRule("due_date", "iso_date")
# Range: numeric field within bounds
amount_rule = RangeRule("total_amount", min_value=0.01, max_value=10_000_000)
# Enum: field value is one of an allowed set
status_rule = EnumRule("status", ["approved", "rejected", "pending"])
# Custom: any predicate function
custom_rule = CustomRule(
"vendor_name",
lambda v: isinstance(v, str) and len(v.strip()) > 0,
description="Value must be a non-empty string",
)
Field Validator¶
Validates a single field against one or more rules:
validator = FieldValidator("invoice_number", [inv_rule])
# validate() takes the field's value directly, not a dict.
results = validator.validate("INV-001234")
assert all(r.passed for r in results)
Output Validator¶
Validates an entire structured output (dict or Pydantic model). The constructor takes a dict mapping field names to lists of rules:
from fireflyframework_agentic.validation.rules import OutputValidator
validator = OutputValidator({
"invoice_number": [RegexRule("invoice_number", r"^INV-\d{4,10}$")],
"vendor_name": [RegexRule("vendor_name", r".{2,}")],
"total_amount": [RangeRule("total_amount", min_value=0.01, max_value=10_000_000)],
"due_date": [FormatRule("due_date", "iso_date")],
"status": [EnumRule("status", ["approved", "rejected", "pending"])],
})
# validate() accepts a dict, Pydantic model, or any object with __dict__.
report = validator.validate(extracted_data)
if not report.valid:
    for err in report.errors:
        print(f" {err.field_name}: {err.message}")
Quality of Service (QoS)¶
The QoS module provides post-generation quality checks. Each checker requires an agent (for LLM-based evaluation) or operates purely on text (grounding).
Confidence Scorer¶
Asks the agent to self-evaluate its output confidence on a 0.0–1.0 scale:
from fireflyframework_agentic.validation.qos import ConfidenceScorer
# The scorer needs an agent — it sends a self-evaluation prompt to the LLM.
scorer = ConfidenceScorer(evaluator_agent)
score = await scorer.score("The invoice total is $1,234.56") # 0.0–1.0
Consistency Checker¶
Runs the same prompt multiple times and measures word-level agreement across outputs (Jaccard similarity):
from fireflyframework_agentic.validation.qos import ConsistencyChecker
# The checker needs an agent and a run count. It runs the prompt num_runs times.
checker = ConsistencyChecker(extractor_agent, num_runs=3)
score, outputs = await checker.check("What is the capital of France?")
# score: float (1.0 = all answers agree), outputs: list[str]
print(f"Consistency: {score:.2f} across {len(outputs)} runs")
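For intuition about the score, word-level Jaccard similarity can be computed as below. Averaging over all output pairs is an assumption about how the per-pair values are combined; `jaccard` and `consistency_score` are hypothetical helper names, not framework functions.

```python
from itertools import combinations


def jaccard(a: str, b: str) -> float:
    """Word-level Jaccard similarity: |A ∩ B| / |A ∪ B|."""
    set_a, set_b = set(a.lower().split()), set(b.lower().split())
    if not set_a and not set_b:
        return 1.0
    return len(set_a & set_b) / len(set_a | set_b)


def consistency_score(outputs: list[str]) -> float:
    """Mean pairwise Jaccard similarity; 1.0 for fewer than two outputs."""
    pairs = list(combinations(outputs, 2))
    if not pairs:
        return 1.0
    return sum(jaccard(a, b) for a, b in pairs) / len(pairs)


print(jaccard("the capital is Paris", "the capital is Lyon"))
```

The two sentences share three of five distinct words, so their similarity is 0.6. Long free-form answers tend to score lower than short factual ones, which is worth keeping in mind when choosing `min_consistency`.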
Grounding Checker¶
Verifies that extracted field values actually appear in the source document — no agent needed, purely text-based:
from fireflyframework_agentic.validation.qos import GroundingChecker
checker = GroundingChecker(min_grounding_ratio=0.8)
# check() takes the source text and a dict of field_name → extracted_value.
score, field_map = checker.check(
source_text="Invoice #INV-001: Total Amount: $1,234.56. Vendor: Acme Corp.",
extracted_fields={"total": "$1,234.56", "vendor": "Acme Corp", "id": "INV-999"},
)
# score: float (fraction of grounded fields), field_map: dict[str, bool]
print(f"Grounding: {score:.2f}") # 0.67 (2 of 3 grounded)
print(f"Ungrounded: {[k for k, v in field_map.items() if not v]}") # ["id"]
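A text-only grounding check can be approximated with case-insensitive substring matching. The sketch below assumes exactly that rule (after whitespace normalisation); `grounding` is a hypothetical helper and the real `GroundingChecker` may match more loosely or more strictly.

```python
def grounding(source_text: str, extracted_fields: dict[str, str]) -> tuple[float, dict[str, bool]]:
    """Return the fraction of fields found verbatim in the source, plus a per-field map."""
    # Lower-case and collapse whitespace so line breaks do not hide a match.
    haystack = " ".join(source_text.lower().split())
    field_map = {
        name: " ".join(str(value).lower().split()) in haystack
        for name, value in extracted_fields.items()
    }
    score = sum(field_map.values()) / len(field_map) if field_map else 1.0
    return score, field_map


score, field_map = grounding(
    "Invoice #INV-001: Total Amount: $1,234.56. Vendor: Acme Corp.",
    {"total": "$1,234.56", "vendor": "Acme Corp", "id": "INV-999"},
)
print(round(score, 2), field_map)
```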
QoS Guard¶
Composes all checks into a single gate. You build the individual checkers and pass them in:
from fireflyframework_agentic.validation.qos import (
QoSGuard, ConfidenceScorer, ConsistencyChecker, GroundingChecker,
)
# Build the individual checkers first.
guard = QoSGuard(
confidence_scorer=ConfidenceScorer(evaluator_agent),
consistency_checker=ConsistencyChecker(extractor_agent, num_runs=3),
grounding_checker=GroundingChecker(),
min_confidence=0.8,
min_consistency=0.6,
min_grounding=0.8,
)
# evaluate() takes the output string plus optional context for each check.
result = await guard.evaluate(
str(extracted_data),
prompt="Extract fields from the invoice.",
source_text=ocr_text,
extracted_fields={"total": "$1,234.56", "vendor": "Acme Corp"},
)
# result is a QoSResult with: .passed, .confidence, .consistency_score,
# .grounding_score, .details (dict with per-check info)
if result.passed:
    print("Quality check passed")
else:
    print(f"Failed: confidence={result.confidence:.2f}")
Output Reviewer¶
The OutputReviewer closes the loop between generation and validation. It wraps an
agent call with schema parsing + rule validation + automatic retry. When the LLM
produces output that fails Pydantic parsing or validation rules, the reviewer
automatically retries with a feedback prompt describing exactly what was wrong.
Basic Usage¶
from pydantic import BaseModel, Field
from fireflyframework_agentic.validation import OutputReviewer
class InvoiceData(BaseModel):
    vendor: str
    amount: float = Field(ge=0)
    date: str
    invoice_number: str | None = None
reviewer = OutputReviewer(output_type=InvoiceData, max_retries=3)
result = await reviewer.review(
agent,
"Extract invoice data from: Acme Corp, $1,234, 2026-01-15",
)
print(result.output) # InvoiceData(vendor="Acme Corp", amount=1234.0, ...)
print(result.attempts) # 1 if first try succeeded, 2+ if retries needed
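The retry-with-feedback loop can be sketched independently of the framework. In the example below, `review_with_retry`, `validate_invoice`, and `flaky_model` are hypothetical names, and treating `max_retries` as "retries after the first attempt" is an assumption; the real `OutputReviewer` may count attempts differently.

```python
import asyncio
import json


async def review_with_retry(call_model, prompt, validate, max_retries=3):
    """Call the model, validate the answer, and re-prompt with the errors on failure."""
    current_prompt = prompt
    history = []
    for attempt in range(1, max_retries + 2):  # first try plus max_retries retries
        raw = await call_model(current_prompt)
        errors = validate(raw)
        if not errors:
            return {"output": raw, "attempts": attempt, "retry_history": history}
        history.append({"attempt": attempt, "raw": raw, "errors": errors})
        # Feed the exact validation errors back to the model.
        current_prompt = (
            f"{prompt}\n\nYour previous answer was rejected:\n- "
            + "\n- ".join(errors)
            + "\nPlease correct it."
        )
    raise ValueError(f"validation failed after {len(history)} attempts: {history[-1]['errors']}")


def validate_invoice(raw: str) -> list[str]:
    """Return a list of error messages; an empty list means the output is valid."""
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc.msg}"]
    if not isinstance(data, dict):
        return ["expected a JSON object"]
    if not isinstance(data.get("amount"), (int, float)):
        return ["amount must be a number"]
    return []


async def flaky_model(prompt: str) -> str:
    """Stand-in model that only gets the amount right after feedback."""
    if "rejected" in prompt:
        return '{"vendor": "Acme Corp", "amount": 1234.0}'
    return '{"vendor": "Acme Corp", "amount": "$1,234"}'


result = asyncio.run(review_with_retry(flaky_model, "Extract invoice data.", validate_invoice))
print(result["attempts"], result["output"])
```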
With Validation Rules¶
Combine schema parsing with field-level rules:
from fireflyframework_agentic.validation import OutputReviewer, OutputValidator, EnumRule
validator = OutputValidator({"vendor": [EnumRule("vendor", ["Acme Corp", "Globex"])]})
reviewer = OutputReviewer(
output_type=InvoiceData,
validator=validator,
max_retries=2,
)
With Reasoning Patterns¶
Attach a reviewer to any reasoning pattern to validate the final output:
from fireflyframework_agentic.reasoning import ReActPattern
from fireflyframework_agentic.validation import OutputReviewer
reviewer = OutputReviewer(output_type=InvoiceData, max_retries=2)
pattern = ReActPattern(reviewer=reviewer)
result = await pattern.execute(agent, "Extract invoice data from the document.")
# Output is automatically validated and retried if needed
ReviewResult¶
The result contains:
- output — The validated output.
- attempts — Total attempts (1 = first try succeeded).
- validation_report — The final ValidationReport if a validator was used.
- retry_history — List of RetryAttempt objects with attempt number, raw output, and error messages.
Rubric Reviewer (LLM-as-judge)¶
Where OutputReviewer enforces a schema and deterministic rules, RubricReviewer
evaluates free-form output against a list of natural-language criteria using a
separate grader agent. When criteria are unmet, it sends a revision prompt back to the
generator and loops (up to max_iterations), returning a ReviewResult:
from fireflyframework_agentic.validation import RubricReviewer
reviewer = RubricReviewer(
rubric=[
"All five invoice fields are present.",
"Amounts are numeric, not strings.",
"The due date is ISO 8601.",
],
grader=evaluator_agent, # optional; a default grader is created otherwise
max_iterations=3,
)
result = await reviewer.review(extractor_agent, f"Extract fields from:\n{ocr_text}")
You can also load the rubric from a Markdown bullet list with
RubricReviewer.from_rubric_file("rubric.md").
IDP Tie-In: Validating Extracted Invoice Data¶
For our IDP pipeline, we combine structural validation, QoS checks, and the output reviewer:
from fireflyframework_agentic.validation.rules import (
    OutputValidator, RegexRule, FormatRule, RangeRule,
)
from fireflyframework_agentic.validation import OutputReviewer
from pydantic import BaseModel, Field
# Define the expected output schema
class InvoiceExtraction(BaseModel):
    invoice_number: str
    vendor_name: str
    total_amount: float = Field(ge=0)
    due_date: str
    line_items: list[dict]
# Structural validation rules — dict of field_name -> list of rules
invoice_validator = OutputValidator({
"invoice_number": [RegexRule("invoice_number", r"^INV-\d{4,10}$")],
"vendor_name": [RegexRule("vendor_name", r".{2,}")],
"total_amount": [RangeRule("total_amount", min_value=0.01, max_value=10_000_000)],
"due_date": [FormatRule("due_date", "iso_date")],
})
# Output reviewer with schema + rules + retry
reviewer = OutputReviewer(
output_type=InvoiceExtraction,
validator=invoice_validator,
max_retries=3,
)
# QoS guard — compose individual checkers with thresholds
from fireflyframework_agentic.validation.qos import (
QoSGuard, ConfidenceScorer, ConsistencyChecker, GroundingChecker,
)
qos = QoSGuard(
confidence_scorer=ConfidenceScorer(evaluator_agent),
consistency_checker=ConsistencyChecker(extractor_agent, num_runs=2),
grounding_checker=GroundingChecker(),
min_confidence=0.8,
min_consistency=0.6,
min_grounding=0.8,
)
Part III — Orchestration & Operations¶
Chapter 10: Pipeline¶
So far we have individual agents (classify, digitise, extract), tools (OCR, vendor lookup), reasoning patterns (Plan-and-Execute, Reflexion), validation rules, and memory. Each piece works in isolation — but a real IDP system needs to wire them together into a single, reliable flow: classify → digitise → extract → validate → assemble.
The Pipeline module does exactly that. It models your processing flow as a Directed Acyclic Graph (DAG) where nodes are processing steps and edges define data flow. The engine schedules nodes by topological level — nodes at the same level run concurrently — and handles retries, timeouts, and conditional execution automatically.
Pipeline Execution Architecture¶
The following diagram shows how the pipeline engine executes a DAG:
graph LR
subgraph "Pipeline Engine"
B["PipelineBuilder"] --> DAG["DAG\n(topological sort)"]
DAG --> L0["Level 0\n(no dependencies)"]
DAG --> L1["Level 1\n(depends on L0)"]
DAG --> L2["Level 2\n(depends on L1)"]
end
subgraph "Step Executors"
AS["AgentStep\n(runs FireflyAgent)"]
RS["ReasoningStep\n(runs pattern + agent)"]
CS["CallableStep\n(runs async function)"]
FO["FanOutStep\n(splits input)"]
FI["FanInStep\n(merges outputs)"]
end
subgraph "Context & Results"
PC["PipelineContext\n(inputs, metadata, memory)"]
PR["PipelineResult\n(outputs, trace, duration)"]
end
L0 --> AS
L1 --> RS
L2 --> CS
L0 & L1 & L2 -.->|concurrent within level| PC
PC --> PR
Core Concepts¶
A pipeline is a Directed Acyclic Graph (DAG) where:
- Nodes are processing steps (call an agent, run a reasoning pattern, execute a function).
- Edges define data flow and execution order.
- The engine schedules nodes by topological level — nodes at the same level run concurrently.
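Scheduling by topological level can be illustrated with Kahn's algorithm. The function below is a generic sketch, not the engine's code; `topological_levels` is a hypothetical name and the node names are borrowed from the examples in this chapter.

```python
def topological_levels(nodes, edges):
    """Group nodes into levels so that every dependency sits in an earlier level."""
    indegree = {node: 0 for node in nodes}
    children = {node: [] for node in nodes}
    for source, target in edges:
        children[source].append(target)
        indegree[target] += 1
    level = [node for node in nodes if indegree[node] == 0]
    levels = []
    while level:
        levels.append(level)
        next_level = []
        for node in level:
            for child in children[node]:
                indegree[child] -= 1
                # A node becomes runnable once all of its parents have run.
                if indegree[child] == 0:
                    next_level.append(child)
        level = next_level
    if sum(len(group) for group in levels) != len(nodes):
        raise ValueError("graph contains a cycle")
    return levels


print(topological_levels(
    ["split", "ocr_1", "ocr_2", "merge"],
    [("split", "ocr_1"), ("split", "ocr_2"), ("ocr_1", "merge"), ("ocr_2", "merge")],
))
```

Nodes that land in the same inner list have no dependencies on each other, which is what makes it safe to run them concurrently.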
The Pipeline Builder¶
The fluent PipelineBuilder is the recommended way to construct pipelines:
from fireflyframework_agentic.pipeline.builder import PipelineBuilder
from fireflyframework_agentic.pipeline.steps import AgentStep, CallableStep, ReasoningStep
engine = (
PipelineBuilder("invoice-pipeline")
.add_node("classify", AgentStep(classifier_agent))
.add_node("extract", AgentStep(extractor_agent))
.add_node("validate", CallableStep(validate_fn))
.chain("classify", "extract", "validate")
.build()
)
result = await engine.run(inputs="<invoice text>")
Step Executors¶
The built-in executors (all implementing StepExecutor) cover most scenarios:
- AgentStep — Runs a FireflyAgent with the input as prompt.
- ReasoningStep — Runs a reasoning pattern through an agent.
- CallableStep — Wraps any async function (context, inputs) -> output.
- FanOutStep — Splits input into a list for parallel downstream processing.
- FanInStep — Merges outputs from multiple upstream nodes.
- BranchStep — Routes to one of several downstream paths by a router function.
- BatchLLMStep — Runs an agent over a batch of inputs concurrently.
- EmbeddingStep — Embeds text via a BaseEmbedder (see Embeddings & Vector Stores).
- RetrievalStep — Retrieves nearest neighbours from a vector store: RetrievalStep(store, *, embedder=None, top_k=5, input_key="input").
Parallel Execution (Fan-Out / Fan-In)¶
Process multiple items concurrently:
from fireflyframework_agentic.pipeline.steps import FanOutStep, FanInStep
engine = (
PipelineBuilder("parallel-ocr")
.add_node("split", FanOutStep(lambda doc: doc.pages))
.add_node("ocr_1", AgentStep(ocr_agent))
.add_node("ocr_2", AgentStep(ocr_agent))
.add_node("merge", FanInStep())
.add_edge("split", "ocr_1")
.add_edge("split", "ocr_2")
.add_edge("ocr_1", "merge", input_key="page_1")
.add_edge("ocr_2", "merge", input_key="page_2")
.build()
)
Conditional Execution¶
Gate nodes with a condition function. If the condition returns False, the node is
skipped:
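from fireflyframework_agentic.pipeline.dag import DAG, DAGNode
from fireflyframework_agentic.pipeline.steps import AgentStep
dag = DAG("conditional-pipeline")
dag.add_node(DAGNode(
    node_id="ocr",
    step=AgentStep(ocr_agent),
    # Skip this node unless the caller flagged the document as needing OCR.
    condition=lambda ctx: ctx.metadata.get("needs_ocr", False),
))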
Retries and Timeouts¶
Configure per-node resilience:
engine = (
PipelineBuilder("robust-pipeline")
.add_node("extract", AgentStep(extractor_agent), retry_max=3, timeout_seconds=30.0)
.build()
)
Retries use linear backoff. On exhaustion, the node fails and the pipeline reports
success=False.
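A per-attempt timeout combined with linear backoff can be sketched with `asyncio.wait_for`. `run_with_retry` and its `backoff_seconds` parameter are hypothetical, and counting `retry_max` as retries after the first attempt is an assumption; the engine's exact delays are not documented here.

```python
import asyncio


async def run_with_retry(step, *, retry_max=3, timeout_seconds=30.0, backoff_seconds=1.0):
    """Run step() with a per-attempt timeout and linearly growing delays."""
    last_error = None
    for attempt in range(1, retry_max + 2):  # one initial try plus retry_max retries
        try:
            return await asyncio.wait_for(step(), timeout=timeout_seconds)
        except Exception as exc:  # also covers the timeout raised by wait_for
            last_error = exc
            if attempt <= retry_max:
                # Linear backoff: wait 1x, 2x, 3x ... the base delay.
                await asyncio.sleep(backoff_seconds * attempt)
    raise RuntimeError(f"step failed after {retry_max + 1} attempts") from last_error


calls = {"n": 0}


async def flaky_step():
    """Stand-in step that fails twice before succeeding."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ValueError("transient failure")
    return "ok"


print(asyncio.run(run_with_retry(flaky_step, backoff_seconds=0.01)))
```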
Pipeline Context¶
PipelineContext is the shared data bus flowing through the DAG:
from fireflyframework_agentic.pipeline.context import PipelineContext
from fireflyframework_agentic.memory import MemoryManager
memory = MemoryManager(working_scope_id="invoice-run-42")
ctx = PipelineContext(
inputs=document_bytes,
metadata={"source": "email", "tenant": "acme-corp"},
memory=memory,
)
result = await engine.run(context=ctx)
It carries: inputs, metadata, correlation_id, and results from completed upstream
nodes. When a MemoryManager is attached, AgentStep and ReasoningStep
automatically propagate memory to agents and patterns.
Pipeline Result¶
PipelineResult aggregates all outcomes:
result = await engine.run(context=ctx)
if result.success:
    print(result.final_output)
    print(f"Completed in {result.total_duration_ms:.0f} ms")
else:
    print(f"Failed nodes: {result.failed_nodes}")
# Inspect individual nodes
for node_id, node_result in result.outputs.items():
    print(f" {node_id}: {'ok' if node_result.success else 'FAILED'}")
# Execution trace for observability
for entry in result.execution_trace:
    print(f" [{entry.node_id}] {entry.status} ({entry.duration_ms:.0f} ms)")
Manual DAG Construction¶
For full control, build the DAG directly:
from fireflyframework_agentic.pipeline.dag import DAG, DAGNode, DAGEdge
from fireflyframework_agentic.pipeline.engine import PipelineEngine
dag = DAG("my-pipeline")
dag.add_node(DAGNode(node_id="step_a", step=my_step))
dag.add_node(DAGNode(node_id="step_b", step=other_step))
dag.add_edge(DAGEdge(source="step_a", target="step_b"))
engine = PipelineEngine(dag)
result = await engine.run(inputs="hello")
Embeddings & Vector Stores¶
EmbeddingStep and RetrievalStep build on two reusable framework modules. The
embeddings package ships BaseEmbedder/EmbedderRegistry with 8 provider backends
(OpenAI, Azure, Cohere, Google, Mistral, Voyage, Bedrock, Ollama). The vectorstores
package ships BaseVectorStore with 7 backends, including InMemoryVectorStore,
ChromaVectorStore, PineconeVectorStore, QdrantVectorStore, PgVectorVectorStore,
and SqliteVecVectorStore, plus a scoping layer (ScopedVectorStore,
TenantScopedVectorStore, scope_namespace, parse_scope_namespace) for multi-tenant
isolation:
from fireflyframework_agentic.vectorstores import InMemoryVectorStore
from fireflyframework_agentic.pipeline import RetrievalStep
store = InMemoryVectorStore() # async upsert / search / search_text / delete
retrieve = RetrievalStep(store, top_k=5, input_key="input")
The framework ships these as building blocks; it does not bundle a turnkey RAG/corpus
agent. BaseEmbedder, EmbedderRegistry, BaseVectorStore, and InMemoryVectorStore
are re-exported from the top-level fireflyframework_agentic package.
State-Based Pipelines, Checkpointing & Audit Logs¶
Beyond the port-based DAG above, PipelineBuilder has a state-based mode: pass a
Pydantic state= model and nodes become async (state) -> dict | None | Pause | Send |
list[Send] functions over a typed shared state. Branching is a single .branch(source,
router) call, and state reducers (append, extend, merge_dict, replace)
control how each node's returned dict is merged into the state.
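To illustrate how reducers might merge a node's returned dict into shared state, here is a plain-dict sketch. The reducer names mirror the ones listed above, but their semantics here are assumptions, and `apply_update` is a hypothetical helper; the framework operates on a Pydantic model rather than a dict.

```python
from __future__ import annotations


def append(old, new):
    """Add one item to a list."""
    return [*old, new]


def extend(old, new):
    """Add several items to a list."""
    return [*old, *new]


def merge_dict(old, new):
    """Shallow-merge two dicts; keys in new win."""
    return {**old, **new}


def replace(old, new):
    """Overwrite the previous value."""
    return new


def apply_update(state: dict, update: dict | None, reducers: dict) -> dict:
    """Return a new state with each updated key merged by its reducer."""
    if not update:
        return state
    merged = dict(state)
    for key, value in update.items():
        # Keys without an explicit reducer are simply replaced.
        reducer = reducers.get(key, replace)
        merged[key] = reducer(state[key], value)
    return merged


state = {"pages": ["p1"], "facts": {"a": 1}, "status": "new"}
reducers = {"pages": append, "facts": merge_dict}
print(apply_update(state, {"pages": "p2", "facts": {"b": 2}, "status": "done"}, reducers))
```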
Two control signals shape the flow: Pause suspends a run (resume later), and Send
dispatches dynamic fan-out work. For durability, pass a Checkpointer
(FileCheckpointer(root=...) writes CheckpointRecords) so a failed run can resume or
start mid-pipeline. An AuditLog records every node transition — choose FileAuditLog,
LoggingAuditLog, OtelAuditLog, or QueryableAuditLog (each entry is an AuditEntry):
from fireflyframework_agentic.pipeline import (
PipelineBuilder, FileCheckpointer, OtelAuditLog,
)
engine = PipelineBuilder(
"stateful-idp",
state=IdpState, # a pydantic.BaseModel
checkpointer=FileCheckpointer(root=".checkpoints"),
audit_log=OtelAuditLog(),
).build()
IDP Tie-In: Wiring the Complete Pipeline¶
Here's our IDP pipeline as a DAG with all five phases:
from fireflyframework_agentic.pipeline.builder import PipelineBuilder
from fireflyframework_agentic.pipeline.steps import AgentStep, ReasoningStep, CallableStep
from fireflyframework_agentic.pipeline.context import PipelineContext
from fireflyframework_agentic.reasoning import PlanAndExecutePattern
from fireflyframework_agentic.memory import MemoryManager
# Step functions for non-agent nodes
async def validate_step(context, inputs):
    extracted = inputs.get("input", {})
    report = invoice_validator.validate(extracted)
    context.metadata["validation_report"] = report
    return {"extracted": extracted, "valid": report.valid, "errors": report.errors}
async def assemble_step(context, inputs):
    data = inputs.get("input", {})
    return {
        "document_type": context.metadata.get("doc_type", "unknown"),
        "extracted_fields": data.get("extracted", {}),
        "validation_passed": data.get("valid", False),
    }
# Build the DAG
idp_pipeline = (
PipelineBuilder("idp-pipeline")
.add_node("classify", AgentStep(classifier_agent))
.add_node("digitise", AgentStep(ocr_agent), retry_max=2, timeout_seconds=60)
.add_node(
"extract",
ReasoningStep(PlanAndExecutePattern(max_steps=15), extractor_agent),
)
.add_node("validate", CallableStep(validate_step))
.add_node("assemble", CallableStep(assemble_step))
.chain("classify", "digitise", "extract", "validate", "assemble")
.build()
)
# Execute
memory = MemoryManager(working_scope_id="idp-session")
ctx = PipelineContext(
inputs=document_bytes,
metadata={"source": "email-inbox", "tenant_id": "acme-corp"},
memory=memory,
)
result = await idp_pipeline.run(context=ctx)
if result.success:
    print(f"Document type: {result.final_output['document_type']}")
    print(f"Extracted fields: {result.final_output['extracted_fields']}")
else:
    print(f"Pipeline failed at: {result.failed_nodes}")
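For intuition about what a DAG engine does with a linear chain of nodes, the following framework-independent sketch orders nodes with the standard library's `graphlib.TopologicalSorter` and feeds each node the output of its predecessor. The step functions, `STEPS`, `DEPENDENCIES`, and `run_chain` are hypothetical; a real engine also handles retries, timeouts, and fan-out, which this sketch omits.

```python
import asyncio
from graphlib import TopologicalSorter


async def classify(data):
    return {"doc_type": "invoice", "text": data}


async def extract(data):
    return {**data, "fields": {"invoice_number": "INV-001"}}


async def assemble(data):
    return {"document_type": data["doc_type"], "extracted_fields": data["fields"]}


STEPS = {"classify": classify, "extract": extract, "assemble": assemble}
# Each key maps a node to the set of nodes it depends on (its predecessors).
DEPENDENCIES = {"extract": {"classify"}, "assemble": {"extract"}}


async def run_chain(initial):
    """Run the nodes in dependency order, passing each output to the next node."""
    order = list(TopologicalSorter(DEPENDENCIES).static_order())
    value = initial
    for node_id in order:
        value = await STEPS[node_id](value)
    return order, value


order, output = asyncio.run(run_chain("Invoice #INV-001 ..."))
```

Passing a single value forward only works for a chain; with fan-in, each node would instead receive the outputs of all its predecessors.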
Chapter 11: Observability¶
Your pipeline is running in production, processing thousands of invoices a day. Then latency spikes. Or accuracy drops. Or a customer reports a missing field. Without observability you're flying blind — you have no idea which agent is slow, which tool is failing, or how many tokens you're burning.
The Observability module wraps OpenTelemetry and gives you three primitives out of the box: tracing (distributed spans across agents, tools, and pipeline steps), metrics (counters and histograms for tokens, latency, cost, errors, and reasoning depth), and events (structured logs for significant occurrences). When observability is enabled, the framework instruments agent runs automatically — you get spans for free.
Tracing¶
FireflyTracer wraps the OpenTelemetry Tracer and adds GenAI-specific attributes. It
exposes purpose-built context managers — agent_span(agent_name, *, model=..., **attrs),
tool_span(...), reasoning_span(...) — plus a generic custom_span(name, **attrs), an
event(name, **attrs) annotation helper, and a static set_error(span, error):
from fireflyframework_agentic.observability import FireflyTracer
tracer = FireflyTracer(service_name="idp-service")
with tracer.agent_span("classifier", model="openai:gpt-4o") as span:
    result = await classifier_agent.run("Classify this document")
    span.set_attribute("tokens.total", 150)
# Or a generic span with arbitrary attributes:
with tracer.custom_span("agent.run", phase="classify") as span:
    ...
The @traced Decorator¶
Automatically create a span around any function:
from fireflyframework_agentic.observability import traced
@traced(name="classify_document")
async def classify_document(text: str) -> dict:
    return await classifier_agent.run(text)
Metrics¶
FireflyMetrics records GenAI-specific OpenTelemetry instruments via purpose-built
methods (each takes keyword args like agent=, model=, operation=, pattern= — not
a generic labels= dict):
from fireflyframework_agentic.observability import FireflyMetrics
metrics = FireflyMetrics(service_name="idp-service")
metrics.record_tokens(150, agent="classifier", model="openai:gpt-4o")
metrics.record_prompt_tokens(100, agent="classifier", model="openai:gpt-4o")
metrics.record_completion_tokens(50, agent="classifier", model="openai:gpt-4o")
metrics.record_latency(142.5, operation="classify", agent="classifier")
metrics.record_cost(0.0021, agent="classifier", model="openai:gpt-4o")
metrics.record_error(operation="classify", agent="classifier", error_type="Timeout")
metrics.record_reasoning_depth(4, pattern="react")
The @metered Decorator¶
@metered's first parameter is operation (it records latency, and an error on
exception). Use a positional string or operation=:
from fireflyframework_agentic.observability import metered
@metered("extraction")
async def extract_fields(text: str) -> dict:
    return await extractor_agent.run(text)
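A decorator of this kind can be sketched in a few lines. The version below is a simplified stand-in, not the framework's `@metered`: `metered_sketch`, `LATENCIES_MS`, and `ERRORS` are hypothetical names, and it stores measurements in plain dictionaries instead of OpenTelemetry instruments.

```python
import asyncio
import functools
import time
from collections import defaultdict

LATENCIES_MS: dict[str, list[float]] = defaultdict(list)
ERRORS: dict[str, int] = defaultdict(int)


def metered_sketch(operation: str):
    """Record latency for every call and count the calls that raise."""

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return await func(*args, **kwargs)
            except Exception:
                ERRORS[operation] += 1
                raise  # the caller still sees the original exception
            finally:
                LATENCIES_MS[operation].append((time.perf_counter() - start) * 1000)

        return wrapper

    return decorator


@metered_sketch("extraction")
async def extract_fields(text: str) -> dict:
    if not text:
        raise ValueError("empty document")
    return {"length": len(text)}


async def demo():
    ok = await extract_fields("Invoice #INV-001")
    try:
        await extract_fields("")
    except ValueError:
        pass
    return ok


result = asyncio.run(demo())
```

Recording latency in `finally` means failed calls are timed as well, which keeps slow failures such as timeouts visible in the latency data.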
Events¶
FireflyEvents emits structured events (logged as JSON-serialisable dicts) via typed
methods — there is no generic emit():
from fireflyframework_agentic.observability import FireflyEvents
events = FireflyEvents()
events.agent_started("classifier", model="openai:gpt-4o")
events.agent_completed("classifier", tokens=150, latency_ms=250)
events.tool_executed("vendor_lookup", success=True, latency_ms=12)
events.reasoning_step("react", step=1, step_type="thought")
events.agent_error("classifier", error="timeout")
Exporter Configuration¶
The framework emits spans and metrics purely through the OpenTelemetry API; it
does not configure the OTel SDK or any exporters itself. The host application
owns OTel SDK and exporter setup — wire up your TracerProvider,
MeterProvider, and the exporters (OTLP collector, console, etc.) however your
deployment requires, and the framework's telemetry flows through the globally
configured providers automatically.
Configuration via environment variables:
Usage Tracking & Cost Estimation¶
The framework automatically tracks token usage and estimates cost for every agent
run, reasoning step, and pipeline execution. UsageTracker accumulates
UsageRecord objects with input/output tokens, cost, latency, and model details.
from fireflyframework_agentic.observability import default_usage_tracker
# After running agents, inspect accumulated usage
summary = default_usage_tracker.get_summary()
print(f"Total tokens: {summary.total_tokens}")
print(f"Total cost: ${summary.total_cost_usd:.4f}")
print(f"Requests: {summary.total_requests}")
# Filter by agent or pipeline correlation ID
agent_summary = default_usage_tracker.get_summary_for_agent("extractor")
pipeline_summary = default_usage_tracker.get_summary_for_correlation("run-123")
Cost is computed by a resolver chain (observability/cost_resolvers.py). The default
chain DEFAULT_RESOLVERS tries provider_reported_cost (uses cost the provider already
reported) then genai_prices_cost (prices via the bundled genai-prices data). The
entry point is resolve_cost:
from fireflyframework_agentic.observability import resolve_cost, CostContext
cost = resolve_cost(
CostContext(model="openai:gpt-4o", input_tokens=1000, output_tokens=500)
)
# Returns the USD cost, or None when no resolver can price the model.
When config.cost_strict=True, an unpriceable model raises UnknownModelCostError
instead of returning None. UsageTracker applies this chain automatically for every
recorded run.
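The resolver-chain pattern described above can be illustrated with a self-contained sketch: resolvers are tried in order, the first non-None result wins, and strict mode turns "no price" into an exception. Everything here is hypothetical (`CostRequest`, `resolve`, `UnpricedModelError`, and the made-up `PRICE_TABLE`); it mirrors the described behaviour and is not the framework's `resolve_cost` implementation.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class CostRequest:
    model: str
    input_tokens: int
    output_tokens: int
    provider_cost_usd: Optional[float] = None  # set when the provider reports cost


class UnpricedModelError(Exception):
    pass


# Made-up prices in USD per 1M (input, output) tokens, for illustration only.
PRICE_TABLE = {"example:model-a": (2.0, 8.0)}


def provider_reported(req: CostRequest) -> Optional[float]:
    return req.provider_cost_usd


def table_lookup(req: CostRequest) -> Optional[float]:
    prices = PRICE_TABLE.get(req.model)
    if prices is None:
        return None
    input_price, output_price = prices
    return (req.input_tokens * input_price + req.output_tokens * output_price) / 1_000_000


RESOLVERS: list[Callable[[CostRequest], Optional[float]]] = [provider_reported, table_lookup]


def resolve(req: CostRequest, strict: bool = False) -> Optional[float]:
    """Return the first non-None cost; in strict mode raise instead of returning None."""
    for resolver in RESOLVERS:
        cost = resolver(req)
        if cost is not None:
            return cost
    if strict:
        raise UnpricedModelError(req.model)
    return None


priced = resolve(CostRequest("example:model-a", 1000, 500))
reported = resolve(CostRequest("example:model-a", 1000, 500, provider_cost_usd=0.01))
unknown = resolve(CostRequest("example:unknown", 1000, 500))
```

Putting the provider-reported resolver first means an authoritative figure always takes precedence over a price-table estimate.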
Budget Enforcement¶
Set a hard budget limit; a warning is logged when costs exceed it:
Pipeline results include aggregated usage automatically:
result = await engine.run(context=ctx)
if result.usage:
    print(f"Pipeline cost: ${result.usage.total_cost_usd:.4f}")
Automatic Integration¶
When an agent is invoked, the framework automatically creates trace spans, records metrics, emits events, and tracks usage/cost. You don't need to instrument agent code manually unless you want additional detail.
IDP Tie-In: Instrumenting the Pipeline¶
from fireflyframework_agentic.observability import FireflyTracer, traced
from fireflyframework_agentic.pipeline.context import PipelineContext
tracer = FireflyTracer(service_name="idp-service")
@traced(name="idp.process_document")
async def process_document(document_bytes: bytes) -> dict:
    with tracer.custom_span("idp.run") as span:
        ctx = PipelineContext(inputs=document_bytes)
        result = await idp_pipeline.run(context=ctx)
        span.set_attribute("idp.success", result.success)
    # Per-agent token, latency, and cost metrics are recorded automatically
    # when observability is enabled; inspect them via default_usage_tracker.
    return result.final_output
Chapter 12: Explainability¶
In regulated industries — finance, healthcare, legal — "the model said so" is not an acceptable answer. Auditors, compliance officers, and customers need to see why the agent classified a document as an invoice, why it chose one vendor name over another, and what alternatives it considered. The Explainability module provides four building blocks: a trace recorder that captures every decision, an explanation generator that turns raw records into natural-language narratives, an append-only audit trail, and a report builder that compiles everything into markdown or JSON.
Trace Recorder¶
TraceRecorder captures decision records during execution. The method is
record(category, *, agent="", detail=None, input_summary="", output_summary=""), and
recorded items are exposed via the .records property:
from fireflyframework_agentic.explainability import TraceRecorder
recorder = TraceRecorder()
recorder.record(
"reasoning_step",
agent="extractor",
detail={"chosen": "regex_match", "alternatives": ["llm_extraction", "template_match"]},
input_summary="raw OCR text",
output_summary="invoice_number=INV-2026-001",
)
Each DecisionRecord has these fields: timestamp, category, agent, detail,
input_summary, and output_summary.
Explanation Generator¶
Transforms raw decision records into a natural-language narrative:
from fireflyframework_agentic.explainability import ExplanationGenerator
generator = ExplanationGenerator()
explanation = generator.generate(recorder.records)
print(explanation)
# Multi-line "Decision Explanation" narrative walking each record chronologically.
Audit Trail¶
An append-only log. Each entry captures an actor, action, resource, and outcome:
from fireflyframework_agentic.explainability import AuditTrail
trail = AuditTrail()
trail.append("extractor", "field_extraction", resource="invoice_number", outcome="success")
# Inspect or export
print(len(trail)) # number of entries
print(trail.entries) # list[AuditEntry]
print(trail.export_json()) # JSON string
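What "append-only" means in practice can be shown with a small standalone sketch: entries are immutable once written, and the log exposes no way to edit or delete them. `Entry` and `AppendOnlyTrail` are hypothetical classes written for this illustration; they borrow the method names used above but are not the framework's `AuditTrail`.

```python
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone


@dataclass(frozen=True)
class Entry:
    actor: str
    action: str
    resource: str = ""
    outcome: str = ""
    timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())


class AppendOnlyTrail:
    """Hypothetical sketch: entries can be added and read, never edited or removed."""

    def __init__(self) -> None:
        self._entries: list[Entry] = []

    def append(self, actor: str, action: str, *, resource: str = "", outcome: str = "") -> Entry:
        entry = Entry(actor, action, resource, outcome)
        self._entries.append(entry)
        return entry

    @property
    def entries(self) -> tuple[Entry, ...]:
        # A tuple of frozen dataclasses: callers cannot mutate the log through it.
        return tuple(self._entries)

    def __len__(self) -> int:
        return len(self._entries)

    def export_json(self) -> str:
        return json.dumps([asdict(e) for e in self._entries], indent=2)


trail = AppendOnlyTrail()
trail.append("extractor", "field_extraction", resource="invoice_number", outcome="success")
exported = json.loads(trail.export_json())
```

In-process immutability is a convenience rather than tamper-proofing; for compliance use, the exported JSON would typically be shipped to write-once storage.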
Report Builder¶
Compile records into a structured ExplainabilityReport, then render it:
from fireflyframework_agentic.explainability import ReportBuilder
builder = ReportBuilder(title="Invoice Extraction Report")
report = builder.build(recorder.records) # -> ExplainabilityReport
# Render via static helpers
markdown = ReportBuilder.to_markdown(report) # for documentation/review
json_data = ReportBuilder.to_json(report) # for programmatic consumption
IDP Tie-In: Audit Trail for Invoice Extraction¶
In our IDP pipeline, we record why each field was extracted the way it was:
from fireflyframework_agentic.explainability import (
TraceRecorder, ExplanationGenerator, AuditTrail, ReportBuilder,
)
recorder = TraceRecorder()
# During extraction, record decisions
recorder.record(
"field_extraction",
agent="field_extractor",
detail={"chosen": "INV-2026-001", "alternatives": ["INV-2026-01", "2026-001"]},
output_summary="Matched the INV-NNNN pattern with highest confidence.",
)
recorder.record(
"field_extraction",
agent="field_extractor",
detail={"chosen": "1234.56", "alternatives": ["1,234.56", "$1234.56"]},
output_summary="Normalised currency format to numeric value.",
)
# Generate explanation
generator = ExplanationGenerator()
explanation = generator.generate(recorder.records)
# Audit trail — append (actor, action) entries
trail = AuditTrail()
trail.append("field_extractor", "extract_invoice_number", resource="INV-2026-001")
trail.append("field_extractor", "extract_amount", resource="1234.56")
print(trail.export_json())
# Build and render the report
report = ReportBuilder(title="Invoice Extraction Report").build(recorder.records)
print(ReportBuilder.to_markdown(report))
Part IV — Experimentation¶
Chapter 13: Experiments¶
You've built a working extraction agent — but is GPT-4o the right model? Would Claude give better results? What about lowering the temperature from 0.5 to 0.1? Answering these questions by hand (run each variant, eyeball the output, repeat) doesn't scale. The Experiments module gives you a structured way to define variants, run them against the same inputs, collect metrics (latency, token usage, output length), and compare results — all in a few lines of code.
Defining an Experiment¶
An Experiment holds variants plus a dataset of test inputs. Each Variant carries
name, model, temperature, and a parameters dict:
from fireflyframework_agentic.experiments import Experiment, Variant
experiment = Experiment(
name="extraction_model_comparison",
hypothesis="Claude 3.5 Sonnet beats GPT-4o on invoice extraction.",
variants=[
Variant(name="gpt4o", model="openai:gpt-4o"),
Variant(name="claude", model="anthropic:claude-3-5-sonnet"),
],
dataset=[
"Extract fields from: Invoice #INV-001, Acme Corp, $500, 2026-01-15",
"Extract fields from: Invoice #INV-002, Globex, $1,200, 2026-02-28",
],
)
Running an Experiment¶
ExperimentRunner.run(experiment, agent_factory, *, context=None) runs every variant
against the experiment's dataset. The second positional argument is an
agent_factory callable (variant) -> agent that builds an agent configured for that
variant. It returns a list[VariantResult]:
from fireflyframework_agentic.experiments import ExperimentRunner
from fireflyframework_agentic.agents import FireflyAgent
def make_agent(variant):
    return FireflyAgent(
        name=f"extractor-{variant.name}",
        model=variant.model,
        instructions="You are an invoice data extraction specialist.",
    )
runner = ExperimentRunner()
results = await runner.run(experiment, make_agent)
Tracking Results¶
ExperimentTracker stores VariantResults in memory with optional JSON persistence
(pass storage_path=). ExperimentRunner already records each result into its tracker;
you can also record manually and export:
from fireflyframework_agentic.experiments import ExperimentTracker
tracker = ExperimentTracker(storage_path="./experiment_results.json")
for result in results:
    tracker.record(result)
# Query and export
subset = tracker.get_by_experiment("extraction_model_comparison")
print(tracker.export_json())
Comparing Variants¶
from fireflyframework_agentic.experiments import VariantComparator
comparator = VariantComparator()
metrics = comparator.compare(results)
print(comparator.summary(results))
# "gpt4o: avg_latency=1.2s, avg_output_len=145 | claude: avg_latency=0.9s, ..."
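The aggregation behind a comparison like this is simple to sketch: group per-run measurements by variant, then average each metric. The data below is made up for illustration, and `runs`, `grouped`, and `comparison` are hypothetical names; the fields of the framework's `VariantResult` are not assumed here.

```python
from collections import defaultdict
from statistics import mean

# Made-up per-run measurements: (variant name, latency in ms, output text).
runs = [
    ("variant_a", 1200.0, "x" * 150),
    ("variant_a", 1000.0, "x" * 140),
    ("variant_b", 900.0, "x" * 160),
    ("variant_b", 700.0, "x" * 120),
]

# Group (latency, output length) samples by variant.
grouped: dict[str, list[tuple[float, int]]] = defaultdict(list)
for variant, latency_ms, output in runs:
    grouped[variant].append((latency_ms, len(output)))

# Average each metric per variant.
comparison = {
    variant: {
        "avg_latency_ms": mean(lat for lat, _ in samples),
        "avg_output_len": mean(length for _, length in samples),
    }
    for variant, samples in grouped.items()
}
fastest = min(comparison, key=lambda v: comparison[v]["avg_latency_ms"])
```

Latency and output length say nothing about correctness, so a comparison like this is best read alongside an accuracy score on labelled data.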
IDP Tie-In: Finding the Best Extraction Model¶
experiment = Experiment(
name="idp_extraction_ab_test",
hypothesis="Lower temperature improves IDP invoice extraction accuracy.",
variants=[
Variant(name="gpt4o", model="openai:gpt-4o", temperature=0.1),
Variant(name="gpt4o_warm", model="openai:gpt-4o", temperature=0.5),
Variant(name="claude", model="anthropic:claude-3-5-sonnet"),
],
dataset=test_invoices,
)
def make_agent(variant):
    return FireflyAgent(
        name=f"extractor-{variant.name}",
        model=variant.model,
        instructions="You are an invoice data extraction specialist.",
    )
results = await ExperimentRunner().run(experiment, make_agent)
print(VariantComparator().summary(results))
Chapter 14: Lab¶
The Experiments module helps you compare variants — but sometimes you just need a sandbox to poke at an agent interactively, run quick benchmarks, compare side-by-side outputs, or evaluate accuracy against a labelled dataset. The Lab module is that sandbox. It provides interactive sessions (REPL-like), benchmarking helpers, model comparison tables, evaluation datasets, and pluggable scorers — everything you need to iterate on agent quality before going to production.
Interactive Sessions¶
Test an agent conversationally:
from fireflyframework_agentic.lab import LabSession
session = LabSession(name="extraction-dev", agent=extractor_agent)
response = await session.interact("Extract fields from: Invoice #INV-001...")
print(response)
# Review history
for entry in session.history:
    print(f"[{entry.timestamp}] {entry.prompt} → {entry.response}")
Benchmarking¶
Measure agent performance across a set of prompts:
from fireflyframework_agentic.lab import Benchmark
bench = Benchmark(inputs=[
"Extract from: Invoice #INV-001, Acme Corp, $500",
"Extract from: Invoice #INV-002, Globex, $1,200",
"Extract from: Invoice #INV-003, Initech, $3,456.78",
])
result = await bench.run(extractor_agent)
print(f"Avg latency: {result.avg_latency_ms:.1f} ms")
print(f"P95 latency: {result.p95_latency_ms:.1f} ms")
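To show why P95 is reported next to the average, here is a small sketch that computes both from raw latency samples using the nearest-rank method. The sample values are made up and `percentile` is a hypothetical helper; the framework may compute its percentiles differently (for example with interpolation).

```python
import math
from statistics import mean


def percentile(samples: list[float], pct: float) -> float:
    """Nearest-rank percentile: the smallest sample with at least pct% of samples <= it."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]


# Made-up latencies in milliseconds; one slow outlier among ten runs.
latencies_ms = [120.0, 135.0, 128.0, 410.0, 131.0, 125.0, 140.0, 133.0, 129.0, 127.0]
avg_ms = mean(latencies_ms)
p95_ms = percentile(latencies_ms, 95)
print(f"Avg latency: {avg_ms:.1f} ms")
print(f"P95 latency: {p95_ms:.1f} ms")
```

With these samples the single outlier pulls the average up only modestly, while the P95 lands on the outlier itself, which is why tail percentiles are the better signal for worst-case user experience.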
Side-by-Side Comparison¶
Compare multiple agents on the same prompts:
from fireflyframework_agentic.lab import ModelComparison
comparison = ModelComparison(prompts=[
"Extract from: Invoice #INV-001, Acme Corp, $500",
])
entries = await comparison.compare({
"gpt4o": extractor_gpt4o,
"claude": extractor_claude,
})
for entry in entries:
    for agent_name, response in entry.responses.items():
        print(f" {agent_name}: {response}")
Datasets¶
Manage test inputs and expected outputs:
from fireflyframework_agentic.lab import EvalDataset, EvalCase
dataset = EvalDataset(cases=[
EvalCase(
input="Extract from: Invoice #INV-001, Acme Corp, $500, 2026-01-15",
expected_output='{"invoice_number": "INV-001", "vendor_name": "Acme Corp", "total_amount": 500.0}',
),
])
# Or load from a JSON file
dataset = EvalDataset.from_json("test_data/invoices.json")
Evaluators¶
Run an agent against a dataset with a pluggable scorer:
from fireflyframework_agentic.lab import EvalOrchestrator
# Custom scorer for extraction accuracy
def extraction_scorer(expected: str, actual: str) -> float:
    import json
    try:
        exp = json.loads(expected)
        act = json.loads(actual)
        matching = sum(1 for k in exp if exp.get(k) == act.get(k))
        return matching / len(exp) if exp else 0.0
    except (json.JSONDecodeError, AttributeError):
        return 0.0
orchestrator = EvalOrchestrator(scorer=extraction_scorer)
report = await orchestrator.evaluate(extractor_agent, dataset)
print(f"Accuracy: {report.avg_score:.1%} across {report.total_cases} cases")
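A quick worked example shows how a field-matching scorer of this shape behaves on a partial match and on malformed output. The scorer is repeated here so the snippet runs on its own; the input strings are made up.

```python
import json


def extraction_scorer(expected: str, actual: str) -> float:
    """Fraction of expected fields whose values match exactly in the actual output."""
    try:
        exp = json.loads(expected)
        act = json.loads(actual)
        matching = sum(1 for k in exp if exp.get(k) == act.get(k))
        return matching / len(exp) if exp else 0.0
    except (json.JSONDecodeError, AttributeError):
        return 0.0


expected = '{"invoice_number": "INV-001", "vendor_name": "Acme Corp", "total_amount": 500.0}'
wrong_vendor = '{"invoice_number": "INV-001", "vendor_name": "ACME", "total_amount": 500.0}'

perfect = extraction_scorer(expected, expected)           # all three fields match
partial = extraction_scorer(expected, wrong_vendor)       # two of three fields match
malformed = extraction_scorer(expected, "not json at all")  # unparseable output scores zero
```

Exact equality is strict: "Acme Corp" and "ACME" count as a miss, so normalising values before comparison may be worthwhile if casing or formatting differences should not be penalised.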
IDP Tie-In: Benchmarking Extraction Accuracy¶
# Load a dataset of real invoices with expected outputs
dataset = EvalDataset.from_json("test_data/invoices.json")
# Benchmark with custom extraction scorer
orchestrator = EvalOrchestrator(scorer=extraction_scorer)
report = await orchestrator.evaluate(extractor_agent, dataset)
print(f"Extraction accuracy: {report.avg_score:.1%}")
Part V — Advanced¶
Chapter 15: Template Agents¶
By now you've written several agents from scratch — classifier, extractor, OCR. Each time you had to think about the system prompt, output type, and registration. But many agent patterns are universal: summarise text, classify into categories, extract structured data, hold a conversation, route to sub-agents. The framework ships five template agents as factory functions that encode best practices for each pattern. You provide the domain-specific bits (categories, schemas, personality), and the factory handles prompt engineering, output typing, and registry registration.
Summarizer¶
from fireflyframework_agentic.agents.templates import create_summarizer_agent
agent = create_summarizer_agent(
max_length="short", # concise | short | medium | detailed
style="technical", # professional | casual | technical | academic
output_format="bullets", # paragraph | bullets | numbered
model="openai:gpt-4o",
)
result = await agent.run("Long invoice description text here...")
Classifier¶
Returns a structured ClassificationResult with category, confidence, and reasoning:
from fireflyframework_agentic.agents.templates import create_classifier_agent
agent = create_classifier_agent(
categories=["invoice", "receipt", "contract", "form"],
descriptions={
"invoice": "Bills requesting payment for goods or services",
"receipt": "Proof of payment or purchase",
"contract": "Legal agreements between parties",
"form": "Fillable forms and applications",
},
model="openai:gpt-4o",
)
result = await agent.run("Invoice from Acme Corp, Amount Due: $1,234.56")
# result.output → ClassificationResult(category="invoice", confidence=0.95, ...)
Extractor¶
Extracts structured data into a user-provided Pydantic model:
from pydantic import BaseModel
from fireflyframework_agentic.agents.templates import create_extractor_agent
class Invoice(BaseModel):
    vendor: str
    amount: float
    date: str
    invoice_number: str | None = None
agent = create_extractor_agent(
Invoice,
field_descriptions={
"vendor": "The company that issued the invoice",
"amount": "Total monetary amount",
},
model="openai:gpt-4o",
)
result = await agent.run("Invoice from Acme Corp, $1,234.56, 2026-01-15")
# result.output → Invoice(vendor="Acme Corp", amount=1234.56, ...)
Conversational Agent¶
Memory-enabled multi-turn assistant:
from fireflyframework_agentic.agents.templates import create_conversational_agent
from fireflyframework_agentic.memory import MemoryManager
memory = MemoryManager(max_conversation_tokens=32_000)
agent = create_conversational_agent(
personality="friendly and concise",
domain="accounts payable",
memory=memory,
model="openai:gpt-4o",
)
cid = memory.new_conversation()
result = await agent.run("I need help with invoice INV-001.", conversation_id=cid)
result = await agent.run("What's the payment status?", conversation_id=cid)
Router Agent¶
Intent-based routing to child agents:
from fireflyframework_agentic.agents.templates import create_router_agent
agent = create_router_agent(
agent_map={
"invoice_processor": "Handles invoice extraction and validation",
"receipt_processor": "Handles receipt scanning and categorisation",
"support": "General questions about the IDP system",
},
fallback_agent="support",
model="openai:gpt-4o",
)
result = await agent.run("Process this invoice from Acme Corp")
# result.output → RoutingDecision(target_agent="invoice_processor", confidence=0.92, ...)
Common Parameters¶
All template factories accept:
- name — Agent name for the registry (sensible defaults provided).
- model — LLM model string; falls back to framework default.
- extra_instructions — Text appended to the system prompt.
- tools — Additional tools to attach.
- auto_register — Set to False to skip registry registration.
IDP Tie-In: Using Templates Instead of Hand-Rolled Agents¶
Replace our manual agents with templates for cleaner code:
from fireflyframework_agentic.agents.templates import (
create_classifier_agent,
create_extractor_agent,
)
# Phase 1: Use the built-in classifier template
classifier_agent = create_classifier_agent(
categories=["invoice", "receipt", "contract", "form"],
name="document_classifier",
model="openai:gpt-4o",
)
# Phase 3: Use the built-in extractor template
extractor_agent = create_extractor_agent(
InvoiceExtraction,
field_descriptions={
"invoice_number": "Format: INV-NNNN",
"vendor_name": "Company that issued the invoice",
"total_amount": "Total amount due",
"due_date": "Payment due date in ISO format",
},
name="field_extractor",
model="openai:gpt-4o",
)
Chapter 16: Multi-Agent Delegation¶
Not every document is an invoice. Your IDP system might receive receipts, contracts, and forms — each requiring a specialised agent with different prompts, tools, and validation rules. Instead of building one mega-agent that tries to do everything, you build specialised agents and let a delegation router decide which one handles each request. The router picks an agent based on a strategy (round-robin for load balancing, capability-based for expertise matching), delegates the work, and optionally forks memory so the sub-agent gets its own working-memory scope.
Delegation Architecture¶
graph LR
REQ["Incoming Request"] --> ROUTER["DelegationRouter"]
subgraph Strategy
RR["RoundRobinStrategy\n(load balance)"]
CAP["CapabilityStrategy\n(match by tag)"]
end
ROUTER --> RR
ROUTER --> CAP
subgraph Agent Pool
A1["invoice_extractor"]
A2["receipt_extractor"]
A3["contract_extractor"]
end
RR --> A1 & A2 & A3
CAP -->|tag match| A1
subgraph Memory
MEM["MemoryManager"]
FORK["fork()"]
CHILD["Child Scope"]
end
ROUTER -.->|auto fork| FORK
FORK --> CHILD
CHILD --> A1
Delegation Router¶
from fireflyframework_agentic.agents.delegation import DelegationRouter, RoundRobinStrategy
router = DelegationRouter(
agents=[agent_a, agent_b, agent_c],
strategy=RoundRobinStrategy(),
)
result = await router.route("Process this document.")
Round Robin Strategy¶
Distributes requests evenly across a pool of agents. Useful for load balancing when all agents have equivalent capabilities:
from fireflyframework_agentic.agents.delegation import DelegationRouter, RoundRobinStrategy
strategy = RoundRobinStrategy()
router = DelegationRouter([agent_1, agent_2, agent_3], strategy)
# Request 1 → agent_1, Request 2 → agent_2, Request 3 → agent_3, Request 4 → agent_1...
Capability-Based Strategy¶
Selects the first agent whose tags include a required capability:
from fireflyframework_agentic.agents.delegation import CapabilityStrategy, DelegationRouter
strategy = CapabilityStrategy(required_tag="invoice_extraction")
router = DelegationRouter([invoice_agent, receipt_agent], strategy)
result = await router.route("Extract invoice data.")
# → Routed to invoice_agent (which has the "invoice_extraction" tag)
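The selection logic of these two strategies can be sketched independently of the framework. `StubAgent`, `RoundRobin`, and `ByCapability` are hypothetical classes, and the `select(agents)` signature is an assumption made for this illustration; the framework's strategy interface may differ.

```python
from dataclasses import dataclass, field


@dataclass
class StubAgent:
    name: str
    tags: set[str] = field(default_factory=set)


class RoundRobin:
    """Cycle through the pool so that each agent gets an equal share of requests."""

    def __init__(self) -> None:
        self._next = 0

    def select(self, agents: list[StubAgent]) -> StubAgent:
        agent = agents[self._next % len(agents)]
        self._next += 1
        return agent


class ByCapability:
    """Pick the first agent whose tags include the required capability."""

    def __init__(self, required_tag: str) -> None:
        self._required_tag = required_tag

    def select(self, agents: list[StubAgent]) -> StubAgent:
        for agent in agents:
            if self._required_tag in agent.tags:
                return agent
        raise LookupError(f"no agent has tag {self._required_tag!r}")


pool = [
    StubAgent("invoice_agent", {"invoice_extraction"}),
    StubAgent("receipt_agent", {"receipt_extraction"}),
]
round_robin = RoundRobin()
rr_picks = [round_robin.select(pool).name for _ in range(3)]
cap_pick = ByCapability("receipt_extraction").select(pool).name
```

Note that capability matching depends on the agents actually carrying tags; a pool of untagged agents would never match, so the sketch raises an error in that case.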
Other Strategies¶
The framework ships 7 delegation strategies, all importable from
fireflyframework_agentic.agents.delegation (and re-exported from
fireflyframework_agentic.agents):
- RoundRobinStrategy — even load balancing.
- CapabilityStrategy — match by required tag.
- ContentBasedStrategy — route by keywords in the request content.
- CostAwareStrategy — prefer the lowest-cost capable agent.
- ChainStrategy — run a fixed chain of agents in order.
- FallbackStrategy — try agents in order until one succeeds.
- WeightedStrategy — weighted random selection.
Memory with Delegation¶
When a MemoryManager is attached, delegated agents receive a forked memory scope:
from fireflyframework_agentic.agents.delegation import DelegationRouter, RoundRobinStrategy
from fireflyframework_agentic.memory import MemoryManager
memory = MemoryManager(working_scope_id="main")
router = DelegationRouter([agent_a, agent_b], RoundRobinStrategy(), memory=memory)
result = await router.route("Process this.")
# The delegated agent gets its own working memory scope
IDP Tie-In: Routing Documents to Specialised Agents¶
from fireflyframework_agentic.agents.delegation import DelegationRouter, CapabilityStrategy
from fireflyframework_agentic.agents.templates import create_extractor_agent
# Specialised agents for different document types
invoice_agent = create_extractor_agent(InvoiceSchema, name="invoice_extractor")
receipt_agent = create_extractor_agent(ReceiptSchema, name="receipt_extractor")
contract_agent = create_extractor_agent(ContractSchema, name="contract_extractor")
# Route based on capability
strategy = CapabilityStrategy(required_tag="invoice")
router = DelegationRouter(
agents=[invoice_agent, receipt_agent, contract_agent],
strategy=strategy,
)
Chapter 17: Plugin System¶
As your application grows, you'll want to share agents, tools, and reasoning patterns across projects — or let third-party teams contribute their own. The Plugin module uses Python's standard entry-point mechanism so that any installed package can register components that are discovered and loaded automatically at startup. No manual imports, no central configuration file — just install the package and go.
Discovering Plugins¶
from fireflyframework_agentic.plugin import PluginDiscovery
result = PluginDiscovery.discover_all()
print(f"Loaded {len(result.successful)} plugins, {len(result.failed)} failed")
Creating a Plugin¶
In your package's pyproject.toml, declare entry points under the framework's groups:
[project.entry-points."fireflyframework_agentic.agents"]
my_agent = "my_package.agents:MySpecialAgent"
[project.entry-points."fireflyframework_agentic.tools"]
my_tool = "my_package.tools:MyCustomTool"
[project.entry-points."fireflyframework_agentic.reasoning_patterns"]
my_pattern = "my_package.reasoning:MyCustomPattern"
The three entry-point groups are:
- fireflyframework_agentic.agents — Custom agents.
- fireflyframework_agentic.tools — Custom tools.
- fireflyframework_agentic.reasoning_patterns — Custom reasoning patterns.
On discovery, the framework loads each entry point and registers it in the appropriate registry.
Configuration¶
IDP Tie-In: Packaging IDP as a Plugin¶
You can package the entire IDP pipeline as a plugin that self-registers when installed:
# In idp_plugin/pyproject.toml
[project.entry-points."fireflyframework_agentic.agents"]
document_classifier = "idp_plugin.agents:classifier_agent"
field_extractor = "idp_plugin.agents:extractor_agent"
ocr_agent = "idp_plugin.agents:ocr_agent"
After uv add idp-plugin, calling PluginDiscovery.discover_all() registers all three
agents automatically.
Chapter 18: Putting It All Together¶
You've learned every module in fireflyframework-agentic, each in isolation. Now it's time to see how they all fit together in a single, production-grade application. The diagram below shows the full system architecture — every layer, every connection:
Full System Architecture¶
graph TB
subgraph "Caller"
APP["Host application\n(in-process)"]
end
subgraph "Orchestration Layer"
PIPE["Pipeline Engine\n(DAG scheduler)"]
DELEG["Delegation Router"]
end
subgraph "Intelligence Layer"
REASON["Reasoning Patterns\n(ReAct, CoT, P&E, ...)"]
VALID["Validation & QoS"]
REVIEW["OutputReviewer"]
end
subgraph "Agent Layer"
FA["FireflyAgent"]
REG["AgentRegistry"]
TPL["Template Agents"]
end
subgraph "Agent Support"
TOOLS["Tools + ToolKit\n(guards, builtins, registry)"]
PROMPTS["Prompts\n(Jinja2, versioned, composed)"]
MEM["Memory\n(conversation + working)"]
CONTENT["Content Processing\n(chunk, compress, batch)"]
end
subgraph "Ops Layer"
OBS["Observability\n(traces, metrics, events)"]
EXPL["Explainability\n(audit trail, reports)"]
EXP["Experiments\n(A/B testing)"]
LAB["Lab\n(benchmarks, eval)"]
end
subgraph "Foundation"
PAI["Pydantic AI\n(model calls, streaming)"]
CFG["FireflyAgenticConfig\n(env-driven settings)"]
PLUG["Plugin System\n(entry-point discovery)"]
end
APP --> PIPE & DELEG
PIPE --> FA
DELEG --> FA
FA --> REASON
REASON --> FA
FA --> VALID
VALID --> REVIEW
REVIEW --> FA
FA --> PAI
FA --> TOOLS
FA --> PROMPTS
FA --> MEM
FA --> CONTENT
FA --> REG
TPL --> FA
OBS -.-> FA & PIPE & REASON
EXPL -.-> FA
EXP -.-> FA
LAB -.-> FA
PLUG -.-> REG & TOOLS
CFG -.-> FA & PIPE & MEM & OBS
Let's assemble the complete IDP pipeline using everything we've learned. This is the full, production-ready implementation.
Project Structure¶
idp-service/
├── pyproject.toml
├── .env
├── prompts/
│ ├── classification.jinja2
│ └── extraction.jinja2
├── src/
│ └── idp_service/
│ ├── __init__.py
│ ├── agents.py # Agent definitions
│ ├── tools.py # Tool definitions
│ ├── pipeline.py # Pipeline wiring
│ ├── validation.py # Validation rules
│ └── main.py # In-process entry point
└── tests/
└── test_pipeline.py
Configuration (.env)¶
FIREFLY_AGENTIC_DEFAULT_MODEL=openai:gpt-4o
FIREFLY_AGENTIC_DEFAULT_TEMPERATURE=0.1
FIREFLY_AGENTIC_MAX_RETRIES=3
FIREFLY_AGENTIC_OBSERVABILITY_ENABLED=true
FIREFLY_AGENTIC_MEMORY_BACKEND=file
FIREFLY_AGENTIC_MEMORY_FILE_DIR=.firefly_memory
FIREFLY_AGENTIC_DEFAULT_CHUNK_SIZE=4000
FIREFLY_AGENTIC_VALIDATION_ENABLED=true
Agents (agents.py)¶
from pydantic import BaseModel, Field
from fireflyframework_agentic.agents.templates import (
    create_classifier_agent,
    create_extractor_agent,
    create_summarizer_agent,
)
from fireflyframework_agentic.agents import FireflyAgent

# Output schema for extraction
class InvoiceData(BaseModel):
    invoice_number: str
    vendor_name: str
    total_amount: float = Field(ge=0)
    due_date: str
    line_items: list[dict] = []

# Phase 1: Document classifier
classifier_agent = create_classifier_agent(
    categories=["invoice", "receipt", "contract", "form", "other"],
    name="document_classifier",
    model="openai:gpt-4o",
)

# Phase 2: OCR/digitisation agent
ocr_agent = FireflyAgent(
    name="ocr_agent",
    model="openai:gpt-4o",
    instructions="Extract all text from this document. Preserve layout and structure.",
)

# Phase 2.5: Summariser for compression
summary_agent = create_summarizer_agent(
    name="doc_summariser",
    max_length="medium",
    style="technical",
    model="openai:gpt-4o",
)

# Phase 3: Field extractor, with tools from tools.py attached
from .tools import extraction_kit

extractor_agent = create_extractor_agent(
    InvoiceData,
    field_descriptions={
        "invoice_number": "Format: INV-NNNN or similar",
        "vendor_name": "Company that issued the invoice",
        "total_amount": "Total monetary amount due",
        "due_date": "Payment due date (ISO 8601)",
        "line_items": "List of line items with description, quantity, unit_price",
    },
    name="field_extractor",
    model="openai:gpt-4o",
    tools=extraction_kit.as_pydantic_tools(),  # Bridge Firefly tools → Pydantic AI
)
Tools (tools.py)¶
Tools live in their own module. Each @firefly_tool call creates a BaseTool,
registers it in the global ToolRegistry, and returns the instance. The ToolKit
bundles them for agent injection via as_pydantic_tools():
from fireflyframework_agentic.tools import firefly_tool, guarded, retryable, ToolKit
from fireflyframework_agentic.tools.guards import RateLimitGuard, ValidationGuard
from fireflyframework_agentic.tools.builtins import CalculatorTool

# OCR tool: rate-limited because the upstream API is metered
@retryable(max_retries=2, backoff=1.0)
@guarded(RateLimitGuard(max_calls=100, period_seconds=60))
@firefly_tool(name="ocr_extract", description="Extract text from a document image via OCR")
async def ocr_extract(image_data: str) -> str:
    """In production, call an OCR service like AWS Textract or Google Vision."""
    return "Invoice #INV-2026-001\nVendor: Acme Corp\nAmount: $1,234.56\nDate: 2026-01-15"

# Vendor lookup: validates that vendor_name is present
@guarded(ValidationGuard(required_keys=["vendor_name"]))
@firefly_tool(name="vendor_lookup", description="Look up vendor details from the ERP system")
async def vendor_lookup(vendor_name: str) -> str:
    vendors = {
        "Acme Corp": '{"id": "V-001", "tax_id": "US-12345", "payment_terms": "NET30"}',
        "Globex": '{"id": "V-002", "tax_id": "US-67890", "payment_terms": "NET60"}',
    }
    return vendors.get(vendor_name, '{"error": "Vendor not found"}')

# Calculator: a built-in tool, no decorator needed
calculator = CalculatorTool()

# Bundle the tools the extractor agent needs.
# as_pydantic_tools() bridges them into the Pydantic AI tool format.
extraction_kit = ToolKit(
    "extraction-tools",
    [ocr_extract, vendor_lookup, calculator],
    description="Tools available during invoice field extraction",
)
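The rate-limiting idea behind the guard on ocr_extract can be illustrated without the framework. The sketch below is a plain-Python sliding-window limiter. The SlidingWindowLimiter class and its allow() method are hypothetical names introduced for illustration; this is not the implementation of RateLimitGuard, whose internals this tutorial does not show.

```python
import time
from collections import deque
from typing import Callable


class SlidingWindowLimiter:
    """Hypothetical stand-in: allow at most max_calls per period_seconds."""

    def __init__(
        self,
        max_calls: int,
        period_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_calls = max_calls
        self.period_seconds = period_seconds
        self._clock = clock
        self._calls = deque()  # timestamps of accepted calls, oldest first

    def allow(self) -> bool:
        now = self._clock()
        # Drop timestamps that have left the window.
        while self._calls and now - self._calls[0] >= self.period_seconds:
            self._calls.popleft()
        if len(self._calls) >= self.max_calls:
            return False
        self._calls.append(now)
        return True


# A fake clock keeps the example deterministic.
fake_now = [0.0]
limiter = SlidingWindowLimiter(max_calls=2, period_seconds=60, clock=lambda: fake_now[0])
first = limiter.allow()    # accepted
second = limiter.allow()   # accepted
third = limiter.allow()    # rejected: the window is full
fake_now[0] = 60.0
fourth = limiter.allow()   # accepted: the earlier calls have aged out
```

Injecting the clock is a design choice that makes the limiter testable without sleeping; a real guard would also need to decide whether a rejected call raises, waits, or returns an error payload.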
Validation (validation.py)¶
from fireflyframework_agentic.validation.rules import (
    OutputValidator, RegexRule, FormatRule, RangeRule,
)
from fireflyframework_agentic.validation.qos import (
    QoSGuard, ConfidenceScorer, ConsistencyChecker, GroundingChecker,
)
from fireflyframework_agentic.validation import OutputReviewer
from .agents import InvoiceData, extractor_agent

invoice_validator = OutputValidator({
    "invoice_number": [RegexRule("invoice_number", r"^INV-\d{4,10}$")],
    "vendor_name": [RegexRule("vendor_name", r".{2,}")],
    "total_amount": [RangeRule("total_amount", min_value=0.01, max_value=10_000_000)],
    "due_date": [FormatRule("due_date", "iso_date")],
})

reviewer = OutputReviewer(
    output_type=InvoiceData,
    validator=invoice_validator,
    max_retries=3,
)

qos_guard = QoSGuard(
    confidence_scorer=ConfidenceScorer(extractor_agent),
    consistency_checker=ConsistencyChecker(extractor_agent, num_runs=2),
    grounding_checker=GroundingChecker(),
    min_confidence=0.8,
    min_consistency=0.6,
    min_grounding=0.8,
)
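To see which values the rules in invoice_validator are likely to accept, the same checks can be approximated with the standard library. This sketch assumes that RegexRule applies the pattern as written, that RangeRule bounds are inclusive, and that the "iso_date" format means a YYYY-MM-DD date. The check_invoice helper is hypothetical and is not part of the framework.

```python
import re
from datetime import date

INVOICE_NUMBER = re.compile(r"^INV-\d{4,10}$")  # same pattern as in invoice_validator


def check_invoice(data: dict) -> list:
    """Hypothetical helper: return the names of fields that fail the checks."""
    failed = []
    if not INVOICE_NUMBER.match(str(data.get("invoice_number", ""))):
        failed.append("invoice_number")
    if not re.search(r".{2,}", str(data.get("vendor_name", ""))):
        failed.append("vendor_name")
    amount = data.get("total_amount")
    if not isinstance(amount, (int, float)) or not 0.01 <= amount <= 10_000_000:
        failed.append("total_amount")
    try:
        date.fromisoformat(str(data.get("due_date", "")))
    except ValueError:
        failed.append("due_date")
    return failed


good = {"invoice_number": "INV-20260001", "vendor_name": "Acme Corp",
        "total_amount": 1234.56, "due_date": "2026-02-14"}
bad = {"invoice_number": "INV-2026-001", "vendor_name": "A",
       "total_amount": 0, "due_date": "15/01/2026"}

good_failures = check_invoice(good)  # no field fails
bad_failures = check_invoice(bad)    # all four fields fail
```

An identifier with a second hyphen, such as the INV-2026-001 value returned by the ocr_extract stub, does not match ^INV-\d{4,10}$, so either the pattern or the upstream number format may need adjusting for your data.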
Pipeline (pipeline.py)¶
from fireflyframework_agentic.pipeline.builder import PipelineBuilder
from fireflyframework_agentic.pipeline.steps import AgentStep, ReasoningStep, CallableStep
from fireflyframework_agentic.pipeline.context import PipelineContext
from fireflyframework_agentic.reasoning import PlanAndExecutePattern, ReflexionPattern
from fireflyframework_agentic.content.chunking import TextChunker, BatchProcessor
from fireflyframework_agentic.content.compression import ContextCompressor, MapReduceStrategy
from fireflyframework_agentic.memory import MemoryManager
from fireflyframework_agentic.observability import traced
from .agents import classifier_agent, ocr_agent, summary_agent, extractor_agent
from .validation import invoice_validator, reviewer

async def validate_step(context, inputs):
    extracted = inputs.get("input", {})
    report = invoice_validator.validate(extracted)
    context.metadata["validation_report"] = report
    if not report.valid:
        # Self-correct with Reflexion
        reflexion = ReflexionPattern(max_steps=2)
        corrected = await reflexion.execute(
            extractor_agent,
            f"Fix these errors: {report.errors}. Original data: {extracted}",
        )
        # Re-validate the corrected output instead of assuming the fix worked
        recheck = invoice_validator.validate(corrected.output)
        context.metadata["validation_report"] = recheck
        return {"extracted": corrected.output, "valid": recheck.valid, "errors": recheck.errors}
    return {"extracted": extracted, "valid": True, "errors": []}

async def assemble_step(context, inputs):
    data = inputs.get("input", {})
    return {
        "document_type": context.metadata.get("doc_type", "unknown"),
        "language": context.metadata.get("language", "unknown"),
        "extracted_fields": data.get("extracted", {}),
        "validation_passed": data.get("valid", False),
    }

idp_pipeline = (
    PipelineBuilder("idp-pipeline")
    .add_node("classify", AgentStep(classifier_agent))
    .add_node("digitise", AgentStep(ocr_agent), retry_max=2, timeout_seconds=60)
    .add_node(
        "extract",
        ReasoningStep(PlanAndExecutePattern(max_steps=15, allow_replan=True), extractor_agent),
    )
    .add_node("validate", CallableStep(validate_step))
    .add_node("assemble", CallableStep(assemble_step))
    .chain("classify", "digitise", "extract", "validate", "assemble")
    .build()
)

@traced(name="idp.process")
async def process_document(document_bytes: bytes, metadata: dict | None = None) -> dict:
    memory = MemoryManager(working_scope_id="idp-session")
    ctx = PipelineContext(
        inputs=document_bytes,
        metadata=metadata or {},
        memory=memory,
    )
    result = await idp_pipeline.run(context=ctx)
    return result.final_output if result.success else {"error": result.failed_nodes}
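The project layout lists tests/test_pipeline.py, and callable steps such as assemble_step are plain coroutines, so they can be exercised without any model call. The following is a minimal sketch, assuming a step only reads context.metadata. The SimpleNamespace object is a hypothetical stand-in for PipelineContext, and assemble_step is repeated here only so the block runs on its own; in the project you would import it from idp_service.pipeline.

```python
import asyncio
from types import SimpleNamespace


async def assemble_step(context, inputs):
    # Repeated from pipeline.py so this sketch is self-contained.
    data = inputs.get("input", {})
    return {
        "document_type": context.metadata.get("doc_type", "unknown"),
        "language": context.metadata.get("language", "unknown"),
        "extracted_fields": data.get("extracted", {}),
        "validation_passed": data.get("valid", False),
    }


def test_assemble_step_falls_back_to_defaults():
    ctx = SimpleNamespace(metadata={})  # stand-in for PipelineContext (assumption)
    result = asyncio.run(assemble_step(ctx, {}))
    assert result == {
        "document_type": "unknown",
        "language": "unknown",
        "extracted_fields": {},
        "validation_passed": False,
    }


def test_assemble_step_passes_through_validated_fields():
    ctx = SimpleNamespace(metadata={"doc_type": "invoice", "language": "en"})
    inputs = {"input": {"extracted": {"vendor_name": "Acme Corp"}, "valid": True}}
    result = asyncio.run(assemble_step(ctx, inputs))
    assert result["document_type"] == "invoice"
    assert result["extracted_fields"] == {"vendor_name": "Acme Corp"}
    assert result["validation_passed"] is True
```

Tests of this shape cover the glue logic cheaply; steps that call an agent would need a test double for the model, which this sketch does not attempt.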
Entry Point (main.py)¶
fireflyframework-agentic is a pure in-process library: it opens no port and consumes from
no message broker. Your host service owns serving and calls process_document directly. The host
also owns OTel SDK and exporter configuration; the framework emits spans and metrics
through the OpenTelemetry API, so they flow through whatever providers the host has set up:
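import asyncio
from .pipeline import process_document

async def main(document_bytes: bytes, filename: str) -> dict:
    return await process_document(
        document_bytes,
        metadata={"filename": filename, "source": "host-service"},
    )

if __name__ == "__main__":
    # The relative import above needs package context, so run this as a module
    # (for example, python -m idp_service.main with the package installed)
    # rather than as a bare script.
    with open("invoice.pdf", "rb") as fh:
        print(asyncio.run(main(fh.read(), "invoice.pdf")))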
To expose this over HTTP or wire it to a message broker, embed process_document in
your host service's framework of choice — the agent library stays in-process.
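One way a host might embed the pipeline is a small in-process worker pool that pulls documents from a queue. The sketch below uses only asyncio. The process_document defined here is a stub standing in for idp_service.pipeline.process_document, and the worker and run_batch names are hypothetical; a real host would replace the asyncio.Queue with its HTTP handler or broker consumer.

```python
import asyncio
from typing import Optional


async def process_document(document_bytes: bytes, metadata: Optional[dict] = None) -> dict:
    """Stub with the same signature as the pipeline entry point (assumption)."""
    return {"size": len(document_bytes), "metadata": metadata or {}}


async def worker(queue: asyncio.Queue, results: list) -> None:
    # Pull (filename, payload) jobs until cancelled; one failure must not stop the loop.
    while True:
        filename, payload = await queue.get()
        try:
            results.append(await process_document(payload, metadata={"filename": filename}))
        except Exception as exc:
            results.append({"error": str(exc), "filename": filename})
        finally:
            queue.task_done()


async def run_batch(jobs: list, concurrency: int = 2) -> list:
    queue: asyncio.Queue = asyncio.Queue()
    results: list = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(concurrency)]
    for job in jobs:
        queue.put_nowait(job)
    await queue.join()  # wait until every job has been processed
    for task in workers:
        task.cancel()   # workers loop forever, so stop them explicitly
    await asyncio.gather(*workers, return_exceptions=True)
    return results


batch_results = asyncio.run(run_batch([("a.pdf", b"abc"), ("b.pdf", b"hello")]))
```

Bounding concurrency in the host, rather than inside the library, keeps control over model spend and rate limits with the service that owns the deployment.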
Production Checklist¶
Before deploying to production, verify:
- [ ] Configuration: All FIREFLY_AGENTIC_* environment variables are set.
- [ ] Model access: API keys for your LLM provider are configured.
- [ ] Observability: observability_enabled is on, and your host service has configured the OTel SDK/exporters so framework spans and metrics flow to your backend.
- [ ] Memory persistence: memory_backend is set to "file" (or a custom backend) for durability.
- [ ] Validation: OutputValidator rules match your business requirements.
- [ ] QoS thresholds: min_confidence and num_runs (ConsistencyChecker) are tuned for your use case.
- [ ] Retry limits: Pipeline nodes have appropriate retry_max and timeout_seconds.
- [ ] Experiments: You've A/B tested your prompt and model variants.
- [ ] Audit trail: Explainability is enabled for regulated workloads.
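The configuration item on this checklist can be automated with a small preflight check at service start-up. This is a minimal sketch, assuming the variable names from the .env file shown earlier are the ones your deployment requires. The missing_settings helper is hypothetical, and the framework's own configuration loading may already enforce some of this.

```python
import os
from collections.abc import Mapping

# Assumption: the subset of settings this deployment treats as mandatory.
REQUIRED = (
    "FIREFLY_AGENTIC_DEFAULT_MODEL",
    "FIREFLY_AGENTIC_OBSERVABILITY_ENABLED",
    "FIREFLY_AGENTIC_MEMORY_BACKEND",
    "FIREFLY_AGENTIC_VALIDATION_ENABLED",
)


def missing_settings(env: Mapping = os.environ) -> list:
    """Return the required variable names that are unset or blank."""
    return [name for name in REQUIRED if not env.get(name, "").strip()]


# An explicit mapping keeps the example independent of the real environment.
example_env = {
    "FIREFLY_AGENTIC_DEFAULT_MODEL": "openai:gpt-4o",
    "FIREFLY_AGENTIC_MEMORY_BACKEND": "file",
    "FIREFLY_AGENTIC_VALIDATION_ENABLED": " ",
}
problems = missing_settings(example_env)
```

At start-up a host could call missing_settings() with no arguments and refuse to start when the returned list is non-empty.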
Next Steps¶
Congratulations — you now know every module in fireflyframework-agentic. Here are some paths to explore further:
- Dive deeper: Each chapter links to a detailed module guide in docs/.
- Read the source: The framework is fully typed and well-documented in code.
- Run the tests: uv run pytest runs 1,300+ tests across ~128 files that exercise every module.
- Build your own: Extend AbstractReasoningPattern, implement MemoryStore, create custom tools, or write a plugin.