WebFilter Chain Guide¶
PyFly uses a WebFilter chain architecture for HTTP request/response processing.
Instead of registering multiple Starlette middlewares independently, all filters
run inside a single WebFilterChainMiddleware, reducing per-middleware task-context
overhead and enabling centralized ordering and URL-pattern matching.
Table of Contents¶
- Architecture
- WebFilter Protocol
- OncePerRequestFilter Base Class
- Built-in Filters
- TransactionIdFilter
- RequestLoggingFilter
- SecurityHeadersFilter
- CsrfFilter
- OAuth2SessionSecurityFilter
- SecurityFilter
- HttpSecurityFilter
- IdempotencyWebFilter
- Filter Ordering with @order
- URL Pattern Matching
- Creating Custom Filters
- Extending OncePerRequestFilter
- Implementing WebFilter Directly
- Auto-Discovery from DI
- WebFilterChainMiddleware Internals
- Complete Example
Architecture¶
The WebFilter chain replaces direct middleware registration:
    Request
       |
       v
    WebFilterChainMiddleware (pure ASGI middleware)
       |
       +-- RequestContextFilter         (__pyfly_order__ = HIGHEST_PRECEDENCE)
       +-- CorrelationFilter            (@order HIGHEST_PRECEDENCE + 50)
       +-- TransactionIdFilter          (@order HIGHEST_PRECEDENCE + 100)
       +-- RequestLoggingFilter         (@order HIGHEST_PRECEDENCE + 200)
       +-- SecurityFilter               (@order HIGHEST_PRECEDENCE + 220, opt-in JWT auth)
       +-- OAuth2SessionSecurityFilter  (@order HIGHEST_PRECEDENCE + 225, opt-in)
       +-- IdempotencyWebFilter         (@order HIGHEST_PRECEDENCE + 230, opt-in)
       +-- SecurityHeadersFilter        (@order HIGHEST_PRECEDENCE + 300)
       +-- HttpSecurityFilter           (@order HIGHEST_PRECEDENCE + 350, opt-in)
       +-- CsrfFilter                   (__pyfly_order__ = -50, opt-in)
       +-- [User WebFilter beans, sorted by @order]
       |
       v
    Route Handler
       |
       v
    Response (filters execute in reverse on the way out)
All filters run within a single middleware boundary. The chain is built from right
to left, so the first filter (lowest @order value) ends up as the outermost layer.
Source: src/pyfly/web/adapters/starlette/filter_chain.py
WebFilter Protocol¶
The WebFilter protocol (pyfly.web.ports.filter) defines the contract for all
filters. It is framework-agnostic, using Any for request/response types:
    from pyfly.web.ports.filter import WebFilter, CallNext

    @runtime_checkable
    class WebFilter(Protocol):
        async def do_filter(self, request: Any, call_next: CallNext) -> Any:
            """Execute filter logic. Must call await call_next(request) to proceed."""
            ...

        def should_not_filter(self, request: Any) -> bool:
            """Return True to skip this filter for the given request."""
            ...
| Method | Purpose |
|---|---|
| do_filter(request, call_next) | Core filter logic. Call await call_next(request) to invoke the next filter or the route handler. |
| should_not_filter(request) | Return True to bypass this filter for the current request. |
The CallNext type alias is Callable[..., Coroutine[Any, Any, Any]].
Source: src/pyfly/web/ports/filter.py
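Because the protocol is marked @runtime_checkable, a class can satisfy it structurally, without inheriting from anything. The sketch below illustrates this with a local stand-in for the protocol (re-declared so the example runs without PyFly installed); PassThroughFilter, MissingSkipHook, and echo are hypothetical names. Note that isinstance against a runtime-checkable protocol only verifies that the methods exist, not that their signatures match.

```python
import asyncio
from typing import Any, Callable, Coroutine, Protocol, runtime_checkable

# Local stand-ins mirroring the definitions shown above (assumption: the real
# ones live in pyfly.web.ports.filter).
CallNext = Callable[..., Coroutine[Any, Any, Any]]


@runtime_checkable
class WebFilter(Protocol):
    async def do_filter(self, request: Any, call_next: CallNext) -> Any: ...

    def should_not_filter(self, request: Any) -> bool: ...


class PassThroughFilter:
    """Hypothetical filter: no base class, just the two required methods."""

    async def do_filter(self, request: Any, call_next: CallNext) -> Any:
        return await call_next(request)

    def should_not_filter(self, request: Any) -> bool:
        return False


class MissingSkipHook:
    """Hypothetical class without should_not_filter, so the check fails."""

    async def do_filter(self, request: Any, call_next: CallNext) -> Any:
        return await call_next(request)


async def echo(request: Any) -> Any:
    """Stands in for the next link in the chain."""
    return f"handled:{request}"


is_filter = isinstance(PassThroughFilter(), WebFilter)
is_not_filter = isinstance(MissingSkipHook(), WebFilter)
result = asyncio.run(PassThroughFilter().do_filter("req", echo))
```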
OncePerRequestFilter Base Class¶
OncePerRequestFilter (pyfly.web.filters) is the recommended base class for
custom filters. It provides automatic URL-pattern matching via url_patterns and
exclude_patterns, so subclasses only need to implement do_filter().
    from pyfly.web.filters import OncePerRequestFilter

    class OncePerRequestFilter(abc.ABC):
        url_patterns: list[str] = []      # Glob patterns to include
        exclude_patterns: list[str] = []  # Glob patterns to exclude

        def should_not_filter(self, request: Any) -> bool:
            """Automatic path matching using fnmatch."""
            ...

        @abc.abstractmethod
        async def do_filter(self, request: Any, call_next: CallNext) -> Any:
            """Subclasses implement this."""
            ...
| Attribute | Type | Default | Description |
|---|---|---|---|
| url_patterns | list[str] | [] | If set, at least one pattern must match the request path. Empty = match all. |
| exclude_patterns | list[str] | [] | If any pattern matches, the filter is skipped. Checked after url_patterns. |
Patterns use fnmatch glob syntax: * matches any sequence, ? matches a single
character, [seq] matches character sets.
Source: src/pyfly/web/filters.py
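One consequence of fnmatch glob syntax is that * is not path-aware: it also matches / characters, so /api/* covers nested paths. The quick check below uses fnmatchcase from the standard library; that choice is an assumption made here so the results do not depend on the platform's case rules (the source only states that fnmatch is used).

```python
from fnmatch import fnmatchcase

# "*" matches any sequence, including "/" separators.
star_crosses_slashes = fnmatchcase("/api/v1/users", "/api/*")

# The literal "/" before "*" is required, so the bare prefix does not match.
bare_prefix = fnmatchcase("/api", "/api/*")

# "?" matches exactly one character.
question_mark = fnmatchcase("/v1/items", "/v?/items")

# "[seq]" matches one character from the set; "3" is not in "[12]".
char_set = fnmatchcase("/v3/items", "/v[12]/items")
```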
Built-in Filters¶
TransactionIdFilter¶
Propagates or generates a unique X-Transaction-Id header for distributed tracing.
    @order(HIGHEST_PRECEDENCE + 100)
    class TransactionIdFilter(OncePerRequestFilter):
        async def do_filter(self, request, call_next):
            tx_id = request.headers.get("X-Transaction-Id") or str(uuid.uuid4())
            request.state.transaction_id = tx_id
            response = await call_next(request)
            response.headers["X-Transaction-Id"] = tx_id
            return response
- Checks for an incoming X-Transaction-Id header; generates a UUID if absent.
- Stores the ID on request.state.transaction_id for downstream access.
- Adds the header to the response.
Source: src/pyfly/web/adapters/starlette/filters/transaction_id_filter.py
RequestLoggingFilter¶
Logs every HTTP request with structured fields using structlog.
    @order(HIGHEST_PRECEDENCE + 200)
    class RequestLoggingFilter(OncePerRequestFilter):
        async def do_filter(self, request, call_next):
            start = time.perf_counter()
            response = await call_next(request)
            duration_ms = (time.perf_counter() - start) * 1000
            logger.info("http_request", method=..., path=..., status_code=..., duration_ms=...)
            return response
Logged fields: method, path, status_code, duration_ms, transaction_id.
Failed requests are logged at error level with error and error_type fields.
Disabling per-request access logging. The RequestLoggingFilter is enabled by default,
but it is the costliest filter because the structlog emit runs on every request. To
remove that per-request cost, set pyfly.web.request-logging.enabled to false; the
filter is then omitted from the chain entirely. The toggle defaults to true and is
read by create_app() (Starlette and FastAPI adapters) when an ApplicationContext is
supplied.
    pyfly:
      web:
        request-logging:
          enabled: false  # default: true; false omits RequestLoggingFilter from the chain
Source: src/pyfly/web/adapters/starlette/filters/request_logging_filter.py
(toggle wired in src/pyfly/web/adapters/starlette/app.py and .../fastapi/app.py)
SecurityHeadersFilter¶
Adds OWASP-recommended security headers to every response.
    @order(HIGHEST_PRECEDENCE + 300)
    class SecurityHeadersFilter(OncePerRequestFilter):
        def __init__(self, config: SecurityHeadersConfig | None = None):
            self._config = config or SecurityHeadersConfig()

        async def do_filter(self, request, call_next):
            response = await call_next(request)
            response.headers["X-Content-Type-Options"] = "nosniff"
            response.headers["X-Frame-Options"] = "DENY"
            # ... more headers from config
            return response
Uses SecurityHeadersConfig defaults if no config is provided.
Source: src/pyfly/web/adapters/starlette/filters/security_headers_filter.py
CsrfFilter¶
Implements the double-submit cookie pattern for CSRF protection.
    class CsrfFilter(OncePerRequestFilter):
        __pyfly_order__ = -50
        exclude_patterns = ["/actuator/*", "/health", "/ready"]

        async def do_filter(self, request, call_next):
            # Safe methods (GET, HEAD, OPTIONS, TRACE): pass through + set XSRF-TOKEN cookie
            # Bearer bypass: requests with "Authorization: Bearer ..." skip CSRF
            # Unsafe methods (POST, PUT, DELETE, PATCH): validate cookie vs header
            ...
How it works:
| Request Type | Behaviour |
|---|---|
| Safe methods (GET, HEAD, OPTIONS, TRACE) | Pass through; set/refresh XSRF-TOKEN cookie on the response |
| Bearer token present (Authorization: Bearer …) | Skip CSRF validation (stateless JWT clients are exempt) |
| Unsafe methods (POST, PUT, DELETE, PATCH) | Compare XSRF-TOKEN cookie against X-XSRF-TOKEN header using secrets.compare_digest; return 403 on mismatch or missing token |
Cookie settings: httponly=False (JavaScript must read the token), samesite="lax", secure=True, path="/".
After successful validation on unsafe methods, the token is rotated — a fresh token is set on the response cookie.
Source: src/pyfly/web/adapters/starlette/filters/csrf_filter.py
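The core of the double-submit check is a constant-time comparison of the cookie value against the header value. The following is a minimal sketch of that comparison, not the filter's actual code: generate_csrf_token and csrf_tokens_match are hypothetical helpers, and the 32-byte token length is an assumption.

```python
import secrets
from typing import Optional


def generate_csrf_token() -> str:
    """Create a random, URL-safe token (32 random bytes is an assumed size)."""
    return secrets.token_urlsafe(32)


def csrf_tokens_match(cookie_token: Optional[str], header_token: Optional[str]) -> bool:
    """Return True only if both tokens are present and identical.

    A False result corresponds to the 403 case in the table above.
    """
    if not cookie_token or not header_token:
        return False
    # Compare as bytes: compare_digest rejects non-ASCII str arguments.
    return secrets.compare_digest(cookie_token.encode(), header_token.encode())
```

Using secrets.compare_digest rather than == keeps the comparison time independent of where the two values first differ, which avoids leaking token prefixes through timing.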
OAuth2SessionSecurityFilter¶
Restores a SecurityContext from the HTTP session for OAuth2 login-based authentication.
    from pyfly.security.oauth2.session_security_filter import OAuth2SessionSecurityFilter

    class OAuth2SessionSecurityFilter(OncePerRequestFilter):
        __pyfly_order__ = HIGHEST_PRECEDENCE + 225

        async def do_filter(self, request, call_next):
            session = request.state.session
            stored_ctx = session.get_attribute("SECURITY_CONTEXT")
            if isinstance(stored_ctx, SecurityContext) and stored_ctx.is_authenticated:
                request.state.security_context = stored_ctx
                return await call_next(request)
            # No session context: set anonymous if no other filter has set one
            if not hasattr(request.state, "security_context"):
                request.state.security_context = SecurityContext.anonymous()
            return await call_next(request)
- Runs at HIGHEST_PRECEDENCE + 225, after the JWT SecurityFilter (at +220) but before the OAuth2ResourceServerFilter (at +250).
- Reads the SECURITY_CONTEXT attribute from the HTTP session (stored by OAuth2LoginHandler during login).
- If a valid authenticated context is found, sets it on request.state.security_context.
- Otherwise, sets an anonymous context unless an earlier filter has already set one, so downstream filters always have a context available.
- Used in applications with browser-based OAuth2 login flows (authorization_code grant).
Source: src/pyfly/security/oauth2/session_security_filter.py
SecurityFilter¶
Extracts JWT Bearer tokens and populates request.state.security_context.
    class SecurityFilter(OncePerRequestFilter):
        def __init__(self, jwt_service: JWTService, exclude_patterns: Sequence[str] = ()):
            self._jwt_service = jwt_service
            self.exclude_patterns = list(exclude_patterns)

        async def do_filter(self, request, call_next):
            auth_header = request.headers.get("authorization", "")
            if auth_header.startswith("Bearer "):
                token = auth_header[7:]
                security_context = self._jwt_service.to_security_context(token)
            else:
                security_context = SecurityContext.anonymous()
            request.state.security_context = security_context
            return await call_next(request)
Uses exclude_patterns to skip public endpoints (e.g., /actuator/*, /docs).
Source: src/pyfly/web/adapters/starlette/filters/security_filter.py
HttpSecurityFilter¶
Enforces URL-pattern authorization rules configured via the HttpSecurity DSL. Returns RFC 7807 problem-detail responses for unauthorized or forbidden requests.
    from pyfly.web.adapters.starlette.filters.http_security_filter import HttpSecurityFilter

    @order(HIGHEST_PRECEDENCE + 350)
    class HttpSecurityFilter(OncePerRequestFilter):
        def __init__(self, rules: Sequence[SecurityRule]) -> None:
            self._rules = list(rules)

        async def do_filter(self, request, call_next):
            path = request.url.path
            security_context = getattr(request.state, "security_context", SecurityContext.anonymous())
            for rule in self._rules:
                if _matches(path, rule.patterns):
                    # Evaluate rule type: PERMIT_ALL, DENY_ALL, AUTHENTICATED, HAS_ROLE, etc.
                    ...
            # No rule matched: allow through
            return await call_next(request)
- Runs at HIGHEST_PRECEDENCE + 350, after all authentication filters (SecurityFilter, OAuth2SessionSecurityFilter).
- Evaluates rules in declaration order; first match wins.
- If no rule matches, the request is allowed through (open by default).
- Constructed via HttpSecurity.build() rather than directly.
Rule evaluation:
| Rule Type | Unauthenticated | Authenticated (no matching role/perm) | Authenticated (matching) |
|---|---|---|---|
| PERMIT_ALL | 200 (pass) | 200 (pass) | 200 (pass) |
| DENY_ALL | 403 | 403 | 403 |
| AUTHENTICATED | 401 | 200 (pass) | 200 (pass) |
| HAS_ROLE | 401 | 403 | 200 (pass) |
| HAS_ANY_ROLE | 401 | 403 | 200 (pass) |
| HAS_PERMISSION | 401 | 403 | 200 (pass) |
Source: src/pyfly/web/adapters/starlette/filters/http_security_filter.py
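The rule-evaluation table can be read as a small decision function. The sketch below is an illustration under stated assumptions, not the filter's implementation: RuleType, Principal (a stand-in for SecurityContext), and evaluate are hypothetical, and each rule's requirement is modelled as a set of role or permission names.

```python
from dataclasses import dataclass
from enum import Enum, auto


class RuleType(Enum):
    PERMIT_ALL = auto()
    DENY_ALL = auto()
    AUTHENTICATED = auto()
    HAS_ROLE = auto()
    HAS_ANY_ROLE = auto()
    HAS_PERMISSION = auto()


@dataclass(frozen=True)
class Principal:
    """Hypothetical stand-in for SecurityContext."""
    authenticated: bool = False
    roles: frozenset = frozenset()
    permissions: frozenset = frozenset()


def evaluate(rule_type: RuleType, required: frozenset, principal: Principal) -> int:
    """Return the HTTP status from the table; 200 means the request passes."""
    if rule_type is RuleType.PERMIT_ALL:
        return 200
    if rule_type is RuleType.DENY_ALL:
        return 403
    # Every remaining rule type needs an authenticated caller first.
    if not principal.authenticated:
        return 401
    if rule_type is RuleType.AUTHENTICATED:
        return 200
    granted = (
        principal.permissions
        if rule_type is RuleType.HAS_PERMISSION
        else principal.roles
    )
    # Pass when the caller holds at least one of the required names.
    return 200 if granted & required else 403


anonymous = Principal()
viewer = Principal(authenticated=True, roles=frozenset({"viewer"}))
admin = Principal(authenticated=True, roles=frozenset({"admin"}))
```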
IdempotencyWebFilter¶
Prevents duplicate processing of mutating HTTP requests that carry an
Idempotency-Key request header.
    class IdempotencyWebFilter(OncePerRequestFilter):
        __pyfly_order__ = HIGHEST_PRECEDENCE + 230
        exclude_patterns = ["/actuator/*", "/health", "/ready"]

        def __init__(self, cache: CacheAdapter, ttl_seconds: int = 86400) -> None:
            ...

        async def do_filter(self, request, call_next):
            # Safe methods (GET, HEAD, OPTIONS, TRACE): pass through
            # No Idempotency-Key header present: pass through
            # Idempotency-Key present on mutating method:
            #   cache hit  → replay stored response + Idempotency-Replayed: true
            #   cache miss → execute handler, cache non-5xx response
            ...
How it works:
| Condition | Behaviour |
|---|---|
| Safe method (GET, HEAD, OPTIONS, TRACE) | Pass through; never cached |
| No Idempotency-Key header | Pass through unchanged |
| Mutating method + key, cache hit | Reconstruct and return the stored response immediately with Idempotency-Replayed: true header |
| Mutating method + key, cache miss | Execute the handler; cache the response (unless opted-out or 5xx); return the live response |
| Handler annotated with @disable_idempotency | Execute the handler; response is never stored |
| Handler returns a 5xx status | Execute the handler; response is never stored (retries re-execute the handler) |
| Streaming / empty response body | Pass through without caching |
Cache key format: idem:{METHOD}:{path}:{Idempotency-Key} — scoped to the
HTTP method, request path, and the client-supplied key value, so the same key on
different paths/methods is treated independently.
Mutating methods subject to idempotency caching: POST, PUT, PATCH, DELETE.
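To make the key format concrete, here is a minimal sketch of how such a key could be derived. idempotency_cache_key is a hypothetical helper, and it assumes the request headers arrive as a dict with lower-cased names; it returns None for the pass-through cases (safe method or no key).

```python
from typing import Mapping, Optional

MUTATING_METHODS = frozenset({"POST", "PUT", "PATCH", "DELETE"})


def idempotency_cache_key(
    method: str, path: str, headers: Mapping[str, str]
) -> Optional[str]:
    """Build idem:{METHOD}:{path}:{Idempotency-Key}, or None to pass through."""
    key = headers.get("idempotency-key")  # assumes lower-cased header names
    if method.upper() not in MUTATING_METHODS or not key:
        return None
    return f"idem:{method.upper()}:{path}:{key}"


post_key = idempotency_cache_key("POST", "/orders", {"idempotency-key": "abc-123"})
put_key = idempotency_cache_key("PUT", "/orders", {"idempotency-key": "abc-123"})
```

Here post_key and put_key differ even though the client sent the same Idempotency-Key, which is the independence the key format is designed to give.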
Enabling the filter (opt-in):
    pyfly:
      web:
        idempotency:
          enabled: true       # required; the filter is inactive by default
          ttl-seconds: 86400  # optional; default 86400 (24 hours)
      cache:
        enabled: true         # a CacheAdapter bean must be present
A CacheAdapter bean must be present in the DI container. If
pyfly.web.idempotency.enabled=true is set but no cache adapter is available,
startup fails with a descriptive RuntimeError.
Opting a route out with @disable_idempotency:
    from pyfly.web.idempotency import disable_idempotency

    @post_mapping("/payment")
    @disable_idempotency
    async def create_payment(self, ...) -> ...:
        """This handler's responses are never stored in the idempotency cache."""
        ...
The @disable_idempotency decorator sets a sentinel attribute
(__pyfly_disable_idempotency__) on the handler function. After routing, the
filter inspects request.scope["endpoint"] for this attribute; if present the
response is passed through and not written to the cache.
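The sentinel mechanism described above can be sketched in a few lines. This is a stand-in, not the code in pyfly.web.idempotency: the attribute name comes from the text, while the stored value (True) and the is_idempotency_disabled helper are assumptions made for illustration.

```python
from typing import Any, Callable, Mapping, TypeVar

F = TypeVar("F", bound=Callable[..., Any])

_SENTINEL = "__pyfly_disable_idempotency__"


def disable_idempotency(func: F) -> F:
    """Mark a handler so its responses are never cached (stand-in version)."""
    setattr(func, _SENTINEL, True)  # assumed value; only presence matters
    return func


def is_idempotency_disabled(scope: Mapping[str, Any]) -> bool:
    """Check the routed endpoint in an ASGI scope for the sentinel."""
    endpoint = scope.get("endpoint")
    return bool(getattr(endpoint, _SENTINEL, False))


@disable_idempotency
async def create_payment() -> None:
    """Hypothetical handler that opts out."""


async def create_order() -> None:
    """Hypothetical handler that keeps the default behaviour."""
```

Because the decorator returns the original function unchanged apart from the marker, it does not alter the handler's signature or behaviour.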
Preserved response headers on replay: content-type, cache-control,
etag, last-modified, location, x-request-id, x-correlation-id,
x-transaction-id. Transport-level headers (transfer-encoding,
content-length) are intentionally excluded.
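A replay therefore amounts to filtering the stored headers through an allowlist and adding the replay marker. The sketch below assumes stored headers are kept as a plain dict; headers_for_replay is a hypothetical helper, and only the header names themselves come from the list above.

```python
from typing import Dict, Mapping

PRESERVED_HEADERS = frozenset({
    "content-type", "cache-control", "etag", "last-modified",
    "location", "x-request-id", "x-correlation-id", "x-transaction-id",
})


def headers_for_replay(stored: Mapping[str, str]) -> Dict[str, str]:
    """Keep only allowlisted headers (case-insensitive) and mark the replay."""
    replay = {
        name.lower(): value
        for name, value in stored.items()
        if name.lower() in PRESERVED_HEADERS
    }
    replay["idempotency-replayed"] = "true"
    return replay


stored = {
    "Content-Type": "application/json",
    "Content-Length": "17",          # transport-level: dropped
    "Transfer-Encoding": "chunked",  # transport-level: dropped
    "Location": "/orders/42",
    "X-Custom": "1",                 # not on the allowlist: dropped
}
replay = headers_for_replay(stored)
```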
Source:
- src/pyfly/web/adapters/starlette/filters/idempotency_filter.py
- src/pyfly/web/idempotency.py (disable_idempotency decorator)
- src/pyfly/web/idempotency_auto_configuration.py
Filter Ordering with @order¶
Filters execute in @order value order (lower = runs first). Built-in filters use
HIGHEST_PRECEDENCE (a large negative number) to ensure they run before user filters.
    from pyfly.container.ordering import order, HIGHEST_PRECEDENCE

    # Built-in order values:
    #   RequestContextFilter:        HIGHEST_PRECEDENCE        (request-scoped context)
    #   CorrelationFilter:           HIGHEST_PRECEDENCE + 50   (W3C/correlation headers)
    #   TransactionIdFilter:         HIGHEST_PRECEDENCE + 100
    #   RequestLoggingFilter:        HIGHEST_PRECEDENCE + 200
    #   SecurityFilter:              HIGHEST_PRECEDENCE + 220  (opt-in, JWT authentication)
    #   OAuth2SessionSecurityFilter: HIGHEST_PRECEDENCE + 225  (opt-in)
    #   IdempotencyWebFilter:        HIGHEST_PRECEDENCE + 230  (opt-in)
    #   SecurityHeadersFilter:       HIGHEST_PRECEDENCE + 300
    #   HttpSecurityFilter:          HIGHEST_PRECEDENCE + 350  (opt-in)
    #   CsrfFilter:                  -50                       (opt-in)
    #   User filters default to order 0 (run after built-ins)

    @order(10)
    class RateLimitFilter(OncePerRequestFilter):
        """Runs after built-in filters and after user filters left at the default order 0."""
        ...

    @order(50)
    class AuditLogFilter(OncePerRequestFilter):
        """Runs after RateLimitFilter."""
        ...
If no @order is specified, the default order value is 0.
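The ordering rules can be tried out with a self-contained stand-in. Everything below is an assumption made for illustration rather than the code in pyfly.container.ordering: the concrete HIGHEST_PRECEDENCE value, the idea that @order stores its value in __pyfly_order__, and get_order falling back to 0. The filter classes are empty placeholders.

```python
import sys

HIGHEST_PRECEDENCE = -sys.maxsize  # assumed value: "a large negative number"


def order(value: int):
    """Stand-in decorator: record the order value on the class."""
    def decorator(cls):
        cls.__pyfly_order__ = value
        return cls
    return decorator


def get_order(cls) -> int:
    """Classes without an explicit order default to 0."""
    return getattr(cls, "__pyfly_order__", 0)


@order(HIGHEST_PRECEDENCE + 100)
class TransactionIdFilter: ...


class CsrfFilter:
    __pyfly_order__ = -50


class UnorderedFilter: ...  # no @order, so order 0


@order(10)
class RateLimitFilter: ...


filters = [RateLimitFilter(), UnorderedFilter(), CsrfFilter(), TransactionIdFilter()]
filters.sort(key=lambda f: get_order(type(f)))
sorted_names = [type(f).__name__ for f in filters]
```

After sorting, sorted_names is TransactionIdFilter, CsrfFilter, UnorderedFilter, RateLimitFilter: a user filter left at the default order 0 runs before one declared with @order(10).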
URL Pattern Matching¶
OncePerRequestFilter supports glob-based URL pattern matching:
    class ApiOnlyFilter(OncePerRequestFilter):
        url_patterns = ["/api/*"]             # Only run on /api/* paths
        exclude_patterns = ["/api/public/*"]  # But skip /api/public/*

        async def do_filter(self, request, call_next):
            # Only executes for /api/* paths (excluding /api/public/*)
            return await call_next(request)
Pattern evaluation:
1. If url_patterns is non-empty, at least one pattern must match → otherwise skip.
2. If exclude_patterns is non-empty, any matching pattern → skip.
3. If both are empty → filter runs on all requests.
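The three evaluation steps can be sketched as a standalone function. should_skip is a hypothetical helper, not PyFly's should_not_filter implementation; it assumes patterns are compared against the request path with fnmatchcase.

```python
from fnmatch import fnmatchcase
from typing import Sequence


def should_skip(
    path: str,
    url_patterns: Sequence[str],
    exclude_patterns: Sequence[str],
) -> bool:
    """Return True when the filter should be bypassed for this path."""
    # Step 1: with include patterns set, at least one must match.
    if url_patterns and not any(fnmatchcase(path, p) for p in url_patterns):
        return True
    # Step 2: any matching exclude pattern skips the filter.
    if any(fnmatchcase(path, p) for p in exclude_patterns):
        return True
    # Step 3: otherwise (including both lists empty) the filter runs.
    return False


include = ["/api/*"]
exclude = ["/api/public/*"]
runs_on_api = not should_skip("/api/users", include, exclude)
skips_public = should_skip("/api/public/docs", include, exclude)
skips_other = should_skip("/health", include, exclude)
```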
Creating Custom Filters¶
Extending OncePerRequestFilter¶
The recommended approach — gives you automatic URL-pattern matching:
    from pyfly.container import component
    from pyfly.container.ordering import order
    from pyfly.web.filters import OncePerRequestFilter
    from pyfly.web.ports.filter import CallNext

    @component
    @order(20)
    class RateLimitFilter(OncePerRequestFilter):
        """Rate limits API requests to 100 req/min per IP."""
        url_patterns = ["/api/*"]
        exclude_patterns = ["/api/health"]

        async def do_filter(self, request, call_next: CallNext):
            client_ip = request.client.host
            # Rate limiting logic here...
            remaining = 100  # placeholder; the real value comes from the rate-limiting logic
            response = await call_next(request)
            response.headers["X-RateLimit-Remaining"] = str(remaining)
            return response
Implementing WebFilter Directly¶
For full control without the OncePerRequestFilter base class:
    import uuid

    from pyfly.container import component
    from pyfly.container.ordering import order

    @component
    @order(30)
    class CorrelationIdFilter:
        """Implements WebFilter protocol directly."""

        async def do_filter(self, request, call_next):
            correlation_id = request.headers.get("X-Correlation-Id", str(uuid.uuid4()))
            request.state.correlation_id = correlation_id
            response = await call_next(request)
            response.headers["X-Correlation-Id"] = correlation_id
            return response

        def should_not_filter(self, request) -> bool:
            return False  # Run on all requests
Auto-Discovery from DI¶
When create_app() is called with an ApplicationContext, it automatically
discovers any beans implementing the WebFilter protocol:
    # In create_app():
    builtin_filter_types = (
        RequestContextFilter, CorrelationFilter,
        TransactionIdFilter, RequestLoggingFilter, SecurityHeadersFilter,
    )
    for _cls, reg in context.container._registrations.items():
        inst = reg.instance
        if inst is not None and isinstance(inst, WebFilter) and not isinstance(inst, builtin_filter_types):
            filters.append(inst)

    # Sort all filters (built-in + user) by @order; done in-place so the live list
    # (shared with WebFilterChainMiddleware) stays ordered.
    filters.sort(key=lambda f: get_order(type(f)))
This means you only need to:
1. Decorate your filter with @component (or any stereotype).
2. Ensure it implements WebFilter (either directly or via OncePerRequestFilter).
3. Optionally set @order for execution priority.
The filter will be automatically included in the chain.
WebFilterChainMiddleware Internals¶
WebFilterChainMiddleware is a pure ASGI middleware (not Starlette's
BaseHTTPMiddleware). This avoids the anyio dependency that caused
ModuleNotFoundError with Granian. The chain is built at request time from right
to left:
    class WebFilterChainMiddleware:
        def __init__(self, app: ASGIApp, filters: Sequence[WebFilter] = ()) -> None:
            self.app = app
            self._filters = list(filters)

        async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
            if scope["type"] != "http":
                await self.app(scope, receive, send)
                return

            request = Request(scope, receive, send)

            async def _call_app(req: Any) -> Response:
                # Intercept ASGI send to capture status, headers, and body
                ...
                await self.app(scope, receive, _intercept)
                response = Response(content=..., status_code=status_code)
                response.raw_headers[:] = raw_headers
                return response

            chain: CallNext = _call_app
            for f in reversed(self._filters):
                chain = _wrap(f, chain)

            response = await chain(request)
            await response(scope, receive, send)

    def _wrap(web_filter, next_call):
        async def _inner(request):
            if web_filter.should_not_filter(request):
                return await next_call(request)
            return await web_filter.do_filter(request, next_call)
        return _inner
The terminal _call_app function captures the downstream ASGI response by
intercepting send calls, buffering body parts, and reconstructing a Starlette
Response object. This preserves the call_next(request) -> Response contract
that WebFilter.do_filter() expects.
The reversed iteration means the first filter in the sorted list becomes the outermost wrapper, executing first on the request and last on the response.
Source: src/pyfly/web/adapters/starlette/filter_chain.py
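The wrapping order is easiest to see by running a toy chain. The sketch below reuses the _wrap helper from the listing above but replaces the ASGI plumbing with a plain async handler; RecordingFilter and run_chain are hypothetical names introduced only for this demonstration.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Sequence

CallNext = Callable[[Any], Awaitable[Any]]


class RecordingFilter:
    """Hypothetical filter that logs when it is entered and exited."""

    def __init__(self, name: str, events: List[str], skip: bool = False) -> None:
        self._name, self._events, self._skip = name, events, skip

    def should_not_filter(self, request: Any) -> bool:
        return self._skip

    async def do_filter(self, request: Any, call_next: CallNext) -> Any:
        self._events.append(f"{self._name}:in")
        response = await call_next(request)
        self._events.append(f"{self._name}:out")
        return response


def _wrap(web_filter: Any, next_call: CallNext) -> CallNext:
    async def _inner(request: Any) -> Any:
        if web_filter.should_not_filter(request):
            return await next_call(request)
        return await web_filter.do_filter(request, next_call)
    return _inner


async def run_chain(filters: Sequence[Any], request: Any, events: List[str]) -> Any:
    async def handler(req: Any) -> str:
        events.append("handler")  # stands in for the route handler
        return "response"

    chain: CallNext = handler
    for f in reversed(filters):  # last filter wraps the handler first
        chain = _wrap(f, chain)
    return await chain(request)


events: List[str] = []
filters = [
    RecordingFilter("first", events),
    RecordingFilter("skipped", events, skip=True),
    RecordingFilter("last", events),
]
result = asyncio.run(run_chain(filters, "req", events))
```

The recorded events come out as first:in, last:in, handler, last:out, first:out. The skipped filter leaves no trace because _wrap forwards straight to the next link when should_not_filter returns True.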
Complete Example¶
    import time

    from starlette.responses import JSONResponse

    from pyfly.container import component
    from pyfly.container.ordering import order
    from pyfly.web.filters import OncePerRequestFilter
    from pyfly.web.ports.filter import CallNext

    @component
    @order(10)
    class TenantFilter(OncePerRequestFilter):
        """Extracts tenant ID from X-Tenant-Id header for multi-tenancy."""
        url_patterns = ["/api/*"]

        async def do_filter(self, request, call_next: CallNext):
            tenant_id = request.headers.get("x-tenant-id")
            if not tenant_id:
                # Short-circuit: the rest of the chain and the handler never run
                return JSONResponse(
                    {"error": "X-Tenant-Id header is required"},
                    status_code=400,
                )
            request.state.tenant_id = tenant_id
            return await call_next(request)

    @component
    @order(50)
    class RequestTimingFilter(OncePerRequestFilter):
        """Adds X-Response-Time header to all responses."""

        async def do_filter(self, request, call_next: CallNext):
            start = time.perf_counter()
            response = await call_next(request)
            duration_ms = (time.perf_counter() - start) * 1000
            response.headers["X-Response-Time"] = f"{duration_ms:.2f}ms"
            return response
With these beans registered, create_app() produces this filter chain:
    RequestContextFilter         (HIGHEST_PRECEDENCE)
    CorrelationFilter            (HIGHEST_PRECEDENCE + 50)
    TransactionIdFilter          (HIGHEST_PRECEDENCE + 100)
    RequestLoggingFilter         (HIGHEST_PRECEDENCE + 200)
    SecurityFilter               (HIGHEST_PRECEDENCE + 220)  [if registered]
    OAuth2SessionSecurityFilter  (HIGHEST_PRECEDENCE + 225)  [if registered]
    IdempotencyWebFilter         (HIGHEST_PRECEDENCE + 230)  [if pyfly.web.idempotency.enabled=true]
    SecurityHeadersFilter        (HIGHEST_PRECEDENCE + 300)
    HttpSecurityFilter           (HIGHEST_PRECEDENCE + 350)  [if registered]
    CsrfFilter                   (-50)                       [if registered]
    TenantFilter                 (10)
    RequestTimingFilter          (50)