Skip to main content

REST Connect Filter

The REST Connect filter is an input filter for OpenFilter that provides HTTP REST API endpoints to receive data from external sources. It exposes FastAPI endpoints that can accept JSON payloads and binary data, then forwards the received data through the OpenFilter pipeline as Frame objects.

OverviewDirect link to Overview

The REST Connect filter is designed to handle HTTP-based data ingestion scenarios where you need to:

  • Accept data from web applications and external systems
  • Provide RESTful API endpoints for data ingestion
  • Support various HTTP methods (GET, POST, PUT, PATCH, DELETE)
  • Load local files from a specified resource path
  • Convert HTTP requests into OpenFilter Frame objects
  • Handle JSON and binary data payloads

Key FeaturesDirect link to Key Features

  • REST API Endpoints: FastAPI-based HTTP server with multiple endpoints
  • JSON Payload Support: Direct JSON data ingestion
  • Local File Loading: Serve files from local resource path
  • Path Parameters: Dynamic endpoint routing with path parameters
  • HTTP Methods: Support for GET, POST, PUT, PATCH, DELETE
  • Data Conversion: Automatic conversion to Frame objects
  • Resource Management: Local file serving capabilities

ConfigurationDirect link to Configuration

Basic ConfigurationDirect link to Basic Configuration

from openfilter.filter_runtime.filter import Filter
from openfilter.filter_runtime.filters.rest import Rest

# Simple REST API server
Filter.run_multi([
# ... other filters above
(Rest, dict(
outputs='tcp://*:5550',
host='0.0.0.0',
port=8080,
)),
])

Advanced ConfigurationDirect link to Advanced Configuration

# REST API with local resources
Filter.run_multi([
# ... other filters above
(Rest, dict(
outputs='tcp://*:5550',
host='0.0.0.0',
port=8080,
resource_path='/path/to/local/files',
methods=['GET', 'POST', 'PUT'],
endpoints=['/data', '/files/{filename}'],
)),
])

Environment VariablesDirect link to Environment Variables

You can configure via environment variables:

export FILTER_OUTPUTS="tcp://*:5550"
export FILTER_HOST="0.0.0.0"
export FILTER_PORT="8080"
export FILTER_RESOURCE_PATH="/path/to/files"
export FILTER_METHODS="GET,POST,PUT"
export FILTER_ENDPOINTS="/data,/files/{filename}"

HTTP EndpointsDirect link to HTTP Endpoints

Default EndpointsDirect link to Default Endpoints

The REST Connect filter provides several built-in endpoints:

1. Root Endpoint (/)Direct link to 1-root-endpoint-

  • Method: GET
  • Purpose: Health check and basic information
  • Response: JSON with server status

2. Data Endpoint (/data)Direct link to 2-data-endpoint-data

  • Method: POST, PUT, PATCH
  • Purpose: JSON data ingestion
  • Content-Type: application/json
  • Body: JSON payload

3. Dynamic File Endpoint (/files/{filename})Direct link to 3-dynamic-file-endpoint-filesfilename

  • Method: GET, POST, PUT, PATCH, DELETE
  • Purpose: File operations with path parameters
  • Path Parameters: filename - the file to operate on

Custom EndpointsDirect link to Custom Endpoints

You can define custom endpoints using the endpoints configuration:

endpoints=[
'/api/v1/data',
'/api/v1/files/{filename}',
'/custom/endpoint/{id}',
]

HTTP MethodsDirect link to HTTP Methods

Supported MethodsDirect link to Supported Methods

  • GET: Retrieve data and files
  • POST: Send data
  • PUT: Update/replace data
  • PATCH: Partial data updates
  • DELETE: Remove data

Method ConfigurationDirect link to Method Configuration

methods=['GET', 'POST', 'PUT', 'PATCH', 'DELETE']

Data HandlingDirect link to Data Handling

JSON Payload ProcessingDirect link to JSON Payload Processing

The REST Connect filter converts JSON payloads into Frame objects:

