Pipeline Guide¶
Copyright 2026 Firefly Software Foundation. Licensed under the Apache License 2.0.
The Pipeline module provides a DAG-based orchestrator for composing multi-step GenAI workflows. It supports parallel execution, conditional branching, retries, timeouts, and fan-out/fan-in patterns -- everything needed to model real-world enterprise processing pipelines.
PipelineBuilder has two modes:
- Port-based (legacy, parallel): nodes communicate via output_key / input_key edge ports and run concurrently within each topological level. Best for ETL-shaped DAGs. Documented in the bulk of this guide.
- State-based: opt-in via PipelineBuilder("name", state=SomeModel). Nodes become async (state) -> dict functions over a typed shared state. One .branch(source, router) call covers conditional routing; Send(target, payload) covers runtime fan-out; a Checkpointer enables resume after failure. Best for agentic workflows and ReAct-style loops. See State-Based Pipelines.
Concepts¶
A pipeline is defined as a Directed Acyclic Graph (DAG) of nodes connected by edges. Each node wraps a step executor that does the actual work (call an agent, run a reasoning pattern, execute a function). The engine schedules nodes by topological level so that independent nodes run concurrently.
graph LR
A[File Classifier] --> B[OCR / Digitizer]
B --> C[Extractor]
C --> D[Validator]
D --> E[Result Assembler]
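To make "scheduling by topological level" concrete, here is a minimal sketch in plain Python. It is not the engine's implementation; the function name topological_levels and the (source, target) edge tuples are assumptions made for illustration. It groups node ids so that every node lands one level after its latest predecessor, which is what lets independent nodes share a level and run concurrently.

```python
from collections import defaultdict


def topological_levels(nodes, edges):
    """Group node ids into levels; all predecessors of a node sit in earlier levels."""
    indegree = {node: 0 for node in nodes}
    successors = defaultdict(list)
    for source, target in edges:
        successors[source].append(target)
        indegree[target] += 1

    # Level 0 holds every node without incoming edges.
    level = [node for node in nodes if indegree[node] == 0]
    levels = []
    while level:
        levels.append(level)
        next_level = []
        for node in level:
            for succ in successors[node]:
                indegree[succ] -= 1
                if indegree[succ] == 0:  # last predecessor just finished
                    next_level.append(succ)
        level = next_level

    if sum(len(lv) for lv in levels) != len(nodes):
        raise ValueError("graph contains a cycle")
    return levels


# The linear pipeline from the diagram: one node per level.
linear = topological_levels(
    ["classify", "ocr", "extract", "validate", "assemble"],
    [("classify", "ocr"), ("ocr", "extract"),
     ("extract", "validate"), ("validate", "assemble")],
)

# A diamond: the two OCR nodes share a level and could run concurrently.
diamond = topological_levels(
    ["split", "ocr_1", "ocr_2", "merge"],
    [("split", "ocr_1"), ("split", "ocr_2"),
     ("ocr_1", "merge"), ("ocr_2", "merge")],
)
```

In the diamond case the middle level contains both OCR nodes, which is the situation where level-based scheduling yields parallelism.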
Building a Pipeline¶
Fluent Builder API¶
The PipelineBuilder provides a chainable API for constructing pipelines:
from fireflyframework_agentic.pipeline.builder import PipelineBuilder
from fireflyframework_agentic.pipeline.steps import AgentStep, CallableStep
engine = (
PipelineBuilder("invoice-pipeline")
.add_node("classify", AgentStep(classifier_agent))
.add_node("extract", AgentStep(extractor_agent))
.add_node("validate", CallableStep(validate_fn))
.chain("classify", "extract", "validate")
.build()
)
result = await engine.run(inputs="<base64-encoded invoice>")
The builder auto-wraps compatible objects:
- A FireflyAgent is wrapped in AgentStep.
- An async callable is wrapped in CallableStep.
- A StepExecutor implementation is used directly.
Manual DAG Construction¶
For full control, construct the DAG directly:
from fireflyframework_agentic.pipeline.dag import DAG, DAGNode, DAGEdge
from fireflyframework_agentic.pipeline.engine import PipelineEngine
dag = DAG("my-pipeline")
dag.add_node(DAGNode(node_id="step_a", step=my_step))
dag.add_node(DAGNode(node_id="step_b", step=other_step))
dag.add_edge(DAGEdge(source="step_a", target="step_b"))
engine = PipelineEngine(dag)
result = await engine.run(inputs="hello")
Step Executors¶
Step executors are objects that implement the StepExecutor protocol:
class StepExecutor(Protocol):
    async def execute(self, context: PipelineContext, inputs: dict[str, Any]) -> Any:
        ...
The framework provides these built-in executors:
- AgentStep -- Runs a FireflyAgent with the input as the prompt.
- ReasoningStep -- Runs a reasoning pattern (ReAct, CoT, etc.) through an agent.
- CallableStep -- Wraps any async function (context, inputs) -> output.
- BatchLLMStep -- Processes multiple prompts concurrently through an agent for cost optimization. See Batch Processing below.
- BranchStep (deprecated) -- Routes execution to one of several downstream paths based on a predicate. Use .branch(...) in State-Based Pipelines instead.
- FanOutStep (deprecated) -- Splits input into a list for parallel downstream processing. Use Send in State-Based Pipelines instead.
- FanInStep -- Merges outputs from multiple upstream nodes.
- EmbeddingStep -- Embeds the upstream text(s) using an embedding provider (EmbeddingProtocol). Accepts a single string or a list; input_key selects which inputs key holds the text(s).
- RetrievalStep -- Searches a vector store (VectorStoreProtocol) using the upstream text as the query. Embeds the query with the supplied embedder (or treats the input as a pre-computed embedding when embedder=None). Parameters: store, embedder=None, top_k=5, input_key="input".
State-Based Pipelines¶
Set state= on PipelineBuilder to switch to a declarative API designed for
agentic workflows. Nodes become async (state) -> dict | None functions over
a typed shared-state object; the engine reduces each node's partial-update
dict back into the state.
from typing import Annotated
from pydantic import BaseModel
from fireflyframework_agentic.pipeline import PipelineBuilder, append
class AgentState(BaseModel):
    messages: Annotated[list[str], append] = []  # reducer: append
    intent: str | None = None                    # default reducer: replace
    answer: str | None = None

async def classify(state: AgentState) -> dict:
    return {"intent": "complaint" if "refund" in state.messages[-1] else "general"}

async def answer(state: AgentState) -> dict:
    return {"answer": "Here is your answer."}

async def escalate(state: AgentState) -> dict:
    return {"answer": "Escalated to human."}

def route(state: AgentState) -> str:
    return "escalate" if state.intent == "complaint" else "answer"
pipeline = (
PipelineBuilder("support-agent", state=AgentState)
.add_node(classify) # node id derived from fn.__name__
.add_node(answer)
.add_node(escalate)
.branch(classify, route) # router returns target node id
.build()
)
result = await pipeline.invoke(AgentState(messages=["I want a refund"]))
print(result.state.answer)
Reducers¶
Reducers are declared as Annotated[T, reducer_fn] on the state schema. The
built-ins live in fireflyframework_agentic.pipeline.reducers:
| Reducer | Semantics |
|---|---|
| replace | Last-write-wins (the default for any field). |
| append | Append a single item to a list. |
| extend | Concatenate two iterables. |
| merge_dict | Shallow-merge two dicts; update wins on conflict. |
Custom reducers are any callable (current, update) -> merged.
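As a rough illustration of how a partial update could be folded into state, the sketch below re-implements the four documented semantics as plain functions and adds one custom reducer. These are stand-ins written for this example, not the library's code; apply_update, keep_max, and the dict-based state are assumptions (the real engine works on a typed model and reads reducers from Annotated metadata).

```python
def replace(current, update):
    """Last-write-wins."""
    return update


def append(current, update):
    """Append a single item to a list without mutating the original."""
    return [*current, update]


def extend(current, update):
    """Concatenate two iterables into a new list."""
    return [*current, *update]


def merge_dict(current, update):
    """Shallow merge; keys from `update` win on conflict."""
    return {**current, **update}


def keep_max(current, update):
    """Custom reducer (hypothetical): keep the larger of the two values."""
    return update if current is None or update > current else current


def apply_update(state, partial, reducers):
    """Fold a node's partial-update dict into a copy of the state.

    `reducers` maps field name -> reducer; fields without an entry fall back
    to `replace`, mirroring the documented default.
    """
    merged = dict(state)
    for key, value in partial.items():
        reducer = reducers.get(key, replace)
        merged[key] = reducer(state.get(key), value)
    return merged


state = {"messages": ["hi"], "score": 3, "intent": None}
new_state = apply_update(
    state,
    {"messages": "I want a refund", "score": 1, "intent": "complaint"},
    reducers={"messages": append, "score": keep_max},
)
```

Here messages grows by one item, score stays at 3 because keep_max rejects the lower value, and intent is simply replaced.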
Branching¶
.branch(source, router, mapping=None) registers a synchronous
(state) -> str | Send | list[Send] router on source:
- Returning a node id (string) routes to that node directly.
- Passing mapping={"label": target_node, ...} lets the router return an abstract label instead of a node id.
- Returning a Send or list[Send] triggers runtime fan-out (see below).
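The three router return shapes can be pictured as one normalization step. The following sketch is illustrative only: the Send dataclass is a local stand-in for the framework's class, and resolve_route is a hypothetical helper, not a framework function.

```python
from dataclasses import dataclass, field


@dataclass
class Send:
    """Local stand-in for the framework's Send(target, payload)."""
    target: str
    payload: dict = field(default_factory=dict)


def resolve_route(decision, mapping=None):
    """Normalize a router's return value into a list of (target, payload) pairs."""
    if isinstance(decision, str):
        # With a mapping the string is an abstract label; without one it is a node id.
        target = mapping[decision] if mapping else decision
        return [(target, {})]
    sends = [decision] if isinstance(decision, Send) else list(decision)
    return [(send.target, send.payload) for send in sends]


direct = resolve_route("escalate")
labelled = resolve_route("angry", mapping={"angry": "escalate", "calm": "answer"})
fanned = resolve_route([Send("worker", {"item": 1}), Send("worker", {"item": 2})])
```

In this sketch a label missing from the mapping raises KeyError; how the real builder reports that case is not specified here.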
Checkpoint + Resume¶
Pass a Checkpointer to persist state after each successful node. The framework
ships one backend, FileCheckpointer, plus the Checkpointer Protocol it
implements (save, load_latest, list_runs) and the CheckpointRecord model.
Anything that satisfies the Protocol is swappable without engine changes.
| Backend | Use when | Trade-off | Source |
|---|---|---|---|
| FileCheckpointer | Dev, single-host, ephemeral | No cross-process / cross-host sharing | shipped (fireflyframework_agentic.pipeline) |
| Redis-backed | Multi-worker, sub-day-scale runs | TTL eviction; not durable forever | example template (examples/software_factory/checkpointers/redis.py) |
| Postgres-backed | Long-lived runs, compliance, audit-friendly | Operational overhead of a DB | example template (examples/software_factory/checkpointers/postgres.py) |
FileCheckpointer writes one JSON file per node at
<root>/<pipeline_name>/<run_id>/<sequence>_<node_id>.json. The Redis and
Postgres variants are not importable framework classes — they are ~50–80 LOC
plug-and-play templates under examples/software_factory/ that implement the
same Checkpointer Protocol against a caller-supplied connection. Copy whichever
you need into your project and adapt it.
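To give a feel for what "anything that satisfies the Protocol" might look like, here is a toy in-memory backend. Only the three method names (save, load_latest, list_runs) come from this guide. Everything else is an assumption: the argument lists, the synchronous methods, and the Record dataclass standing in for CheckpointRecord. Check the shipped Protocol before adapting it.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Record:
    """Hypothetical stand-in for CheckpointRecord; field names are assumed."""
    pipeline_name: str
    run_id: str
    sequence: int
    node_id: str
    state: dict


class InMemoryCheckpointer:
    """Toy backend: keeps every record in a dict keyed by (pipeline, run)."""

    def __init__(self) -> None:
        self._runs: dict = {}

    def save(self, record: Record) -> None:
        key = (record.pipeline_name, record.run_id)
        self._runs.setdefault(key, []).append(record)

    def load_latest(self, pipeline_name: str, run_id: str) -> Optional[Record]:
        records = self._runs.get((pipeline_name, run_id), [])
        # The highest sequence number is the most recent successful node.
        return max(records, key=lambda r: r.sequence, default=None)

    def list_runs(self, pipeline_name: str) -> list:
        return sorted(run for name, run in self._runs if name == pipeline_name)


checkpointer = InMemoryCheckpointer()
checkpointer.save(Record("factory", "run-1", 1, "architect", {"plan": "v1"}))
checkpointer.save(Record("factory", "run-1", 2, "python_dev", {"plan": "v1", "code": "..."}))
latest: Any = checkpointer.load_latest("factory", "run-1")
```

An in-memory store like this loses everything on process exit, so it is only useful for tests; the file, Redis, and Postgres options in the table exist precisely because resume needs durable records.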
from fireflyframework_agentic.pipeline import FileCheckpointer
pipeline = (
PipelineBuilder("software-factory", state=BuildState,
checkpointer=FileCheckpointer("./checkpoints"))
.add_node(architect)
.add_node(python_dev)
.add_node(deployer)
.add_node(evaluator)
.chain(architect, python_dev, deployer, evaluator)
.build()
)
# Fresh run
result = await pipeline.invoke(BuildState(requirements="user-mgmt service"))
# Resume after crash — picks up at the failed node, skips completed ones
result = await pipeline.invoke(run_id=result.run_id)
# Or jump into a specific node with explicit state
result = await pipeline.invoke(state=loaded_state, start_at=deployer)
To use a durable store, copy one of the example templates into your project.
Each implements save / load_latest / list_runs against a connection you
supply, so swapping it in is a one-line change at the builder:
# After copying examples/software_factory/checkpointers/redis.py into your project:
from myproject.checkpointers.redis import RedisCheckpointer
checkpointer = RedisCheckpointer(client=my_existing_redis)
pipeline = PipelineBuilder("software-factory", state=BuildState,
checkpointer=checkpointer).add_node(architect).build()
Cycles and recursion_limit¶
State pipelines permit cycles for ReAct loops and retry-with-critique patterns.
The builder accepts recursion_limit (default 25) as a safety net — a runaway
loop surfaces as result.success=False with a clean error, not an infinite hang.
def route(state):
    return "done" if state.counter >= 3 else "step"

pipeline = (
    PipelineBuilder("loop", state=LoopState, recursion_limit=25)
    .add_node(step).add_node(done).branch(step, route)
    .build()
)
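The safety-net idea can be shown without the framework. This is a minimal sketch, assuming a synchronous step function and a router that returns "done" to exit; run_loop and its (success, state, error) return tuple are hypothetical and only mimic the documented outcome of a failed result with a clean error.

```python
def run_loop(step, route, state, recursion_limit=25):
    """Run `step` until `route` says "done" or the visit budget is exhausted.

    Returns (success, state, error) instead of looping forever.
    """
    for _ in range(recursion_limit):
        state = step(state)
        if route(state) == "done":
            return True, state, None
    return False, state, f"recursion limit of {recursion_limit} reached"


def increment(state):
    return {"counter": state["counter"] + 1}


def route_counter(state):
    return "done" if state["counter"] >= 3 else "step"


# Terminates normally after three visits.
ok, final_state, error = run_loop(increment, route_counter, {"counter": 0})

# A router that never exits is cut off at the limit.
stuck_ok, stuck_state, stuck_error = run_loop(
    increment, lambda state: "step", {"counter": 0}, recursion_limit=5
)
```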
Runtime Fan-Out via Send¶
A router may return list[Send(target, payload)] to dispatch multiple
invocations of the same (or different) workers concurrently. Each Send's
payload is applied to a copy of the current state before its target runs;
results reduce back into shared state. Replaces the legacy FanOutStep.
from fireflyframework_agentic.pipeline import Send
def dispatch(state):
    return [Send("worker", {"item": x}) for x in state.items]

pipeline = (
    PipelineBuilder("mapreduce", state=MapReduceState)
    .add_node(planner).add_node(worker).add_node(collect)
    .add_edge(worker, collect)
    .branch(planner, dispatch)
    .build()
)
When all worker targets share a common successor, the engine continues there once the fan-out completes; the aggregator runs once with all results in shared state.
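The copy-then-reduce behaviour described above can be approximated with asyncio.gather. The sketch below is a simplified model, not the engine: state is a plain dict, Send is a local stand-in, and fan_out plus its reducers argument are hypothetical names.

```python
import asyncio
import copy
from dataclasses import dataclass, field


@dataclass
class Send:
    """Local stand-in for the framework's Send(target, payload)."""
    target: str
    payload: dict = field(default_factory=dict)


def append(current, update):
    return [*current, update]


async def worker(state):
    # Each invocation sees its own `item`, injected by the Send payload.
    return {"results": state["item"] * 2}


async def fan_out(state, sends, nodes, reducers):
    """Run every Send target concurrently on a private copy of the state."""

    async def run_one(send):
        local = copy.deepcopy(state)   # workers never share a mutable state
        local.update(send.payload)     # payload is applied to the copy only
        return await nodes[send.target](local)

    updates = await asyncio.gather(*(run_one(send) for send in sends))

    merged = dict(state)
    for update in updates:             # gather preserves the order of `sends`
        for key, value in update.items():
            reducer = reducers.get(key, lambda current, new: new)
            merged[key] = reducer(merged.get(key), value)
    return merged


initial = {"items": [1, 2, 3], "results": []}
sends = [Send("worker", {"item": x}) for x in initial["items"]]
reduced = asyncio.run(fan_out(initial, sends, {"worker": worker}, {"results": append}))
```

Because the payload only touches the per-worker copy, the item key never leaks into shared state; only what the workers return is reduced back.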
Observability¶
State pipelines emit lifecycle callbacks and OTel spans so ops can see what an agent workflow is doing in real time.
The unified EventHandler protocol (exported from
fireflyframework_agentic.pipeline) carries the run_id on every callback (so
events correlate across resumes), and on_node_start / on_node_pause carry a
per-node visit counter (so cyclic graphs and Send fan-outs are
distinguishable). The engine dispatches by inspecting each callback's declared
parameters — so a handler whose method signature omits run_id or visit
(including any legacy PipelineEventHandler implementation) keeps working
unchanged; the engine simply drops the parameters it doesn't declare. Implement
any subset of methods; missing ones are no-ops.
EventHandler callbacks: on_pipeline_start(pipeline_name, run_id),
on_node_start(pipeline_name, run_id, node_id, visit),
on_node_complete(pipeline_name, run_id, node_id, latency_ms),
on_node_error(pipeline_name, run_id, node_id, error),
on_node_skip(pipeline_name, run_id, node_id, reason),
on_node_pause(pipeline_name, run_id, node_id, reason), and
on_pipeline_complete(pipeline_name, run_id, success, duration_ms).
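Dispatch by signature inspection can be sketched with the standard library's inspect module. This is an illustration of the technique, not the engine's dispatcher; the dispatch function and both handler classes are made up for the example.

```python
import asyncio
import inspect
import logging

logger = logging.getLogger(__name__)


async def dispatch(handler, method_name, **event):
    """Call handler.<method_name> with only the event fields it declares."""
    method = getattr(handler, method_name, None)
    if method is None:
        return  # unimplemented callbacks are no-ops
    params = inspect.signature(method).parameters
    kwargs = {name: value for name, value in event.items() if name in params}
    try:
        await method(**kwargs)
    except Exception:
        # Observability must not break the pipeline: log and carry on.
        logger.exception("event handler %s failed", method_name)


class LegacyHandler:
    """Old-style signature: no run_id, no visit."""

    def __init__(self):
        self.calls = []

    async def on_node_start(self, node_id, pipeline_name):
        self.calls.append((node_id, pipeline_name))


class UnifiedHandler:
    """New-style signature with run_id and visit."""

    def __init__(self):
        self.calls = []

    async def on_node_start(self, pipeline_name, run_id, node_id, visit):
        self.calls.append((pipeline_name, run_id, node_id, visit))


async def demo():
    legacy, unified = LegacyHandler(), UnifiedHandler()
    event = dict(pipeline_name="agent", run_id="r1", node_id="classify", visit=1)
    for handler in (legacy, unified):
        await dispatch(handler, "on_node_start", **event)
        await dispatch(handler, "on_node_pause", **event)  # neither implements it
    return legacy.calls, unified.calls


legacy_calls, unified_calls = asyncio.run(demo())
```

Passing arguments by keyword is what makes the parameter order in a handler's signature irrelevant, so the legacy (node_id, pipeline_name) ordering keeps working.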
from fireflyframework_agentic.pipeline import PipelineBuilder
class ProgressHandler:
    async def on_pipeline_start(self, pipeline_name, run_id):
        print(f"▶ [{pipeline_name}] run {run_id} starting")

    async def on_node_start(self, pipeline_name, run_id, node_id, visit):
        print(f"  ▶ {node_id} (visit #{visit})")

    async def on_node_complete(self, pipeline_name, run_id, node_id, latency_ms):
        print(f"  ✔ {node_id} ({latency_ms:.0f}ms)")

    async def on_node_error(self, pipeline_name, run_id, node_id, error):
        print(f"  ✗ {node_id}: {error}")

    async def on_pipeline_complete(self, pipeline_name, run_id, success, duration_ms):
        status = "OK" if success else "FAILED"
        print(f"═ [{pipeline_name}] {status} in {duration_ms:.0f}ms")
pipeline = (
PipelineBuilder("agent", state=AgentState, event_handler=ProgressHandler())
.add_node(classify).add_node(answer).add_node(escalate)
.branch(classify, route)
.build()
)
In parallel, the pipeline emits OTel spans automatically when
observability_enabled is True and opentelemetry is installed:
- One pipeline-level span pipeline.state.<name> around each invoke, with attributes firefly.pipeline and firefly.run_id.
- One per-node span pipeline.state.node.<node_id> for each fn(state) call, parented under the pipeline span, with attributes firefly.node and firefly.visit.
- For Send fan-out: one per-Send span as a sibling under the pipeline span.
Handler exceptions are swallowed — observability never breaks business logic.
Human-in-the-loop (Pause)¶
Any node may return Pause(reason="...") instead of a state update to halt
the pipeline cleanly. The current state is checkpointed with a paused marker;
invoke returns with result.paused=True and result.success=False.
from fireflyframework_agentic.pipeline import Pause
async def await_deploy_approval(state: DeployState) -> Pause:
    return Pause(reason="awaiting human approval to deploy to production")
To resume after the external approval comes in, call invoke with the same
run_id and approve_pause=True. Without approve_pause=True, the
resume raises a PipelineError — the pause is sticky until explicitly
released. The successor of the paused node runs next; the pause node itself
is not re-executed.
first = await pipeline.invoke(DeployState(...))
assert first.paused
# ...later, after approval...
done = await pipeline.invoke(run_id=first.run_id, approve_pause=True)
assert done.success
The configured EventHandler receives an on_node_pause callback when this
happens (the callback is optional — partial handlers without it continue to
work).
Audit Log¶
Distinct from the Checkpointer (which stores the latest state for
crash recovery), an AuditLog is an append-only record of every node
visit for compliance, debugging, and replay. Wire one in via the
audit_log kwarg:
from fireflyframework_agentic.pipeline import (
PipelineBuilder, FileAuditLog, LoggingAuditLog, OtelAuditLog,
)
PipelineBuilder("agent", state=AgentState, audit_log=FileAuditLog("./audit"))
Three backends ship, each conforming to the AuditLog Protocol (which
defines a single record(entry: AuditEntry) method):
| Backend | Use when | Read API | Trace-correlated | Source |
|---|---|---|---|---|
| FileAuditLog | Dev / single-host | yes | no | shipped (one JSONL file per run) |
| LoggingAuditLog | Generic log stacks (Splunk-HEC, Loki, JSON-logging) | no (write-only) | no | shipped (stdlib logging) |
| OtelAuditLog | OTel-native stacks (Application Insights, Datadog APM, OTel Collector) | no (write-only) | yes | shipped (needs opentelemetry-sdk) |
FileAuditLog also implements QueryableAuditLog (adds
list_entries(pipeline_name, run_id)); the two write-only backends delegate
query/search to the user's existing observability stack. For a Postgres-backed
queryable audit log, copy the plug-and-play template at
examples/software_factory/audit/postgres.py — it implements
QueryableAuditLog against a caller-supplied psycopg.Connection.
Audit-log write failures are non-fatal — logged but never abort the pipeline.
Mermaid Export¶
PipelineEngine.to_mermaid() and DAG.to_mermaid() render the topology as a
Mermaid flowchart. Branch edges declared with an explicit mapping show their
label; edges carrying a condition are annotated if?.
When to use which mode¶
| Use port-based when… | Use state-based when… |
|---|---|
| Pure ETL: parallel, fan-out/fan-in, no shared state | Agentic workflow: classify → branch → respond / loop / retry |
| Each step's input is a single value from the previous step | Multiple agents reading/writing different fields of a shared object |
| You want the engine to run independent nodes concurrently | You want resume-after-failure and start-from-middle semantics |
| You're happy with BranchStep + per-node condition lambdas | You want one .branch(...) call and inspectable routing |
See examples/pipeline_state.py for a
runnable demo covering branching, software-factory checkpoint/resume, and
map-reduce fan-out.
Parallel Execution (Fan-Out / Fan-In)¶
FanOutStep is deprecated. For runtime fan-out (one dispatch per item, arbitrary count), prefer Send from State-Based Pipelines. FanOutStep still works for now (it emits a DeprecationWarning on construction); FanInStep is not deprecated.
graph TD
SPLIT[Fan-Out] --> W1[Worker 1]
SPLIT --> W2[Worker 2]
SPLIT --> W3[Worker 3]
W1 --> MERGE[Fan-In]
W2 --> MERGE
W3 --> MERGE
from fireflyframework_agentic.pipeline.steps import FanOutStep, FanInStep
engine = (
PipelineBuilder("parallel")
.add_node("split", FanOutStep(lambda doc: doc.pages))
.add_node("ocr_1", AgentStep(ocr_agent))
.add_node("ocr_2", AgentStep(ocr_agent))
.add_node("merge", FanInStep())
.add_edge("split", "ocr_1")
.add_edge("split", "ocr_2")
.add_edge("ocr_1", "merge", input_key="page_1")
.add_edge("ocr_2", "merge", input_key="page_2")
.build()
)
Batch Processing (BatchLLMStep)¶
BatchLLMStep processes multiple prompts through an agent concurrently, optimizing
for cost and throughput. It's ideal for bulk classification, extraction, or
summarization tasks.
from fireflyframework_agentic.pipeline.steps import BatchLLMStep
from fireflyframework_agentic.pipeline.builder import PipelineBuilder
classifier = FireflyAgent(
name="sentiment-classifier",
model="openai:gpt-4o-mini",
instructions="Classify sentiment as: positive, negative, or neutral.",
)
pipeline = (
PipelineBuilder("batch-sentiment")
.add_node("load", lambda ctx, inp: ["Review 1", "Review 2", "Review 3"])
.add_node("classify", BatchLLMStep(
classifier,
prompts_key="load", # Get prompts from "load" step output
batch_size=10, # Process up to 10 concurrently
))
.add_edge("load", "classify")
.build()
)
result = await pipeline.run(inputs={})
classifications = result.outputs["classify"].output # List of results
Parameters¶
- agent -- The FireflyAgent to use for processing.
- prompts_key -- Key to access prompts from either:
  - Previous step output: context.get_node_result(prompts_key).output
  - Initial inputs: inputs[prompts_key]
- batch_size -- Maximum concurrent requests (default: 50).
- wait_for_completion -- Whether to wait for all results (default: True).
- poll_interval_seconds -- Polling interval for batch jobs (default: 60.0).
- max_wait_seconds -- Maximum wait time for completion (default: 3600).
- on_batch_complete -- Optional callback for each batch completion.
Accessing Previous Step Outputs¶
BatchLLMStep automatically tries two data sources in order:
- Node result: context.get_node_result(prompts_key).output
- Initial inputs: inputs[prompts_key]
This allows flexible data flow in pipelines:
# Pattern 1: From previous step
async def load_documents(context, inputs):
    return ["Doc 1", "Doc 2", "Doc 3"]  # Returns list directly
builder.add_node("load", load_documents)
builder.add_node("classify", BatchLLMStep(agent, prompts_key="load"))
builder.add_edge("load", "classify")
# Pattern 2: From initial inputs
result = await pipeline.run(context=PipelineContext(inputs={"prompts": ["Q1", "Q2", "Q3"]}))
# BatchLLMStep with prompts_key="prompts" will find them
With Middleware¶
BatchLLMStep respects all agent middleware including caching, circuit breakers, and cost guards:
from fireflyframework_agentic.agents.prompt_cache import PromptCacheMiddleware
from fireflyframework_agentic.resilience.circuit_breaker import CircuitBreakerMiddleware
classifier = FireflyAgent(
name="batch-classifier",
model="anthropic:claude-haiku-4",
instructions="Classify documents...",
middleware=[
PromptCacheMiddleware(), # Cache system prompt
CircuitBreakerMiddleware(failure_threshold=5), # Protect against failures
],
)
step = BatchLLMStep(classifier, prompts_key="documents", batch_size=20)
Error Handling¶
By default, exceptions from individual prompts are captured and returned as
{"error": "<message>"} entries in the results list, in input order. This
prevents one failure from blocking the entire batch:
results = await step.execute(context, {"prompts": ["P1", "P2", "P3"]})
# results = [<output1>, {"error": "..."}, <output3>]
for i, result in enumerate(results):
    if isinstance(result, dict) and "error" in result:
        print(f"Prompt {i} failed: {result['error']}")
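The capture-and-continue pattern can be reproduced with a semaphore and asyncio.gather. This is a sketch of the idea rather than BatchLLMStep's internals; run_batch and the fake_agent_call coroutine are hypothetical, with fake_agent_call standing in for an agent invocation.

```python
import asyncio


async def run_batch(call, prompts, batch_size=50):
    """Run `call` on every prompt with at most `batch_size` in flight.

    Failures become {"error": "<message>"} entries; results keep input order.
    """
    semaphore = asyncio.Semaphore(batch_size)

    async def run_one(prompt):
        async with semaphore:
            try:
                return await call(prompt)
            except Exception as exc:
                return {"error": str(exc)}

    # gather returns results in the order the awaitables were passed in.
    return await asyncio.gather(*(run_one(prompt) for prompt in prompts))


async def fake_agent_call(prompt):
    """Stand-in for an agent run; fails on one specific prompt."""
    if prompt == "P2":
        raise RuntimeError("rate limited")
    return f"ok:{prompt}"


batch_results = asyncio.run(run_batch(fake_agent_call, ["P1", "P2", "P3"], batch_size=2))
```

Catching the exception inside each task, rather than around gather, is what keeps one failure from cancelling its siblings.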
Conditional Execution¶
Nodes can be gated by a condition function. If the condition returns False, the node
is skipped (marked as skipped=True in the result) and downstream nodes receive no
input from it.
dag.add_node(DAGNode(
node_id="ocr",
step=AgentStep(ocr_agent),
condition=lambda ctx: ctx.metadata.get("needs_ocr", False),
))
Conditional Branching (BranchStep)¶
Deprecated. Prefer State-Based Pipelines with .branch(source, router): one call instead of BranchStep + per-node condition lambdas, and the topology becomes inspectable as data. BranchStep still works (it emits a DeprecationWarning on construction); removal will be tracked in a follow-up issue once internal callers migrate.
BranchStep provides router-based conditional branching. The router callable
receives the node's input and returns a string key. Downstream nodes use
condition gates to check the branch key and execute only the matching path.
from fireflyframework_agentic.pipeline.steps import BranchStep, CallableStep
def classify_intent(inputs):
    text = inputs.get("input", "")
    return "positive" if "good" in text else "negative"
engine = (
PipelineBuilder("branching")
.add_node("branch", BranchStep(router=classify_intent))
.add_node(
"pos_handler",
CallableStep(handle_positive),
condition=lambda ctx: ctx.get_node_result("branch").output == "positive",
)
.add_node(
"neg_handler",
CallableStep(handle_negative),
condition=lambda ctx: ctx.get_node_result("branch").output == "negative",
)
.add_edge("branch", "pos_handler")
.add_edge("branch", "neg_handler")
.build()
)
Failure Strategies¶
Each node can be configured with a FailureStrategy that controls how the
pipeline behaves when the node fails:
- PROPAGATE -- Mark the node as failed but continue executing downstream nodes (they receive None inputs). This is the legacy behaviour.
- SKIP_DOWNSTREAM (default) -- Mark the node as failed and automatically skip all transitive downstream dependents.
- FAIL_PIPELINE -- Abort the entire pipeline immediately.
from fireflyframework_agentic.pipeline.dag import DAGNode, FailureStrategy
dag.add_node(DAGNode(
node_id="critical_step",
step=AgentStep(agent),
failure_strategy=FailureStrategy.FAIL_PIPELINE,
))
dag.add_node(DAGNode(
node_id="optional_step",
step=AgentStep(agent),
failure_strategy=FailureStrategy.PROPAGATE,
))
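"All transitive downstream dependents" under SKIP_DOWNSTREAM means everything reachable from the failed node. A small breadth-first sketch shows which nodes that covers; transitive_dependents and the edge tuples are illustrative names, not part of the framework.

```python
from collections import defaultdict, deque


def transitive_dependents(edges, failed):
    """Return every node reachable from `failed` by following edges forward."""
    successors = defaultdict(list)
    for source, target in edges:
        successors[source].append(target)

    skipped = set()
    queue = deque(successors[failed])
    while queue:
        node = queue.popleft()
        if node in skipped:
            continue  # already reached through another path
        skipped.add(node)
        queue.extend(successors[node])
    return skipped


edges = [
    ("classify", "ocr"),
    ("ocr", "extract"),
    ("extract", "validate"),
    ("classify", "audit"),  # independent branch, unaffected by an OCR failure
]
to_skip = transitive_dependents(edges, "ocr")
```

Note that audit is not skipped: it depends on classify only, so a failure in ocr leaves it runnable.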
Retries and Timeouts¶
Each node can be configured with retry limits, per-node timeouts, and a
backoff_factor for exponential backoff between retries:
dag.add_node(DAGNode(
node_id="extract",
step=AgentStep(extractor_agent),
retry_max=3,
timeout_seconds=30.0,
backoff_factor=1.5,
))
Retries use exponential backoff with jitter. The delay for attempt n is
backoff_factor × 2^(n-1) plus random jitter. On exhaustion the node is
marked as failed and the pipeline reports success=False.
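The delay formula can be checked with a few lines of arithmetic. The base term below follows the formula stated above; the jitter term is an assumption (uniform, up to a fraction of the base), since this guide does not say how the jitter is drawn. retry_delay and jitter_fraction are hypothetical names.

```python
import random


def retry_delay(attempt, backoff_factor, jitter_fraction=0.1, rng=random):
    """Delay before retry `attempt` (1-based): backoff_factor * 2^(attempt-1) + jitter.

    The jitter model (uniform in [0, jitter_fraction * base]) is an assumption.
    """
    base = backoff_factor * 2 ** (attempt - 1)
    return base + rng.uniform(0, jitter_fraction * base)


# With jitter disabled the schedule is the pure exponential term.
schedule = [retry_delay(n, backoff_factor=1.5, jitter_fraction=0.0) for n in (1, 2, 3)]

# With jitter enabled each delay lands in [base, 1.1 * base].
jittered = retry_delay(3, backoff_factor=1.5)
```

For backoff_factor=1.5 and retry_max=3 the base delays are 1.5 s, 3 s, and 6 s, so a node with timeout_seconds=30.0 can occupy the pipeline for well over a minute before it is finally marked as failed.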
Pipeline Context¶
You can attach a MemoryManager to the PipelineContext so that all steps share conversation and working memory. AgentStep injects the memory into downstream agent runs; ReasoningStep passes it to patterns via the memory kwarg.
from fireflyframework_agentic.memory import MemoryManager
from fireflyframework_agentic.pipeline.context import PipelineContext
memory = MemoryManager(working_scope_id="invoice-run-42")
ctx = PipelineContext(inputs=document_bytes, metadata={"source": "email"}, memory=memory)
result = await engine.run(context=ctx)
PipelineContext is the shared data bus that flows through the DAG. It carries:
- inputs -- The original pipeline input.
- metadata -- Arbitrary key-value pairs.
- correlation_id -- A unique ID for observability correlation.
- Node results from completed upstream nodes.
from fireflyframework_agentic.pipeline.context import PipelineContext
ctx = PipelineContext(
inputs=document_bytes,
metadata={"source": "email", "priority": "high"},
)
result = await engine.run(context=ctx)
Pipeline Result¶
PipelineResult aggregates all node outcomes:
- outputs -- dict[str, NodeResult] for every node.
- final_output -- Output of the terminal node(s).
- execution_trace -- Ordered list of ExecutionTraceEntry events.
- total_duration_ms -- End-to-end elapsed time.
- success -- True only if all nodes succeeded or were intentionally skipped.
- usage -- Aggregated UsageSummary across all pipeline nodes (token counts, cost, latency) when cost tracking is enabled. None when disabled or no LLM calls were made.
- run_id -- Identifier for this run; resume later with engine.run(run_id=...).
- final_state -- The final shared-state object for state-aware pipelines (and its state alias property). None when the engine had no state overlay. The state-based examples read e.g. result.state.answer.
- paused / paused_node / pause_reason -- Set when a node returned Pause; paused_node names the halting node. See Human-in-the-loop.
- failed_nodes -- Property listing IDs of nodes that failed.
result = await engine.run(inputs="test")
if result.success:
    print(result.final_output)
else:
    print("Failed nodes:", result.failed_nodes)
Eager Scheduling¶
By default, the pipeline engine uses eager scheduling: as soon as a node
completes and its downstream dependents have all inputs satisfied, those
dependents are scheduled immediately via asyncio.create_task() rather than
waiting for the entire execution level to finish. This reduces end-to-end
latency for pipelines with unbalanced node durations.
Eager scheduling is transparent -- no API changes are needed. The engine still respects topological ordering and condition gates.
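The latency benefit is easiest to see in a toy model. The sketch below is not the engine's scheduler: it gives every node its own task that waits only on its own upstream nodes (asyncio.gather wraps the coroutines in tasks), which produces the same effect as scheduling dependents as soon as their inputs are ready. run_eager, sleeper, and the node and dependency dicts are hypothetical.

```python
import asyncio


async def run_eager(nodes, deps):
    """Run each node as soon as its own upstream nodes are done.

    nodes: id -> zero-argument coroutine function
    deps:  id -> set of upstream ids (missing key means no dependencies)
    Returns the order in which nodes finished.
    """
    done = {node_id: asyncio.Event() for node_id in nodes}
    finished = []

    async def run(node_id):
        for upstream in deps.get(node_id, ()):
            await done[upstream].wait()  # wait for own inputs only, not the level
        await nodes[node_id]()
        finished.append(node_id)
        done[node_id].set()

    await asyncio.gather(*(run(node_id) for node_id in nodes))
    return finished


def sleeper(seconds):
    async def step():
        await asyncio.sleep(seconds)
    return step


# Level 0 holds a slow node "a" and a fast node "b"; "c" needs only "b".
nodes = {"a": sleeper(0.1), "b": sleeper(0.0), "c": sleeper(0.01), "d": sleeper(0.0)}
deps = {"c": {"b"}, "d": {"a"}}
finish_order = asyncio.run(run_eager(nodes, deps))
```

With a strict level barrier, c could not start until the slow node a had finished; here c completes first because it depends on b alone.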
Pipeline Event Handler¶
The legacy PipelineEventHandler protocol lets you receive real-time callbacks
as nodes start, complete, fail, or get skipped. Implement any subset of the
five hooks. New code should prefer the unified
EventHandler (which adds run_id, visit, and
on_node_pause); both are dispatched by signature inspection, so existing
PipelineEventHandler implementations keep working unchanged.
from fireflyframework_agentic.pipeline.engine import PipelineEventHandler
class MyHandler:
    async def on_node_start(self, node_id: str, pipeline_name: str) -> None:
        print(f"Starting: {node_id}")

    async def on_node_complete(self, node_id: str, pipeline_name: str, latency_ms: float) -> None:
        print(f"Completed: {node_id} in {latency_ms:.1f}ms")

    async def on_node_error(self, node_id: str, pipeline_name: str, error: str) -> None:
        print(f"Failed: {node_id}: {error}")

    async def on_node_skip(self, node_id: str, pipeline_name: str, reason: str) -> None:
        print(f"Skipped: {node_id}: {reason}")

    async def on_pipeline_complete(self, pipeline_name: str, success: bool, duration_ms: float) -> None:
        print(f"Pipeline {'succeeded' if success else 'failed'} in {duration_ms:.1f}ms")
Pass the handler when constructing the engine:
from fireflyframework_agentic.pipeline.engine import PipelineEngine
engine = PipelineEngine(dag, event_handler=MyHandler())
result = await engine.run(inputs="test")
This is useful for progress reporting in UIs, sending notifications on failure, or feeding events to an observability pipeline.
Triggers (FolderWatcher)¶
The framework is a pure in-process library: it does not bind a port or consume a
broker. Your host service owns inbound serving and chooses when to call
engine.run(...) / pipeline.invoke(...).
For one common file-arrival trigger, fireflyframework_agentic.pipeline.triggers
ships FolderWatcher — a watchfiles-backed async iterator over new and changed
files under a folder. It debounces events and holds each candidate back until its
size is observed unchanged across stability_polls consecutive polls (a heuristic
for "the writer has finished"), and startup_scan() enumerates pre-existing files
so callers can reconcile against a ledger.
from pathlib import Path
from fireflyframework_agentic.pipeline.triggers import FolderWatcher
watcher = FolderWatcher(Path("/inbox"), debounce_ms=500, stability_polls=2)
async for path in watcher.watch():  # requires `pip install watchfiles`
    result = await pipeline.invoke(IngestState(file=str(path)))
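The size-stability heuristic can be sketched on its own. This is one plausible reading of "unchanged across stability_polls consecutive polls", not FolderWatcher's code; wait_until_stable, poll_interval, and max_polls are names invented for the example.

```python
import os
import tempfile
import time


def wait_until_stable(path, stability_polls=2, poll_interval=0.05, max_polls=100):
    """Return True once the file size repeats for `stability_polls` polls in a row.

    Returns False if the size keeps changing for `max_polls` polls.
    """
    last_size = None
    unchanged = 0
    for _ in range(max_polls):
        size = os.path.getsize(path)
        if size == last_size:
            unchanged += 1
            if unchanged >= stability_polls:
                return True
        else:
            unchanged = 0       # the writer is still active; start counting again
            last_size = size
        time.sleep(poll_interval)
    return False


# Demo on a file that is already fully written.
with tempfile.NamedTemporaryFile(delete=False) as handle:
    handle.write(b"hello")
    demo_path = handle.name

demo_ready = wait_until_stable(demo_path, stability_polls=2, poll_interval=0.0)
os.remove(demo_path)
```

A stable size is only a heuristic: a writer that pauses longer than the polling window still looks finished, so downstream steps should tolerate truncated input.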