# OpenFilter Observability System

This document describes OpenFilter's observability system, which provides safe, aggregated metrics without PII leakage, automatic histogram bucket generation, and optional raw data export.
## Architecture Overview

```mermaid
graph TB
    A[Filter with MetricSpecs] --> B[TelemetryRegistry]
    B --> C[OpenTelemetry SDK]
    C --> D[MeterProvider]
    D --> E[PeriodicExportingMetricReader]
    E --> F[OTelLineageExporter]
    F --> G[OpenLineage Client]
    G --> H[Oleander/Marquez]
    I[Frame Processing] --> J[process_frames_metadata]
    J --> K[Raw Data Accumulation]
    K --> L[OPENLINEAGE_EXPORT_RAW_DATA]
    L --> M[Heartbeat Facets]
    style A fill:#e1f5fe
    style F fill:#fff3e0
    style G fill:#f3e5f5
```
## Core Principles

- **No PII in metrics**: only numeric data leaves the process, through the allowlist
- **Declarative metrics**: filters declare what they want to measure, not how
- **Open standards**: uses OpenTelemetry for collection and OpenLineage for export
- **Automatic optimization**: smart histogram bucket generation based on metric type
- **Allowlist security**: only explicitly approved metrics are exported
- **Conditional initialization**: OpenLineage only starts when configured
## High-Level Flow

### 1. Filter Declaration

```python
class MyFilter(Filter):
    metric_specs = [
        MetricSpec(
            name="frames_processed",
            instrument="counter",
            value_fn=lambda d: 1
        ),
        MetricSpec(
            name="detection_confidence",
            instrument="histogram",
            value_fn=lambda d: d.get("confidence", 0.0),
            num_buckets=8  # Auto-generate 8 buckets
        )
    ]
```
### 2. Frame Processing

```python
def process_frames_metadata(self, frames):
    # Automatic metric recording based on MetricSpecs
    if hasattr(self, '_telemetry_registry'):
        self._telemetry_registry.record_metrics(frames)

    # Optional raw data accumulation
    if self._export_raw_data and hasattr(self, 'emitter') and self.emitter is not None:
        # Accumulate frame data for potential export
        self._accumulate_raw_data(frames)
```
### 3. OpenTelemetry Processing

```python
# The OpenTelemetry SDK handles metric collection and export:
# - Counters:   running totals (monotonic values)
# - Histograms: value distributions with bucket boundaries
# - Gauges:     current/latest values (non-monotonic)
```
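The three instrument semantics can be illustrated without the SDK. A minimal stdlib-only sketch (the class names are illustrative stand-ins, not OpenFilter or OpenTelemetry APIs):

```python
import bisect

class Counter:
    """Monotonic running total (e.g. frames_processed)."""
    def __init__(self):
        self.total = 0
    def add(self, value):
        self.total += value  # only ever increases

class Gauge:
    """Latest value wins (e.g. memory_usage_mb)."""
    def __init__(self):
        self.value = None
    def set(self, value):
        self.value = value

class Histogram:
    """Per-bucket counts; len(counts) == len(boundaries) + 1."""
    def __init__(self, boundaries):
        self.boundaries = boundaries
        self.counts = [0] * (len(boundaries) + 1)
        self.count = 0
        self.sum = 0.0
    def record(self, value):
        # bisect_left yields (low, high]-style buckets, matching OTel
        self.counts[bisect.bisect_left(self.boundaries, value)] += 1
        self.count += 1
        self.sum += value

c, g, h = Counter(), Gauge(), Histogram([0.5, 0.8])
for conf in (0.3, 0.7, 0.9):
    c.add(1)
    g.set(conf)
    h.record(conf)
# c.total == 3, g.value == 0.9, h.counts == [1, 1, 1]
```

Each confidence value lands in a different bucket, while the counter only accumulates and the gauge only remembers the last value.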
### 4. Bridge Export

```python
class OTelLineageExporter(MetricExporter):
    def export(self, metrics_data, **kwargs):
        # Convert OpenTelemetry metrics to OpenLineage facets
        # (simplified sketch; real OTel metrics are nested under
        # resource_metrics -> scope_metrics -> metrics)
        facets = {}
        for metric in metrics_data:
            if self._is_allowed(metric.name):  # Check allowlist
                # Convert histograms to numeric buckets/counts
                if metric.type == "histogram":
                    facets[f"{metric.name}_histogram"] = {
                        "buckets": [float(b) for b in metric.buckets],
                        "counts": [int(c) for c in metric.counts],
                        "count": metric.count,
                        "sum": metric.sum
                    }
        # Send to OpenLineage
        self._lineage.update_heartbeat(facets)
```
## Detailed Component Analysis

### MetricSpec Declaration

The `MetricSpec` dataclass provides a declarative way to define metrics:
```python
from dataclasses import dataclass
from typing import Callable, List, Optional, Union

@dataclass
class MetricSpec:
    name: str                 # Metric name
    instrument: str           # 'counter', 'histogram', or 'gauge'
    value_fn: Callable[[dict], Union[int, float, None]]   # Value extraction
    boundaries: Optional[List[Union[int, float]]] = None  # Custom histogram buckets
    num_buckets: int = 10     # Number of auto-generated buckets
    _otel_inst: Optional["Instrument"] = None  # Internal OpenTelemetry instrument
```
### Value Extraction Functions

```python
# Simple counter
value_fn=lambda d: 1

# Conditional counter
value_fn=lambda d: 1 if d.get("detections") else 0

# Array length
value_fn=lambda d: len(d.get("detections", []))

# Nested value
value_fn=lambda d: d.get("results", {}).get("confidence", 0.0)

# Computed value
value_fn=lambda d: d.get("processing_time", 0) * 1000  # Convert to ms
```
### Automatic Histogram Bucket Generation

The system automatically generates histogram buckets based on the metric type:
```python
import math

def generate_histogram_buckets(num_buckets: int, min_val: float = 0.0, max_val: float = 100.0):
    """Generate logarithmic bucket boundaries.

    Returns num_buckets - 1 boundaries, which define num_buckets buckets.
    """
    if min_val <= 0:
        min_val = 0.1  # Avoid log(0)
    num_boundaries = num_buckets - 1
    log_min = math.log(min_val)
    log_max = math.log(max_val)
    log_step = (log_max - log_min) / num_boundaries
    boundaries = []
    for i in range(num_boundaries):
        boundaries.append(math.exp(log_min + i * log_step))
    return boundaries
```
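As a worked example, six buckets over a 1–100 range yield five log-spaced boundaries (the function is reproduced here so the snippet runs standalone):

```python
import math

def generate_histogram_buckets(num_buckets, min_val=0.0, max_val=100.0):
    """Same logic as above: num_buckets - 1 log-spaced boundaries."""
    if min_val <= 0:
        min_val = 0.1  # avoid log(0)
    num_boundaries = num_buckets - 1
    log_min, log_max = math.log(min_val), math.log(max_val)
    log_step = (log_max - log_min) / num_boundaries
    return [math.exp(log_min + i * log_step) for i in range(num_boundaries)]

buckets = generate_histogram_buckets(6, min_val=1.0, max_val=100.0)
print([round(b, 2) for b in buckets])
# [1.0, 2.51, 6.31, 15.85, 39.81] -- 5 boundaries define 6 buckets
```

Each boundary is a constant multiplicative step (about 2.5x here) above the previous one, which suits quantities spanning several orders of magnitude, such as latencies.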
### Bucket Generation by Metric Type

| Metric Name Pattern | Min | Max | Example Buckets |
|---|---|---|---|
| `*confidence*` | 0.0 | 1.0 | [0.1, 0.3, 0.5, 0.7, 0.9] |
| `*detection*` | 0.0 | 50.0 | [0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 25.0] |
| `*time*` | 0.0 | 1000.0 | [1.0, 5.0, 10.0, 25.0, 50.0, 100.0, 250.0, 500.0] |
| `*size*` | 0.0 | 1.0 | [0.01, 0.05, 0.1, 0.2, 0.5, 0.8] |
### TelemetryRegistry

The `TelemetryRegistry` manages instrument creation and metric recording:
```python
class TelemetryRegistry:
    def __init__(self, metric_specs: List[MetricSpec]):
        self.meter = get_meter(__name__)
        self.specs = {}
        for spec in metric_specs:
            # Create the matching OpenTelemetry instrument
            if spec.instrument == "counter":
                inst = self.meter.create_counter(spec.name)
            elif spec.instrument == "histogram":
                # Use provided boundaries or auto-generate them
                boundaries = spec.boundaries or self._generate_buckets(spec)
                inst = self.meter.create_histogram(spec.name, boundaries=boundaries)
            elif spec.instrument == "gauge":
                inst = self.meter.create_observable_gauge(spec.name)
            spec._otel_inst = inst
            self.specs[spec.name] = spec

    def record_metrics(self, frames: Dict[str, Frame]):
        """Record metrics for all frames."""
        for frame_id, frame in frames.items():
            if hasattr(frame, 'data') and isinstance(frame.data, dict):
                for spec in self.specs.values():
                    value = spec.value_fn(frame.data)
                    if value is not None:
                        if spec.instrument == "counter":
                            spec._otel_inst.add(value)
                        elif spec.instrument == "histogram":
                            spec._otel_inst.record(value)
```
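The dispatch logic in `record_metrics` can be exercised with stub instruments. A hedged sketch using stand-in classes (not the real OpenTelemetry instruments):

```python
from dataclasses import dataclass, field

@dataclass
class StubCounter:
    total: float = 0
    def add(self, v):
        self.total += v

@dataclass
class StubHistogram:
    values: list = field(default_factory=list)
    def record(self, v):
        self.values.append(v)

def record_metrics(specs, frames):
    """Same shape as TelemetryRegistry.record_metrics above."""
    for frame_data in frames.values():
        for name, (instrument, value_fn, inst) in specs.items():
            value = value_fn(frame_data)
            if value is not None:
                if instrument == "counter":
                    inst.add(value)
                elif instrument == "histogram":
                    inst.record(value)

specs = {
    "frames_processed": ("counter", lambda d: 1, StubCounter()),
    "detection_confidence": ("histogram", lambda d: d.get("confidence"), StubHistogram()),
}
record_metrics(specs, {"main": {"confidence": 0.9}, "side": {}})
# frames_processed total == 2; only "main" contributed a confidence value
```

Note how the `None` return from a `value_fn` (the `side` frame has no confidence) simply skips recording, which is the mechanism for conditional metrics.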
### Raw Data Export Flow

When `OPENLINEAGE_EXPORT_RAW_DATA=true` is set:

```python
# 1. Frame processing accumulates raw data
def process_frames_metadata(self, frames):
    if self._export_raw_data and hasattr(self, 'emitter') and self.emitter is not None:
        raw_frame_data = {}
        timestamp = time.time()
        for frame_id, frame in frames.items():
            if hasattr(frame, 'data') and isinstance(frame.data, dict):
                unique_key = f"{frame_id}_{self._frame_counter}"
                frame_data_copy = frame.data.copy()
                frame_data_copy.update({
                    '_timestamp': timestamp,
                    '_frame_id': frame_id,
                    '_unique_key': unique_key,
                    '_frame_number': self._frame_counter
                })
                raw_frame_data[unique_key] = frame_data_copy
                self._frame_counter += 1

        # Accumulate in the emitter
        if not hasattr(self.emitter, '_last_frame_data'):
            self.emitter._last_frame_data = {}
        self.emitter._last_frame_data.update(raw_frame_data)

        # Limit stored frames to prevent memory issues
        if len(self.emitter._last_frame_data) > 100:
            keys_to_remove = list(self.emitter._last_frame_data.keys())[:-100]
            for key in keys_to_remove:
                del self.emitter._last_frame_data[key]

# 2. The bridge exports raw data in heartbeats
def export(self, metrics_data, **kwargs):
    facets = self._convert_metrics_to_facets(metrics_data)
    if self._export_raw_data and hasattr(self._lineage, '_last_frame_data'):
        raw_data = getattr(self._lineage, '_last_frame_data', {})
        if raw_data:
            facets["raw_subject_data"] = raw_data
            # Clear after sending to prevent memory buildup
            self._lineage._last_frame_data = {}
    return self._lineage.update_heartbeat(facets)
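The keep-last-100 trim above relies on Python dicts preserving insertion order. The same idea with an illustrative cap of 3:

```python
def trim_to_last(store: dict, cap: int) -> None:
    """Drop the oldest entries, keeping only the `cap` most recent.

    Python dicts preserve insertion order, so slicing the key list
    with [:-cap] yields exactly the oldest keys.
    """
    if len(store) > cap:
        for key in list(store.keys())[:-cap]:
            del store[key]

frames = {f"main_{i}": {"n": i} for i in range(5)}
trim_to_last(frames, 3)
print(list(frames))  # ['main_2', 'main_3', 'main_4']
```

The `list(...)` copy of the keys is required because deleting from a dict while iterating its live key view raises a `RuntimeError`.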
## Safe Metrics Allowlist System

### Purpose

The allowlist system controls exactly which metrics are exported to OpenLineage, providing security and reducing data volume. Only explicitly approved metrics leave the process.

### How It Works

- **Metric Declaration**: filters declare metrics via `MetricSpec` (e.g., `detection_confidence`)
- **Allowlist Check**: during export, each metric name is checked against the allowlist patterns
- **Wildcard Support**: patterns like `customprocessor_*` and `*_fps` are supported
- **Histogram Naming**: for histograms, use base names (the bridge adds the `_histogram` suffix automatically)
- **Export Control**: only matching metrics are included in OpenLineage facets
### Examples

#### Basic Allowlist

```yaml
# safe_metrics.yaml
safe_metrics:
  - frames_processed        # Counter
  - frames_with_detections  # Counter
  - detection_confidence    # Histogram (becomes detection_confidence_histogram)
  - processing_time_ms      # Histogram (becomes processing_time_ms_histogram)
```

#### Wildcard Patterns

```yaml
safe_metrics:
  - customprocessor_*  # All metrics starting with customprocessor_
  - "*_fps"            # All metrics ending with _fps
  - "*_histogram"      # All histogram metrics
  - "detection_*"      # All detection-related metrics
```
#### All-in-One Configuration

```yaml
# OpenLineage configuration
openlineage:
  url: "https://oleander.dev"
  api_key: "your_api_key"
  heartbeat_interval: 10

# Safe metrics
safe_metrics:
  - frames_processed
  - detection_confidence
  - customprocessor_*
```
## Configuration Options

### Environment Variables

#### Core Observability

```bash
# Enable/disable the telemetry system
export TELEMETRY_EXPORTER_ENABLED=true

# Safe metrics allowlist - only these metrics are exported to OpenLineage
export OF_SAFE_METRICS="frames_processed,frames_with_detections,detection_confidence"

# Or use a YAML file for complex patterns (recommended)
export OF_SAFE_METRICS_FILE=/path/to/safe_metrics.yaml
```
#### OpenLineage Integration

```bash
# Server configuration
export OPENLINEAGE_URL="https://oleander.dev"
export OPENLINEAGE_API_KEY="your_api_key"
export OPENLINEAGE_ENDPOINT="/api/v1/lineage"
export OPENLINEAGE_PRODUCER="https://github.com/PlainsightAI/openfilter"

# Heartbeat interval (seconds)
export OPENLINEAGE__HEART__BEAT__INTERVAL=10

# Optional: export raw subject data (disabled by default)
export OPENLINEAGE_EXPORT_RAW_DATA=false
```
#### OpenTelemetry Export

```bash
# Export type: console, gcm, otlp_http, otlp_grpc, prometheus
export TELEMETRY_EXPORTER_TYPE=console
export EXPORT_INTERVAL=3000  # milliseconds

# For OTLP exporters (send to observability platforms)
export TELEMETRY_EXPORTER_OTLP_ENDPOINT="http://localhost:4317"  # gRPC endpoint
export OTEL_EXPORTER_OTLP_HTTP_ENDPOINT="http://localhost:4318"  # HTTP endpoint

# For Google Cloud Monitoring
export PROJECT_ID="your-gcp-project"

# For Prometheus
export PROMETHEUS_PORT=8888
```
## OTLP Integration

### Supported Platforms

OTLP (the OpenTelemetry Protocol) allows sending metrics to various observability platforms:

- **Jaeger**: distributed tracing
- **Grafana/Prometheus**: metrics and dashboards
- **Datadog**: application monitoring
- **New Relic**: full-stack observability
- **Honeycomb**: observability for modern applications
- **Any OTEL Collector**: gateway to multiple backends
### OTLP Configuration Examples

#### Jaeger (HTTP)

```bash
export TELEMETRY_EXPORTER_TYPE=otlp_http
export TELEMETRY_EXPORTER_OTLP_ENDPOINT="http://jaeger:14268/api/traces"
```

#### Grafana Cloud (gRPC)

```bash
export TELEMETRY_EXPORTER_TYPE=otlp_grpc
export TELEMETRY_EXPORTER_OTLP_ENDPOINT="https://otlp-gateway.grafana.net:443"
export OTEL_EXPORTER_OTLP_HEADERS="Authorization=Basic <base64-encoded-token>"
```

#### Local OTEL Collector

```bash
export TELEMETRY_EXPORTER_TYPE=otlp_grpc
export TELEMETRY_EXPORTER_OTLP_ENDPOINT="http://localhost:4317"
```
### YAML Configuration

```yaml
# safe_metrics.yaml
# Note: patterns starting with * must be quoted, or YAML parses them as aliases
safe_metrics:
  - frames_*
  - "*_histogram"
  - "*_counter"
  - detection_confidence
  - processing_time_ms
  - memory_usage_mb
```
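For illustration, the flat list form above can be parsed with the standard library alone. A real implementation would use PyYAML; `parse_safe_metrics` is a hypothetical helper that handles only this simple list-of-strings shape:

```python
def parse_safe_metrics(text: str) -> list:
    """Extract the flat safe_metrics pattern list from YAML-like text."""
    patterns, in_list = [], False
    for line in text.splitlines():
        stripped = line.split("#", 1)[0].strip()  # drop trailing comments
        if stripped == "safe_metrics:":
            in_list = True
        elif in_list and stripped.startswith("- "):
            patterns.append(stripped[2:].strip().strip('"').strip("'"))
        elif stripped:
            in_list = False  # a new top-level key ends the list
    return patterns

config = """
safe_metrics:
  - frames_*
  - "*_histogram"
  - detection_confidence  # base name; bridge adds _histogram
"""
print(parse_safe_metrics(config))
# ['frames_*', '*_histogram', 'detection_confidence']
```

Stripping the quotes here mirrors what a YAML parser does for the `"*_histogram"`-style entries.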
## Security Features

### 1. Allowlist Enforcement

```python
import fnmatch

def _is_metric_allowed(self, metric_name: str) -> bool:
    """Check whether a metric is in the allowlist."""
    if not self._safe_metrics:
        return False  # Lock-down mode: nothing is exported
    for pattern in self._safe_metrics:
        if fnmatch.fnmatch(metric_name, pattern):
            return True
    return False
```
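A standalone sketch of the same logic shows the shell-style wildcard semantics of `fnmatch` and the default-deny (lock-down) behavior when the allowlist is empty:

```python
import fnmatch

def is_metric_allowed(metric_name, safe_metrics):
    """Mirror of _is_metric_allowed above, as a free function."""
    if not safe_metrics:
        return False  # lock-down mode: nothing is exported
    return any(fnmatch.fnmatch(metric_name, p) for p in safe_metrics)

allow = ["frames_processed", "customprocessor_*", "*_fps"]
print(is_metric_allowed("frames_processed", allow))         # True  (exact match)
print(is_metric_allowed("customprocessor_latency", allow))  # True  (prefix wildcard)
print(is_metric_allowed("camera_fps", allow))               # True  (suffix wildcard)
print(is_metric_allowed("user_email", allow))               # False (not listed)
print(is_metric_allowed("frames_processed", []))            # False (lock-down)
```

Treating an empty allowlist as "export nothing" rather than "export everything" is what makes the system fail closed.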
### 2. PII Prevention
- Only numeric values are exported
- No raw text, images, or structured data
- All values are aggregated (counters, histograms, gauges)
- Raw data export is optional and controlled
### 3. Runtime Validation

```python
def export(self, metrics_data, **kwargs):
    """Export metrics with validation."""
    facets = {}
    for resource_metrics in metrics_data.resource_metrics:
        for scope_metrics in resource_metrics.scope_metrics:
            for metric_data in scope_metrics.metrics:
                metric_name = metric_data.name

                # Validate against the allowlist
                if not self._is_metric_allowed(metric_name):
                    logger.warning(f"Metric '{metric_name}' not in allowlist, skipping")
                    continue

                # Convert to a facet
                facets[metric_name] = self._convert_metric_to_facet(metric_data)
    return facets
```
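The nested iteration can be exercised against stub objects that mimic the OTel `MetricsData` shape (the stubs are illustrative, not the real SDK classes):

```python
from types import SimpleNamespace as NS

def collect_allowed(metrics_data, allowed):
    """Walk resource_metrics -> scope_metrics -> metrics, filtering by allowlist."""
    names = []
    for resource_metrics in metrics_data.resource_metrics:
        for scope_metrics in resource_metrics.scope_metrics:
            for metric in scope_metrics.metrics:
                if metric.name in allowed:
                    names.append(metric.name)
    return names

data = NS(resource_metrics=[NS(scope_metrics=[NS(metrics=[
    NS(name="frames_processed"), NS(name="user_email")])])])
print(collect_allowed(data, {"frames_processed"}))  # ['frames_processed']
```

Only the allowed metric survives; the disallowed name never leaves the process, which is the whole point of validating at export time.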
## Example Implementations

### Basic License Plate Detector

```python
from openfilter.filter_runtime.filter import Filter
from openfilter.observability import MetricSpec
import time

class LicensePlateDetector(Filter):
    metric_specs = [
        MetricSpec(
            name="frames_processed",
            instrument="counter",
            value_fn=lambda d: 1
        ),
        MetricSpec(
            name="frames_with_plates",
            instrument="counter",
            value_fn=lambda d: 1 if d.get("plates") else 0
        ),
        MetricSpec(
            name="plates_per_frame",
            instrument="histogram",
            value_fn=lambda d: len(d.get("plates", [])),
            num_buckets=8
        ),
        MetricSpec(
            name="detection_confidence",
            instrument="histogram",
            value_fn=lambda d: d.get("confidence", 0.0),
            num_buckets=8
        ),
        MetricSpec(
            name="processing_time_ms",
            instrument="histogram",
            value_fn=lambda d: d.get("processing_time", 0),
            num_buckets=10
        )
    ]

    def process(self, frames):
        for frame_id, frame in frames.items():
            start_time = time.time()

            # Process the frame
            frame.data["plates"] = self._detect_plates(frame)
            frame.data["confidence"] = self._get_confidence(frame)
            frame.data["processing_time"] = (time.time() - start_time) * 1000

        return frames
```
### Advanced OCR Processor

```python
class OCRProcessor(Filter):
    metric_specs = [
        MetricSpec(
            name="frames_processed",
            instrument="counter",
            value_fn=lambda d: 1
        ),
        MetricSpec(
            name="frames_with_text",
            instrument="counter",
            value_fn=lambda d: 1 if d.get("ocr_text") else 0
        ),
        MetricSpec(
            name="text_length",
            instrument="histogram",
            value_fn=lambda d: len(d.get("ocr_text", "")),
            num_buckets=10
        ),
        MetricSpec(
            name="ocr_confidence",
            instrument="histogram",
            value_fn=lambda d: d.get("ocr_confidence", 0.0),
            boundaries=[0.0, 0.5, 0.7, 0.8, 0.9, 1.0]  # Custom buckets
        ),
        MetricSpec(
            name="memory_usage_mb",
            instrument="gauge",
            value_fn=lambda d: d.get("memory_usage", 0)
        )
    ]
```
## Expected Timeline

With default settings (`OPENLINEAGE__HEART__BEAT__INTERVAL=10`):
- 0-10 seconds: Empty or minimal payloads (system warming up)
- 10-20 seconds: Some metrics start appearing
- 20+ seconds: Full metric payloads with meaningful data
This is normal behavior and ensures that metrics are properly aggregated before being sent to Oleander.
## Troubleshooting

### Common Issues

1. **No metrics appearing in Oleander**
   - Check `TELEMETRY_EXPORTER_ENABLED=true`
   - Verify `OF_SAFE_METRICS` includes your metric names
   - Ensure `OPENLINEAGE_URL` is set correctly

2. **Histogram bucket mismatch**
   - Verify `len(counts) == len(boundaries) + 1`
   - Check the automatic bucket generation logic
   - Use `num_buckets` for auto-generation or `boundaries` for custom buckets

3. **Raw data not appearing**
   - Set `OPENLINEAGE_EXPORT_RAW_DATA=true`
   - Check that frames have data in `frame.data`
   - Verify memory limits aren't exceeded

4. **OpenLineage not starting**
   - Ensure `OPENLINEAGE_URL` is set
   - Check the API key and endpoint configuration
   - Verify network connectivity
### Debug Logging
Enable debug logging to trace metric flow:
```bash
export FILTER_DEBUG=true
export OPENLINEAGE_EXPORT_RAW_DATA=true
```
This will show:
- Metric recording events
- Raw data accumulation
- Bridge export operations
- OpenLineage heartbeat emissions
## Benefits
- Standards compliance: Uses OpenTelemetry for aggregation
- Reusable: Same declaration mechanism works for all filters
- Safe: Zero PII risk through allowlist and numeric-only export
- Flexible: Easy to add new metrics without code changes
- Automatic: Smart bucket generation and optimization
- Optional: Raw data export for debugging when needed
- Conditional: OpenLineage only starts when configured
- Backward compatible: Existing filters work without changes
## Raw Data Export: End-to-End Flow

```text
User sets OPENLINEAGE_EXPORT_RAW_DATA=true
        ↓
Filter.__init__() reads env var → stores as self._export_raw_data
        ↓
Bridge.__init__() reads env var → stores as self._export_raw_data
        ↓
Frame processing → process_frames_metadata()
        ↓
if self._export_raw_data: copy frame.data → emitter._last_frame_data
        ↓
OpenTelemetry aggregation → Bridge.export()
        ↓
if self._export_raw_data: add raw_subject_data to facets
        ↓
OpenLineage heartbeat → Oleander receives raw data
```
An example heartbeat payload with raw data export enabled:

```json
{
  "frames_processed": 150,
  "frames_with_detections": 89,
  "detections_per_frame_histogram": {...},
  "raw_subject_data": {
    "main": {
      "detections": [
        {
          "id": 0,
          "class": "person",
          "confidence": 0.85,
          "bbox": [0.1, 0.2, 0.3, 0.4]
        }
      ],
      "num_detections": 1,
      "avg_confidence": 0.85,
      "processing_time": 23.4,
      "size_ratio": 0.15,
      "timestamp": 1234567890.123
    }
  }
}
```