# Incoming JSON
{
"detections": [
{"class": "person", "confidence": 0.95, "bbox": [100, 100, 200, 200]}
],
"timestamp": "2024-01-15T10:30:00Z",
"camera_id": "cam1"
}

# Converted to Frame object
frame = Frame()
frame.data = {
"detections": [...],
"timestamp": "...",
"camera_id": "cam1"
}

Local File ServingDirect link to Local File Serving

Resource Path ConfigurationDirect link to Resource Path Configuration

The resource_path parameter specifies where local files are served from:

resource_path='/path/to/local/files'

File AccessDirect link to File Access

Files can be accessed via the /files/{filename} endpoint:

GET /files/image1.jpg
GET /files/data/sensor_readings.json
GET /files/videos/sample.mp4

File OperationsDirect link to File Operations

Different HTTP methods provide different file operations:

  • GET: Download/read file
  • POST: Create file
  • PUT: Replace file content
  • PATCH: Update file metadata
  • DELETE: Remove file

Usage ExamplesDirect link to Usage Examples

Example 1: Basic REST API ServerDirect link to Example 1: Basic REST API Server

Filter.run_multi([
# ... other filters above
(Rest, dict(
outputs='tcp://*:5550',
host='0.0.0.0',
port=8080,
)),
(ObjectDetection, dict(
sources='tcp://localhost:5550',
outputs='tcp://*:5552',
)),
(ImageOut, dict(
sources='tcp://localhost:5552',
outputs='file:///output/detected_{frame_number}.jpg',
)),
])

Behavior: Accepts HTTP requests, processes data through object detection, and saves results.

Example 2: Data Processing PipelineDirect link to Example 2: Data Processing Pipeline

Filter.run_multi([
# ... other filters above
(Rest, dict(
outputs='tcp://*:5550',
host='0.0.0.0',
port=8080,
resource_path='/data',
endpoints=['/data', '/files/{filename}'],
)),
(DataProcessor, dict(
sources='tcp://localhost:5550',
outputs='tcp://*:5552',
)),
(OutputFilter, dict(
sources='tcp://localhost:5552',
outputs='file:///processed/data.json',
)),
])

Behavior: Accepts data requests, processes them, and saves processed results.

Example 3: JSON Data IngestionDirect link to Example 3: JSON Data Ingestion

Filter.run_multi([
# ... other filters above
(Rest, dict(
outputs='tcp://*:5550',
host='0.0.0.0',
port=8080,
methods=['POST', 'PUT'],
endpoints=['/data', '/api/v1/sensors'],
)),
(DataProcessor, dict(
sources='tcp://localhost:5550',
outputs='tcp://*:5552',
)),
(Recorder, dict(
sources='tcp://localhost:5552',
outputs='file:///logs/sensor_data.jsonl',
rules=['+'],
)),
])

Behavior: Accepts JSON sensor data, processes it, and logs results.

Example 4: Multi-Endpoint APIDirect link to Example 4: Multi-Endpoint API

Filter.run_multi([
# ... other filters above
(Rest, dict(
outputs='tcp://*:5550',
host='0.0.0.0',
port=8080,
resource_path='/resources',
endpoints=[
'/api/v1/data',
'/api/v1/files/{filename}',
'/health',
'/status',
],
)),
(Router, dict(
sources='tcp://localhost:5550',
outputs='tcp://*:5552',
routing_rules={
'data': 'tcp://*:5554',
'files': 'tcp://*:5555',
},
)),
])

Behavior: Provides multiple API endpoints with different processing paths.

Example 5: Webhook IntegrationDirect link to Example 5: Webhook Integration

Filter.run_multi([
# ... other filters above
(Rest, dict(
outputs='tcp://*:5550',
host='0.0.0.0',
port=8080,
endpoints=['/webhook/{source}'],
)),
(WebhookProcessor, dict(
sources='tcp://localhost:5550',
outputs='tcp://*:5552',
)),
(NotificationSender, dict(
sources='tcp://localhost:5552',
outputs='tcp://*:5554',
)),
])

