MQTT Bridge Filter

The MQTT Bridge filter is an output filter for OpenFilter that publishes incoming frame data and images to MQTT brokers. It supports flexible topic mapping, QoS settings, message retention, and automatic serialization of various data types including images as base64-encoded data.

Overview

The MQTT Bridge filter is designed to handle MQTT publishing scenarios where you need to:

  • Publish frame data and images to MQTT brokers
  • Map OpenFilter topics to MQTT topics with flexible routing
  • Control message quality of service (QoS) and retention
  • Serialize complex data structures including images
  • Handle authentication and connection management
  • Support sampling intervals for rate-limited publishing
  • Integrate with IoT platforms and monitoring systems

Key Features

  • Flexible Topic Mapping: Map OpenFilter topics to MQTT topics with path-based routing
  • Image Publishing: Automatically encode images as base64 JPEG data
  • Data Serialization: Handle complex nested data structures and arrays
  • QoS Control: Configurable Quality of Service levels (0, 1, 2)
  • Message Retention: Control message retention on broker
  • Authentication: Support for username/password authentication
  • Connection Management: Automatic reconnection with exponential backoff
  • Sampling Intervals: Rate-limited publishing for high-frequency data
  • Multiple Data Types: Support for JSON, binary, and custom data formats

Configuration

Basic Configuration

from openfilter.filter_runtime.filter import Filter
from openfilter.filter_runtime.filters.mqtt_out import MQTTOut

# Simple MQTT publishing
Filter.run_multi([
    # ... other filters above
    (MQTTOut, dict(
        sources='tcp://localhost:5550',
        outputs='mqtt://localhost:1883/my_base_topic',
    )),
])

Advanced Configuration with Topic Mapping

# Complex topic mapping with QoS and retention
Filter.run_multi([
    # ... other filters above
    (MQTTOut, dict(
        sources='tcp://localhost:5550',
        broker_host='mqtt.example.com',
        broker_port=8883,
        username='myuser',
        password='mypass',
        base_topic='sensors/camera',
        mappings=[
            'main/image > frames!qos=0',
            'main/data > metadata!qos=2!retain=true',
            'detection/data/detections > detections!qos=1',
        ],
        interval=2.0,  # Sample every 2 seconds
    )),
])

Environment Variables

The same settings can be supplied through environment variables:

export FILTER_SOURCES="tcp://localhost:5550"
export FILTER_BROKER_HOST="localhost"
export FILTER_BROKER_PORT="1883"
export FILTER_BASE_TOPIC="my_app/sensors"
export FILTER_INTERVAL="1.0"
export DEBUG_MQTT="true"

MQTT Connection Configuration

Broker Settings

  • broker_host: MQTT broker hostname (default: 'localhost')
  • broker_port: MQTT broker port (default: 1883)
  • username: Authentication username (optional)
  • password: Authentication password (optional)
  • client_id: MQTT client ID (auto-generated if not specified)
  • keepalive: Keep-alive interval in seconds (default: 60)

Connection Management

  • Automatic Reconnection: Exponential backoff on connection failures
  • Connection Timeout: Configurable timeout settings
  • Error Handling: Graceful handling of connection issues
  • Client ID Management: Automatic or custom client ID generation
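
The reconnection behaviour listed above can be pictured as a capped exponential backoff. The sketch below is not the filter's own code: the helper name `backoff_delays`, the doubling factor, and the 1 s and 60 s bounds are assumptions chosen for illustration. In the filter itself the initial and maximum intervals come from MQTT_RECONNECT_IVL and MQTT_RECONNECT_IVL_MAX.

```python
from itertools import islice
from typing import Iterator


def backoff_delays(initial: float = 1.0, maximum: float = 60.0,
                   factor: float = 2.0) -> Iterator[float]:
    """Yield reconnect delays that grow by `factor` until capped at `maximum`.

    Hypothetical helper: the default values are illustrative only.
    """
    delay = initial
    while True:
        yield min(delay, maximum)
        delay = min(delay * factor, maximum)


# First seven delays with the illustrative defaults: they double, then hit the cap.
first_delays = list(islice(backoff_delays(), 7))
print(first_delays)
```

Capping the delay keeps a long outage from pushing the next retry arbitrarily far into the future.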

Topic Mapping

The MQTTOut filter provides flexible topic mapping between OpenFilter topics and MQTT topics:

Basic Mapping Syntax

"src_topic/src_path > dst_topic !qos=1 !retain=true"

Mapping Components

Source Specification

  • topic: OpenFilter topic name (e.g., 'main', 'camera1', 'detection')
  • topic/image: Frame image data
  • topic/data: Frame metadata dictionary
  • topic/data/subfield: Specific field in metadata
  • topic/data/nested/field: Deep nested field access

Destination Specification

  • > topic_name: MQTT topic name (relative to base_topic)
  • No destination: Uses default topic names based on source path

Options

  • !qos=0|1|2: Quality of Service level
  • !retain=true|false: Message retention flag
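
To make the three components concrete, here is a minimal sketch of how a mapping string could be split into source, destination, and options. It is an illustration of the syntax described above, not the filter's parser; `ParsedMapping` and `parse_mapping` are hypothetical names.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class ParsedMapping:
    """Hypothetical container for the parts of one mapping string."""
    src_topic: str
    src_path: List[str]
    dst_topic: Optional[str]
    options: Dict[str, object] = field(default_factory=dict)


def parse_mapping(text: str) -> ParsedMapping:
    """Split 'src_topic/src_path > dst_topic !qos=1 !retain=true' into parts."""
    # Everything after the first '!' is a list of options.
    head, *raw_options = text.split('!')
    # The part before '>' is the source, the part after it the destination.
    src, _, dst = head.partition('>')
    src_topic, *src_path = src.strip().split('/')

    options: Dict[str, object] = {}
    for raw in raw_options:
        key, _, value = raw.strip().partition('=')
        if key == 'qos':
            qos = int(value)
            if qos not in (0, 1, 2):
                raise ValueError(f'invalid QoS level: {qos}')
            options[key] = qos
        elif key == 'retain':
            options[key] = value.lower() == 'true'
        else:
            raise ValueError(f'unknown option: {key!r}')

    # An empty destination means "use the default topic name".
    return ParsedMapping(src_topic, src_path, dst.strip() or None, options)


parsed = parse_mapping('main/data > metadata!qos=2!retain=true')
print(parsed)
```

A bare source such as 'main' parses to an empty path and no destination, which corresponds to the "No destination" case above.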

Mapping Examples

Simple Topic Mapping

mappings = [
    'main',                     # main/image → base_topic/frames, main/data → base_topic/data
    'camera1/image > camera1',  # camera1/image → base_topic/camera1
    'detection/data > det',     # detection/data → base_topic/det
]

Path-Based Mapping

mappings = [
    'main/image > frames',                # Image data to frames topic
    'main/data > metadata',               # All metadata to metadata topic
    'main/data/detections > detections',  # Only detections field
    'main/data/timestamp > timestamp',    # Only timestamp field
]

Advanced Mapping with Options

mappings = [
    'main/image > frames!qos=0',                     # Images with QoS 0
    'main/data > metadata!qos=2!retain=true',        # Metadata with QoS 2 and retention
    'detection/data/detections > detections!qos=1',  # Detections with QoS 1
]

Nested Data Access

mappings = [
    'camera1/data/sensors/temperature > temp',
    'camera1/data/sensors/humidity > humidity',
    'camera1/data/status/battery > battery',
    'camera1/data/config/interval > config_interval',
]
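
Each path segment after `data` selects one level of the metadata dictionary. A minimal sketch of that lookup follows; `get_path` is a hypothetical helper, and returning a default for a missing key is an assumption rather than documented filter behaviour.

```python
from typing import Any, List


def get_path(data: dict, path: List[str], default: Any = None) -> Any:
    """Follow `path` through nested dicts, returning `default` if a key is missing."""
    current: Any = data
    for key in path:
        if not isinstance(current, dict) or key not in current:
            return default
        current = current[key]
    return current


frame_data = {
    'sensors': {'temperature': 23.5, 'humidity': 41},
    'status': {'battery': 0.8},
}

# 'camera1/data/sensors/temperature' corresponds to the path ['sensors', 'temperature'].
temperature = get_path(frame_data, ['sensors', 'temperature'])
# 'camera1/data/config/interval' has no matching key in this frame.
missing = get_path(frame_data, ['config', 'interval'])
print(temperature, missing)
```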

Data Serialization

The MQTTOut filter automatically handles serialization of various data types:

Supported Data Types

  • JSON Objects: Dictionaries and nested structures
  • Arrays: Lists and tuples
  • Images: Base64-encoded JPEG data
  • Binary Data: Base64-encoded bytes
  • Primitives: Strings, integers, floats
  • Datetime: ISO format timestamps
  • NumPy Arrays: Converted to appropriate format
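
The conversions in this list can be approximated with the standard library's `json.dumps` and a fallback function. This is a sketch of the idea under stated assumptions, not the filter's serializer: `to_jsonable` is a hypothetical name, and NumPy values are detected by duck typing on `tolist()` so the example needs no third-party imports.

```python
import base64
import json
from datetime import datetime
from typing import Any


def to_jsonable(value: Any) -> Any:
    """Fallback for json.dumps, called only for objects it cannot encode itself."""
    if isinstance(value, datetime):
        return value.isoformat()
    if isinstance(value, (bytes, bytearray)):
        return base64.b64encode(bytes(value)).decode('ascii')
    if hasattr(value, 'tolist'):  # duck-typed NumPy arrays and scalars
        return value.tolist()
    raise TypeError(f'cannot serialize {type(value).__name__}')


payload = json.dumps(
    {
        'timestamp': datetime(2022, 1, 1, 0, 0, 0, 123000),  # becomes an ISO string
        'raw': b'\x00\x01',                                  # becomes base64 text
        'bbox': (100, 200, 300, 400),                        # tuples become JSON arrays
    },
    default=to_jsonable,
)
print(payload)
```

Dictionaries, lists, tuples, strings, and numbers never reach the fallback because `json.dumps` encodes them natively.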

Image Handling

  • Automatic Encoding: Images are encoded as JPEG and then base64
  • Format Detection: Supports various input formats (BGR, RGB, grayscale)
  • Compression: Uses JPEG compression for efficient transmission
  • Metadata: Image dimensions and format information included

Data Structure Examples

Input Frame Data

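from datetime import datetime

frame.data = {
    'detections': [
        {'class': 'person', 'confidence': 0.95, 'bbox': [100, 200, 300, 400]},
        {'class': 'car', 'confidence': 0.87, 'bbox': [500, 100, 700, 300]}
    ],
    # A datetime value is published as an ISO string; a plain float would stay a number
    'timestamp': datetime(2022, 1, 1, 0, 0, 0, 123000),
    'camera_id': 'cam001',
    'temperature': 23.5
}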

MQTT Output

{
  "detections": [
    {"class": "person", "confidence": 0.95, "bbox": [100, 200, 300, 400]},
    {"class": "car", "confidence": 0.87, "bbox": [500, 100, 700, 300]}
  ],
  "timestamp": "2022-01-01T00:00:00.123000",
  "camera_id": "cam001",
  "temperature": 23.5
}

QoS and Message Retention

Quality of Service (QoS) Levels

  • QoS 0: At most once delivery (fire and forget)
  • QoS 1: At least once delivery (acknowledged)
  • QoS 2: Exactly once delivery (assured)

Default QoS Settings

  • Images: QoS 0 (fastest, suitable for real-time video)
  • Data: QoS 2 (most reliable, suitable for critical metadata)
  • Customizable: Override per mapping
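
One way to read these defaults: a per-mapping `!qos=` option wins, otherwise a filter-wide `qos` setting applies, otherwise the default for the payload kind is used. The sketch below encodes that order; the function name `effective_qos` and the position of the filter-wide setting in the precedence are assumptions, not confirmed filter behaviour.

```python
from typing import Optional

# Defaults as listed above: images favour speed, data favours reliability.
DEFAULT_QOS = {'image': 0, 'data': 2}


def effective_qos(kind: str,
                  mapping_qos: Optional[int] = None,
                  filter_qos: Optional[int] = None) -> int:
    """Pick the QoS for one message: mapping option, then filter-wide value, then default.

    Hypothetical helper; the precedence order is an assumption.
    """
    for candidate in (mapping_qos, filter_qos):
        if candidate is not None:
            if candidate not in (0, 1, 2):
                raise ValueError(f'invalid QoS level: {candidate}')
            return candidate
    return DEFAULT_QOS[kind]


print(effective_qos('image'))                 # default for images
print(effective_qos('data', mapping_qos=1))   # per-mapping override
```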

Message Retention

  • Retain Flag: Messages persist on broker for new subscribers
  • Use Cases: Configuration data, status information
  • Performance: Retained messages consume broker memory

Sampling and Rate Limiting

Sampling Intervals

interval=2.0  # Sample and publish every 2 seconds

Behavior

  • Accumulation: Frames are accumulated during interval
  • Latest Data: Only the latest frame per topic is published
  • Rate Limiting: Prevents overwhelming MQTT broker
  • Efficiency: Reduces network traffic and processing load
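
The "latest frame per topic, once per interval" behaviour can be sketched as a small class. This illustrates the idea only; `LatestSampler` is a hypothetical name, and the injectable `clock` argument exists so the example can be exercised without waiting in real time.

```python
import time
from typing import Any, Callable, Dict, Optional


class LatestSampler:
    """Keep only the newest value per topic and release them once per interval."""

    def __init__(self, interval: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.interval = interval
        self.clock = clock
        self.latest: Dict[str, Any] = {}
        self.next_due = clock() + interval

    def add(self, topic: str, value: Any) -> Optional[Dict[str, Any]]:
        """Record a value; return the batch to publish if the interval has elapsed."""
        self.latest[topic] = value  # overwrites any older value for the same topic
        now = self.clock()
        if now < self.next_due:
            return None
        batch, self.latest = self.latest, {}
        self.next_due = now + self.interval
        return batch


# Drive the sampler with a fake clock: values arrive at t = 0, 1, and 2 seconds.
fake_time = [0.0]
sampler = LatestSampler(2.0, clock=lambda: fake_time[0])
results = []
for t, value in [(0.0, 'frame-a'), (1.0, 'frame-b'), (2.0, 'frame-c')]:
    fake_time[0] = t
    results.append(sampler.add('main', value))
print(results)
```

Only the value that arrives when the interval elapses is released; the earlier ones are overwritten and never published.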

Use Cases

  • High-Frequency Data: Video streams, sensor readings
  • Resource Constraints: Limited bandwidth or broker capacity
  • Monitoring: Periodic status updates
  • Logging: Regular data snapshots

Usage Examples

Example 1: Basic Image Publishing

from openfilter.filter_runtime.filter import Filter
from openfilter.filter_runtime.filters.mqtt_out import MQTTOut
from openfilter.filter_runtime.filters.video_in import VideoIn

Filter.run_multi([
    # ... other filters above
    (VideoIn, dict(
        sources='file://input.mp4',
        outputs='tcp://*:5550',
    )),
    (MQTTOut, dict(
        sources='tcp://localhost:5550',
        outputs='mqtt://localhost:1883/camera/stream',
        mappings=['main/image > frames!qos=0'],
    )),
])

Behavior: Publishes video frames as base64-encoded JPEG data to MQTT.
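
On the subscriber side, such a payload has to be base64-decoded before it can be handed to an image library. The sketch below assumes the message body is the base64 text of the JPEG bytes and nothing else; if the filter wraps the image in a JSON envelope, extract the relevant field first. `decode_frame_payload` is a hypothetical helper.

```python
import base64

JPEG_SOI = b'\xff\xd8'  # start-of-image marker that opens every JPEG file


def decode_frame_payload(payload: bytes) -> bytes:
    """Decode a base64 payload and check that it looks like JPEG data."""
    jpeg = base64.b64decode(payload, validate=True)
    if not jpeg.startswith(JPEG_SOI):
        raise ValueError('payload is not JPEG data')
    return jpeg


# Stand-in bytes with JPEG start and end markers, used here instead of a real image.
fake_jpeg = b'\xff\xd8\xff\xe0' + b'pixels' + b'\xff\xd9'
decoded = decode_frame_payload(base64.b64encode(fake_jpeg))
print(len(decoded))
```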

Example 2: Sensor Data Publishing

Filter.run_multi([
    # ... other filters above
    (REST, dict(
        sources='http://localhost:8000',
        outputs='tcp://*:5550',
    )),
    (MQTTOut, dict(
        sources='tcp://localhost:5550',
        broker_host='iot.example.com',
        base_topic='sensors/weather',
        mappings=[
            'main/data/temperature > temperature!qos=2!retain=true',
            'main/data/humidity > humidity!qos=2!retain=true',
            'main/data/pressure > pressure!qos=2!retain=true',
        ],
    )),
])

Behavior: Publishes weather sensor data with QoS 2 and retention.

Example 3: Object Detection Results

Filter.run_multi([
    # ... other filters above
    (ObjectDetectionFilter, dict(
        sources='tcp://localhost:5550',
        outputs='tcp://*:5552',
    )),
    (MQTTOut, dict(
        sources='tcp://localhost:5552',
        base_topic='ai/detections',
        mappings=[
            'main/image > frames!qos=0',
            'main/data/detections > detections!qos=1',
            'main/data/confidence > confidence!qos=1',
        ],
        interval=1.0,  # 1 FPS for detections
    )),
])

Behavior: Publishes detection results with images and metadata.

Example 4: Multi-Camera Setup

Filter.run_multi([
    # ... other filters above
    (VideoIn, dict(
        sources='file://camera1.mp4;cam1, file://camera2.mp4;cam2',
        outputs='tcp://*:5550',
    )),
    (MQTTOut, dict(
        sources='tcp://localhost:5550',
        base_topic='surveillance',
        mappings=[
            'cam1/image > camera1/frames!qos=0',
            'cam1/data > camera1/meta!qos=1',
            'cam2/image > camera2/frames!qos=0',
            'cam2/data > camera2/meta!qos=1',
        ],
    )),
])

Behavior: Publishes multiple camera streams to separate MQTT topics.

Example 5: Configuration and Status

Filter.run_multi([
    # ... other filters above
    (SystemMonitor, dict(
        sources='internal://system',
        outputs='tcp://*:5550',
    )),
    (MQTTOut, dict(
        sources='tcp://localhost:5550',
        base_topic='system/status',
        mappings=[
            'main/data/cpu > cpu_usage!qos=2!retain=true',
            'main/data/memory > memory_usage!qos=2!retain=true',
            'main/data/disk > disk_usage!qos=2!retain=true',
            'main/data/temperature > temperature!qos=2!retain=true',
        ],
        interval=30.0,  # Status every 30 seconds
    )),
])

Behavior: Publishes system status with retention for monitoring.

Example 6: Alert System

Filter.run_multi([
    # ... other filters above
    (AnomalyDetection, dict(
        sources='tcp://localhost:5550',
        outputs='tcp://*:5552',
    )),
    (MQTTOut, dict(
        sources='tcp://localhost:5552',
        base_topic='alerts',
        mappings=[
            'main/data/alert > alerts!qos=2!retain=true',
            'main/image > alert_images!qos=1',
            'main/data/timestamp > alert_timestamp!qos=2',
        ],
    )),
])

Behavior: Publishes alerts with high reliability and retention.

Error Handling and Reliability

Connection Management

  • Automatic Reconnection: Exponential backoff on failures
  • Connection Validation: Periodic health checks
  • Graceful Degradation: Continues processing during outages
  • Error Logging: Detailed connection and publishing logs

Message Publishing

  • Queue Management: Internal queue for reliable delivery
  • Retry Logic: Automatic retry for failed publishes
  • Backpressure: Handles broker congestion gracefully
  • Message Validation: Ensures data integrity

Common Error Scenarios

  • Broker Unavailable: Automatic reconnection with backoff
  • Authentication Failure: Clear error messages and retry
  • Network Issues: Graceful handling of timeouts
  • Message Too Large: Automatic chunking or compression

Performance Considerations

Network Optimization

  • Compression: JPEG compression for images
  • Sampling: Rate limiting for high-frequency data
  • Batch Publishing: Efficient message batching
  • Connection Pooling: Reuse connections when possible

Memory Management

  • Streaming: No large data buffering
  • Garbage Collection: Automatic cleanup of temporary data
  • Memory Limits: Configurable queue sizes
  • Resource Monitoring: Built-in memory usage tracking

Broker Load

  • QoS Selection: Choose appropriate QoS levels
  • Retention Policy: Minimize retained messages
  • Topic Structure: Efficient topic hierarchy
  • Message Size: Optimize payload sizes

Security Considerations

Authentication

  • Username/Password: Basic MQTT authentication
  • TLS/SSL: Encrypted connections (port 8883)
  • Client Certificates: Certificate-based authentication
  • Access Control: Broker-level topic permissions

Data Privacy

  • Image Anonymization: Remove sensitive data before publishing
  • Field Filtering: Publish only necessary data fields
  • Encryption: Use TLS for data in transit
  • Access Logging: Monitor data access patterns

Troubleshooting

Common Issues

Connection Problems

  1. Check broker hostname and port
  2. Verify network connectivity
  3. Validate authentication credentials
  4. Check firewall settings

Publishing Issues

  1. Verify topic mapping syntax
  2. Check message size limits
  3. Validate QoS settings
  4. Monitor broker capacity

Data Format Issues

  1. Check serialization compatibility
  2. Validate JSON structure
  3. Verify image encoding
  4. Test with simple data first

Debug Configuration

In Python, raise the log level:

import logging
logging.basicConfig(level=logging.DEBUG)

In the shell, enable MQTT debug logging before starting the filter:

export DEBUG_MQTT=true

Monitoring and Logging

  • Connection Status: Monitor broker connectivity
  • Publish Rates: Track message throughput
  • Error Rates: Monitor failed publishes
  • Message Sizes: Track payload sizes

Advanced Usage

Ephemeral Sources for Non-Blocking Processing

The MQTTOut filter supports ephemeral sources, marked with ? or ?? after the source address. They are useful for monitoring, metrics collection, and side-channel processing that must not slow down the main pipeline.

Single Ephemeral (?)

Filter.run_multi([
    # ... other filters above
    (MQTTOut, dict(
        sources='tcp://localhost:5550?;main',  # Single ? for ephemeral connection
        outputs='mqtt://localhost:1883/monitoring',
        mappings=['main/data > status!qos=2'],
        interval=5,  # Sample every 5 seconds
    )),
])

Behavior:

  • Does not participate in frame synchronization
  • Can request frames without blocking the main pipeline
  • Messages may be dropped if processing is too slow
  • Perfect for monitoring, logging, or side-channel analysis

Doubly Ephemeral (??)

Filter.run_multi([
    # ... other filters above
    (MQTTOut, dict(
        sources=[
            'tcp://localhost:6550?? ; _metrics > m_vidin',   # Doubly ephemeral for metrics
            'tcp://localhost:6560?? ; _metrics > m_split',   # No upstream notification
            'tcp://localhost:5550?? ; main > frames!qos=0',  # Silent monitoring
        ],
        outputs='mqtt://localhost:1883/metrics',
        mappings=[
            'm_vidin/data > m_vidin',
            'm_split/data > m_split',
            'frames/data > frames',
        ],
        interval=10,  # Sample every 10 seconds
    )),
])

Behavior:

  • Silent listener - doesn't even notify upstream of its existence
  • No flow control or synchronization
  • Ideal for metrics collection, debugging, or passive monitoring
  • Never affects upstream filter performance

Use Cases

1. Metrics Collection

# Collect system metrics without affecting video processing
Filter.run_multi([
    # ... other filters above
    (MQTTOut, dict(
        sources='tcp://localhost:5550??;_metrics',  # Silent metrics collection
        outputs='mqtt://localhost:1883/system',
        mappings=['_metrics/data > metrics!retain=true'],
        interval=5,
    )),
])

2. Side-Channel Analysis

# Process expensive AI analysis without slowing main pipeline
Filter.run_multi([
    # ... other filters above
    (MQTTOut, dict(
        sources='tcp://localhost:5550?;main',  # Ephemeral for slow processing
        outputs='mqtt://localhost:1883/analysis',
        mappings=['main/image > ai_input!qos=0'],
        interval=2,  # Lower frequency to handle slow processing
    )),
])

3. Debug Monitoring

# Monitor pipeline without affecting production flow
Filter.run_multi([
    # ... other filters above
    (MQTTOut, dict(
        sources='tcp://localhost:5550??;*',  # Silent monitoring of all topics
        outputs='mqtt://localhost:1883/debug',
        mappings=[
            '*/data > debug_data!qos=0',
            '*/image > debug_frames!qos=0',
        ],
        interval=1,  # High frequency for debugging
    )),
])

Custom Serialization

# The filter automatically handles:
# - Nested dictionaries
# - Arrays and lists
# - Binary data (base64)
# - Images (JPEG + base64)
# - Datetime objects (ISO format)
# - NumPy arrays

Dynamic Topic Mapping

# Topic mapping can be dynamic based on frame data:
mappings = [
    'main/data/camera_id > cameras/{camera_id}/status',
    'main/data/sensor_type > sensors/{sensor_type}/data',
]
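
The `{camera_id}` and `{sensor_type}` placeholders suggest that the destination topic is filled in from frame data at publish time. The sketch below shows one plausible way to expand such a template with the standard library. How the filter actually resolves placeholders is not specified here, so `expand_topic` and its error handling are assumptions.

```python
import string


def expand_topic(template: str, data: dict) -> str:
    """Fill '{field}' placeholders in an MQTT topic template from frame data."""
    # Collect the placeholder names that appear in the template.
    fields = [name for _, name, _, _ in string.Formatter().parse(template) if name]
    missing = [name for name in fields if name not in data]
    if missing:
        raise KeyError(f'missing fields for topic template: {missing}')
    topic = template.format(**{name: data[name] for name in fields})
    # MQTT reserves '+' and '#' as subscription wildcards; they are invalid when publishing.
    if '+' in topic or '#' in topic:
        raise ValueError('MQTT wildcards are not allowed in publish topics')
    return topic


topic = expand_topic('cameras/{camera_id}/status', {'camera_id': 'cam001'})
print(topic)
```

Rejecting wildcard characters guards against frame data that would otherwise produce a topic the broker refuses.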

Integration with IoT Platforms

# Compatible with:
# - AWS IoT Core
# - Azure IoT Hub
# - Google Cloud IoT Core (service retired by Google in August 2023)
# - Eclipse Mosquitto
# - HiveMQ
# - VerneMQ

API Reference

MQTTOutConfig

class MQTTOutConfig(FilterConfig):
    class Mapping(adict):
        class Options(adict):
            qos: int | None
            retain: bool | None

        dst_topic: str
        src_topic: str | None
        src_path: list[str] | None
        options: Options | None

    mappings: str | list[str | Mapping]
    broker_host: str | None
    broker_port: int | None
    username: str | None
    password: str | None
    client_id: str | Literal[True] | None
    keepalive: int | None
    base_topic: str | None
    interval: float | None
    qos: int | None
    retain: bool | None

MQTTOut

class MQTTOut(Filter):
    FILTER_TYPE = 'Output'
    VALID_OPTIONS = ('qos', 'retain')

    # Signatures only; method bodies are omitted
    @classmethod
    def normalize_config(cls, config): ...
    def get_client(self): ...
    def setup(self, config): ...
    def shutdown(self): ...
    def process(self, frames): ...

Environment Variables

  • DEBUG_MQTT: Enable MQTT debug logging
  • MQTT_RECONNECT_IVL: Initial reconnect interval (seconds)
  • MQTT_RECONNECT_IVL_MAX: Maximum reconnect interval (seconds)
  • FILTER_BROKER_HOST: MQTT broker hostname
  • FILTER_BROKER_PORT: MQTT broker port
  • FILTER_BASE_TOPIC: Base MQTT topic prefix