Behavior: Accepts webhook calls from external services and processes them.

Example 6: File Management APIDirect link to Example 6: File Management API

Filter.run_multi([
# ... other filters above
(Rest, dict(
outputs='tcp://*:5550',
host='0.0.0.0',
port=8080,
resource_path='/files',
methods=['GET', 'POST', 'PUT', 'DELETE'],
endpoints=['/files/{filename}'],
)),
(FileProcessor, dict(
sources='tcp://localhost:5550',
outputs='tcp://*:5552',
)),
(FileManager, dict(
sources='tcp://localhost:5552',
outputs='tcp://*:5554',
)),
])

Behavior: Provides complete file management API with CRUD operations.

API Usage ExamplesDirect link to API Usage Examples

Sending JSON DataDirect link to Sending JSON Data

Using curlDirect link to Using curl

# Send JSON data
curl -X POST http://localhost:8080/data \
-H "Content-Type: application/json" \
-d '{
"detections": [
{"class": "person", "confidence": 0.95, "bbox": [100, 100, 200, 200]}
],
"timestamp": "2024-01-15T10:30:00Z",
"camera_id": "cam1"
}'

Using Python requestsDirect link to Using Python requests

import requests
import json

# Send JSON data
data = {
"detections": [
{"class": "person", "confidence": 0.95, "bbox": [100, 100, 200, 200]}
],
"timestamp": "2024-01-15T10:30:00Z",
"camera_id": "cam1"
}
response = requests.post('http://localhost:8080/data', json=data)
print(response.json())

File OperationsDirect link to File Operations

Download fileDirect link to Download file

# Download a file
curl -X GET http://localhost:8080/files/data1.json -o downloaded_data.json

Create fileDirect link to Create file

# Create a new file
curl -X POST http://localhost:8080/files/new_data.json \
-H "Content-Type: application/json" \
-d '{"data": "content"}'

Delete fileDirect link to Delete file

# Delete a file
curl -X DELETE http://localhost:8080/files/old_data.json

Error HandlingDirect link to Error Handling

HTTP Status CodesDirect link to HTTP Status Codes

  • 200 OK: Successful operation
  • 400 Bad Request: Invalid request data
  • 404 Not Found: Endpoint or file not found
  • 405 Method Not Allowed: Unsupported HTTP method
  • 413 Payload Too Large: File too large
  • 500 Internal Server Error: Server processing error

Error Response FormatDirect link to Error Response Format

{
"error": "Error description",
"detail": "Detailed error information",
"status_code": 400
}

Common Error ScenariosDirect link to Common Error Scenarios

  • Invalid JSON: Malformed JSON payload
  • Missing Files: File not found in resource path
  • Permission Errors: File access permission issues
  • Large Data: Data exceeding size limits
  • Unsupported Methods: HTTP method not allowed
  • Missing Parameters: Required parameters not provided

Security ConsiderationsDirect link to Security Considerations

Input ValidationDirect link to Input Validation

  • Data Type Validation: Restrict allowed data types
  • Size Limits: Limit data payload sizes
  • Path Traversal: Prevent directory traversal attacks
  • Content Validation: Validate received content

Access ControlDirect link to Access Control

  • Authentication: Implement authentication mechanisms
  • Authorization: Control access to endpoints
  • Rate Limiting: Prevent abuse and DoS attacks
  • CORS: Configure cross-origin resource sharing

Security Best PracticesDirect link to Security Best Practices

# Example security configuration
Filter.run_multi([
# ... other filters above
(Rest, dict(
outputs='tcp://*:5550',
host='127.0.0.1', # Bind to localhost only
port=8080,
resource_path='/secure/data', # Restricted path
# Add authentication middleware
# Add rate limiting
# Add input validation
)),
])

Performance ConsiderationsDirect link to Performance Considerations

Request HandlingDirect link to Request Handling

  • Async Processing: FastAPI provides async request handling
  • Connection Pooling: Efficient HTTP connection management
  • Memory Usage: Monitor memory usage for large data payloads
  • Processing Time: Consider downstream processing time

File OperationsDirect link to File Operations

  • Streaming: Stream large files instead of loading into memory
  • Caching: Cache frequently accessed files
  • Compression: Use compression for large responses
  • CDN Integration: Use CDN for static file serving

MonitoringDirect link to Monitoring

  • Request Metrics: Monitor request rates and response times
  • Error Rates: Track HTTP error rates
  • Data Operations: Monitor data processing rates
  • Resource Usage: Track CPU and memory usage

TroubleshootingDirect link to Troubleshooting

Common IssuesDirect link to Common Issues

Server Not StartingDirect link to Server Not Starting

  1. Check port availability
  2. Verify host binding
  3. Check firewall settings
  4. Validate configuration parameters

Data Processing FailuresDirect link to Data Processing Failures

  1. Check data size limits
  2. Verify data permissions
  3. Ensure sufficient disk space
  4. Validate data format support

JSON Processing ErrorsDirect link to JSON Processing Errors

  1. Validate JSON syntax
  2. Check content-type headers
  3. Verify data structure
  4. Handle encoding issues

Path Parameter IssuesDirect link to Path Parameter Issues

  1. Validate endpoint patterns
  2. Check parameter extraction
  3. Verify file path construction
  4. Handle special characters

Debug ConfigurationDirect link to Debug Configuration

import logging
logging.basicConfig(level=logging.DEBUG)

# Enable REST Connect filter debugging
export DEBUG_REST=true
export LOG_LEVEL=DEBUG

Debug InformationDirect link to Debug Information

  • Request Logging: Log all incoming requests
  • Response Logging: Log response details
  • Data Operations: Log data access operations
  • Error Details: Detailed error information

Advanced UsageDirect link to Advanced Usage

Custom MiddlewareDirect link to Custom Middleware

# Add custom middleware for authentication, logging, etc.
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)

Custom EndpointsDirect link to Custom Endpoints

# Define additional custom endpoints
@rest_app.post("/custom/endpoint")
async def custom_endpoint(data: dict):
# Custom processing logic
return {"status": "processed", "data": data}

Integration with External SystemsDirect link to Integration with External Systems

# Integrate with external APIs and services
Filter.run_multi([
# ... other filters above
(Rest, dict(
outputs='tcp://*:5550',
host='0.0.0.0',
port=8080,
)),
(ExternalAPIClient, dict(
sources='tcp://localhost:5550',
outputs='tcp://*:5552',
api_url='https://external-api.com',
api_key='your-api-key',
)),
(ResponseFormatter, dict(
sources='tcp://localhost:5552',
outputs='tcp://*:5554',
)),
])

API ReferenceDirect link to API Reference

RestConfigDirect link to RestConfig

class RestConfig(FilterConfig):
outputs: str | list[str] | list[tuple[str, dict[str, Any]]]
host: str
port: int
resource_path: str | None
methods: list[str] | None
endpoints: list[str] | None

RestDirect link to Rest

class Rest(Filter):
FILTER_TYPE = 'Input'

@classmethod
def normalize_config(cls, config)
def init(self, config)
def setup(self, config)
def shutdown(self)
def process(self, frames)
@staticmethod
def load_file(filename: str, resource_path: str | None) -> bytes
@staticmethod
def parse_multipart_data(content: bytes, content_type: str) -> tuple[dict[str, Any], dict[str, bytes]]
@staticmethod
def parse_json_data(content: bytes) -> dict[str, Any]
@staticmethod
def parse_form_data(content: bytes, content_type: str) -> dict[str, Any]

Environment VariablesDirect link to Environment Variables

  • DEBUG_REST: Enable debug logging
  • FILTER_OUTPUTS: Output destinations
  • FILTER_HOST: Server host address
  • FILTER_PORT: Server port number
  • FILTER_RESOURCE_PATH: Local file resource path
  • FILTER_METHODS: Allowed HTTP methods
  • FILTER_ENDPOINTS: Custom endpoint patterns