Introduction
This guide is designed to provide a comprehensive overview of how to create custom filters using the openfilter library. Whether you're just starting out or looking to refine your skills, this guide will walk you through the process of setting up your development environment, understanding the fundamental components of openfilter, and building your own filters.
This document assumes that you have your dev environment set up and that you have completed all the setup steps in overview.md.
In the sections that follow, we will delve into the structure of a typical filter, using a practical example to illustrate key concepts. Then, this guide will help you understand how to construct workflows using the run_multi function, enabling you to build flexible, reusable pipelines that can process video streams, apply various filters, and output results in multiple formats.
Understanding How Filters Work
OpenFilter's filter_runtime is the library component that contains all the necessary boilerplate code to tie different modules and functionalities together. It provides a range of utilities that make it easier to build custom filters. The modules below are common ones in the library, but there are many more; this document focuses on getting you up to speed on the basic modules you will need in order to build your own filter.
Filter Structure
Consider the folder structure below (some items omitted for simplicity), which you can view in your recently cloned git repo.
filter_example/
├── __init__.py
├── filter.py
├── inference.py
├── model
│   └── ...
├── scripts
│ └── filter_usage.py
└── tests
The filter.py file acts as a wrapper that integrates model inference and post-processing logic into the filter_runtime. It needs to inherit from the Filter base class. The setup method initializes a model using configuration parameters, ensuring they are valid. If the initialization is successful, the model object is created and ready for processing.
The process method takes frames as input and applies the necessary processing, whether it's model inference or classic CV techniques. It extracts the frame, processes it, and then returns the modified frame, encapsulated in a Frame object.
This design allows the filter to seamlessly integrate into a larger pipeline, providing specific functionality while abstracting the details of model inference and processing.
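To make this concrete, below is a minimal sketch of such a filter.py; ExampleFilter, load_model and its infer() call are illustrative stand-ins, and Frame is assumed importable alongside Filter (Frame objects are covered in detail later in this document):

from filter_runtime import Filter, Frame

class ExampleFilter(Filter):
    def setup(self, config):
        # Build the model object once, up front, from validated config parameters.
        self.model = load_model(config.model_path)    # load_model is a stand-in

    def process(self, frames):
        frame = frames['main']
        image = frame.rw_bgr.image                    # writable BGR copy of the image
        results = self.model.infer(image)             # model inference (or classic CV work)
        return Frame(image, {**frame.data, 'results': results}, 'BGR')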
Filter IDs
So far in this document, filter config.id values have been left out for the sake of brevity and convenience. And in fact these are not needed if you want to quickly run a filter or during development (in this case they are autogenerated, because they are very much needed internally to operate).
The ideal case though is that you provide a unique filter id for each instance of a filter you run in your pipeline. This allows continuity across runs and all logs and metrics related to that specific filter instance will be stored in the same place across runs. When deploying to production or customers you must absolutely provide filter ids so that telemetry for any given pipeline is consistent across runs.
Specifying an id when using the OpenFilter CLI is just another parameter (--id myfilter_id); when running from a Python script just add an id='myfilter_id' to the configuration, or simply set FILTER_ID as an environment variable.
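For example (the filter, source and id here are illustrative):

openfilter run - VideoIn --id vidin --sources file://video.mp4

export FILTER_ID=vidin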
Config
All filters take a config object to specify everything from their network connections with other filters to their individual runtime parameters. At base this config object is a python dict with strings as keys. The actual config classes used add a little bit of convenience by allowing you to access the items as attributes and also allow for “hidden” attributes, but at core they are just dictionaries.
In the examples above, specialized config classes were not used for the sake of simpler examples, and they worked because plain dicts are accepted for quick startup, but here is the "proper" way of doing it:
from filter_runtime import Filter, FilterConfig
from filter_runtime.filters.video_in import VideoIn, VideoInConfig
from filter_runtime.filters.webvis import Webvis, WebvisConfig
from filter_myfilter.myfilter import MyFilter

class MyFilterConfig(FilterConfig):  # in reality this would be imported, but here to show definition
    my_config_option: str

if __name__ == '__main__':
    Filter.run_multi([
        (VideoIn, VideoInConfig(
            id      = 'vidin',
            sources = 'file:///video.mp4',
            outputs = 'tcp://*:5550',
        )),
        (MyFilter, MyFilterConfig(
            id      = 'myfilter',
            sources = 'tcp://localhost:5550',
            outputs = 'tcp://*:5552',
            my_config_option = '???',
        )),
        (Webvis, WebvisConfig(
            id      = 'webvis',
            sources = 'tcp://localhost:5552',
        )),
    ])
The config classes are simply derived from FilterConfig and they are used exactly like the dict class. In fact they are meant to be used everywhere a dict can be used and in the same manner, as well as vice-versa. It is the intention that this will always be the case, but you should still define the config class for your filter along with expected parameters as annotations. This helps when using a type-aware IDE, and that information may also be used in the future to automatically validate configs without having to write explicit validation code.
There are several conventions you should be aware of and follow for config objects. The base class overrides attribute access so that reading an attribute that does not exist returns None instead of raising an AttributeError, simplifying config usage throughout your code. If you need to check whether a key is actually in the config, you can do so via the standard Python key in config expression. Also, it is possible for a parameter to be in the config with a value of None, which is different from a parameter not being in the config but returning None on access; keep this distinction in mind.
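A small sketch of the distinction, assuming FilterConfig accepts dict-style keyword arguments as described above (the parameter names are illustrative):

from filter_runtime import FilterConfig

config = FilterConfig(threshold=None)

assert config.threshold is None     # present, with an explicit value of None
assert config.min_size is None      # absent: reads as None instead of raising AttributeError
assert 'threshold' in config        # the key is actually in the config...
assert 'min_size' not in config     # ...and this one is not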
When config parameters are parsed by the CLI, or read from environment variables by Filter.run(), special treatment is given to empty strings. If an empty string is encountered in either of these two cases then the config parameter is not set to an empty string (or even None) but rather is not set at all. This accounts for the possibility of an environment variable being present but empty, or by extension being passed on a CLI command line after coming through a docker compose environment of the form FILTER_PARAM: ${PARAM:-}, which has a default value of an empty string. In this case the correct behavior is to act as if the parameter was not set at all, and so this is done.
If you wish to pass an actual empty string as a config parameter then pass the JSON value of an empty string "" (two double quotes). Remember to quote this string itself wherever you are passing it because that environment will probably just treat it as an empty string if you do not. For example, in the CLI:
filter_runtime run - Util --outputs '""'
Special "hidden" parameters are allowed in configs, which are parameters with a name that begins with an underscore _ character. The only real special thing currently about these parameters is that they are not printed out during filter initialization along with the rest of the config, for the config itself and for any object within the config which subclasses FilterConfig. In general these should be considered ancillary and should not be counted on as necessary to fully describe a configuration. They may be returned from normalize_config() as part of the configuration, in which case they are 100% guaranteed to make it into the filter setup() function. This is done with the intention of caching any fields which may be expensive to compute from the base configuration as part of the validation done in normalize_config(). These hidden parameters may be stripped from a FilterConfig by calling config.clean().
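A sketch of the caching pattern described above; parse_model_meta and the field names are illustrative stand-ins:

from filter_runtime import Filter, FilterConfig

class MyFilterConfig(FilterConfig):
    model_path: str
    _model_meta: dict    # hidden: omitted from the config printout at initialization

class MyFilter(Filter):
    @classmethod
    def normalize_config(cls, config):
        config = MyFilterConfig(super().normalize_config(config))
        # Cache an expensive derivation as a hidden field; fields returned from
        # normalize_config() are guaranteed to reach setup().
        config._model_meta = parse_model_meta(config.model_path)   # stand-in helper
        return config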
Building Workflows with run_multi()
The run_multi function is central to creating complex workflows in filter_runtime. It allows you to define multiple filters in sequence or parallel by specifying the class and configuration for each. Each filter entry is constructed from two artifacts:
Filter Class: The class type of the filter (e.g., Video, FaceGuard).
Configuration: A dictionary containing specific parameters for the filter instance, such as id, sources, outputs, and any additional parameters required by the filter.
By chaining together multiple filters using run_multi, you can build flexible, reusable workflows that process video streams, apply filters, and output results to various destinations (e.g., files, web interfaces). This modular approach allows for easy customization and scaling of workflows by simply adding or modifying filters in the sequence.
The code snippet below is an example of how to build a video processing workflow using the openfilter library. This workflow processes a video file to blur faces and demonstrates the use of various filters like Video, FaceGuard, Util, and Web Viewer.
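A sketch of what that workflow could look like; FaceGuard is the illustrative face-blurring filter named above, and the addresses and parameters are assumptions:

from filter_runtime import Filter

Filter.run_multi([
    (VideoIn,   dict(id='vidin',     sources='file:///video.mp4',    outputs='tcp://*:5550')),
    (FaceGuard, dict(id='faceguard', sources='tcp://localhost:5550', outputs='tcp://*:5552')),
    (Util,      dict(id='util',      sources='tcp://localhost:5552', outputs='tcp://*:5554')),
    (Webvis,    dict(id='webvis',    sources='tcp://localhost:5554')),
])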
Diving Deep on Filters
This section will go deep into the technical details of how a filter works. A filter is a class that subclasses off of the Filter base class in order to process a stream of data (usually images) in a pipeline. Filters can be chained together in up to a many-to-many topology but must be acyclic (no backsies; though in some use cases it is even possible to create circular filter pipelines, that is a more advanced topic discussed further below).
Synchronization is taken care of automatically so that all input frames that come in together stay together until the output. Unsynchronized filter paths can also be specified to allow a slower processing path to be triggered without slowing down the rest of the pipeline. Load balancing is also supported so that an expensive workload can be broken up to run on multiple copies of a filter (or even pipeline) in parallel and the sequence rejoined on output to achieve a higher framerate than would be possible with a single filter.
filter_runtime
The filter_runtime component of the OpenFilter Python package takes care of setting up and running a Filter subclass and its synchronized communication with other Filters in the pipeline via ZeroMQ over TCP or IPC (pipes). This includes parsing config settings, calling setup and shutdown functions, calling the user's Filter.process() function on incoming data and sending returned data downstream to other connected Filters in the pipeline. OpenFilter also provides a CLI tool to run filters directly or examine logs and metrics output by filters.
filter_runtime also includes several support Filters which are needed for the operation of general user Filters. Things like Video Source and Video Streamer, console logging, image manipulation, Data Capture, Web Viewer, and MQTT Bridge output of data and telemetry are all provided. When you install OpenFilter you specify whether you want the bare-bones library with only the dependencies needed for your own Filter to function (pip install openfilter) or whether you want the extra dependencies needed by the builtin support Filters (e.g. pip install openfilter[video_in], pip install openfilter[webvis] or pip install openfilter[all]).
Once installed, you can run the CLI via openfilter --help which will give you a list of other commands you can use, including running Filter pipelines.
Filter base class
The Filter base class is what you will subclass off of and overload some functions on. There is only one function which really MUST be overloaded - process(); the others are optional, but you will almost certainly overload setup() and probably shutdown() and normalize_config(). A simple sketch of a subclassed filter with all functions overloaded would be:
from filter_runtime import FilterConfig, Filter

class MyFilterConfig(FilterConfig):
    my_option: str
    ...

class MyFilter(Filter):
    @classmethod
    def normalize_config(cls, config):
        ...
        config = MyFilterConfig(super().normalize_config(config))
        ...
        return config

    def setup(self, config):
        ...

    def shutdown(self):
        ...

    def process(self, frames):
        ...
        return frames
First off - MyFilterConfig. This is subclassed off of FilterConfig which is essentially a dict. There is currently no type checking enforced and you can technically use a dict in place of the config, but in the future the config types and annotations will be used to more strictly parse and control config parameters at startup, so you should always use this kind of config class hierarchy. One small detail about config classes subclassed from FilterConfig: you can access their parameters with dot . notation, for example config.my_option instead of config['my_option'].
normalize_config() is a class method which is intended to parse and validate an incoming config dictionary, making sure the parameters are correct and putting them in a state usable by Filter.setup(). The config returned from this function is subsequently passed into setup(config), usually the very same object in fact, but not always; you can count on it having the same fields though. This config object is also stored as the .config attribute on the Filter instance, but don't count on this stored instance being the same config object as was passed to normalize_config() or setup(), as a filter class derived from your own may have modified it just for interacting with your own filter as a base class.
normalize_config() should not attempt to validate any external resources referenced by the config; that is the job of setup(). Treat normalize_config() as essentially a validator and formatter, not a place to test for the existence of files or network connections, etc.
setup() is called with the config object returned from normalize_config() and is the actual function responsible for opening/creating any files, connecting to any servers or databases, etc. Everything needed to prepare your filter for operation should happen here.
Likewise shutdown() is responsible for cleaning everything up. shutdown() will be called whether your filter exited cleanly or due to an exception. So if there is anything that might need cleaning up when your filter is exiting for any reason, do it here.
The process() function
This is the core of your filter. This function gets called continuously whenever there is input coming from upstream filters and what it returns is passed on downstream. The full typed declaration of this function is as follows:
def process(self, frames: dict[str, Frame]) \
-> dict[str, Frame] | Frame | Callable[[], dict[str, Frame] | Frame | None] | None:
As you can see the function takes a single parameter which is a dict mapping of string names to Frame objects. Frame will be explained in detail further below, but for now it is sufficient to know that a single Frame object encapsulates exactly one or zero images (as numpy.ndarray, 1 or 3 channel or None) as frame.image and a data dict object (always present but possibly empty) as frame.data.
These frames come from one or possibly multiple upstream filters and are guaranteed to come in synchronized. If you subscribe to one upstream filter and that filter sends three Frame objects with names one, two and three then you are guaranteed to get all three of these Frames or none of them and skip on to the next send. Frame objects are tracked through the filter pipeline topology with IDs for their specific sequential position and thus they can be split up among different filters and recombined and they will always be guaranteed to be of the same sequential ID - you will not get earlier and later frames together no matter how you arrange your pipeline.
The process function can do whatever it wants with these frames and then return whatever frames it wants to propagate downstream. It can even return None to indicate that nothing should be propagated downstream, and downstream filters will not be woken up to process anything in this case. Please note that returning None is different from returning an empty {} dictionary, in which case downstream filters will get a call to their own process() functions but with an empty {} frames dictionary.
The process() function can return an individual Frame object instead of a dictionary in which case this Frame is sent on downstream as “main”, this is just a convenience for returning {'main': frame}.
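For example, inside your Filter subclass:

def process(self, frames):
    frame = frames['main']
    return frame            # equivalent to: return {'main': frame}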
You may have noticed that the process() function can also return a Callable. This is provided for advanced usage by input filters like video. This Callable will not be called until all downstream filters have signaled they are ready for the next frame, and sending is guaranteed to happen at that moment. This is so the freshest frame of video can be provided at the exact moment of send, instead of getting a frame and having it sit in the output queue until it is ready to send, at which point there may be a newer frame available but you are stuck sending the old data. As mentioned this is an advanced use of process() and you should not use this unless you know what you are doing.
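A sketch of this deferred-send pattern for a hypothetical input filter (grab_latest_image is a stand-in for your own capture logic):

def process(self, frames):
    def send_freshest():
        # Called only once all downstream filters are ready; sending happens
        # immediately, so the image grabbed here is as fresh as possible.
        image = self.grab_latest_image()    # hypothetical capture call
        return Frame(image, {}, 'BGR')
    return send_freshest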
IMPORTANT! Message processing (including exit messages) in filters is synchronous and happens outside of process() when it is not running. This means that if you sit in the process() function waiting for some outside event then no messages will be processed. If the rest of the pipeline exits in this time your filter will be left hanging, preventing everything from exiting properly. In short, don't sit in process() waiting for things to happen; treat it as a poll operation on your filter. If you need something to sit blocking on some other event and can't or don't want to poll it yourself then put the blocking wait in another thread or process.
sources and outputs
As stated above, all filters live in a pipeline where images are generated from somewhere (video, rtsp, REST input), go through processing filters and finally output filters (output video, rtsp, mqtt, image / data files, databases, etc…). These filters are run either as separate processes by the CLI or a python script or by docker compose in their own containers (this is the way we ship to production). The way these filters are connected together is defined by the sources and outputs fields in the individual filter configs.
In general, sources and outputs are not symmetrical. That is, when referring to filters, sources refer to specific upstream filters whereas outputs are bind addresses where other filters can connect via their own sources, they are not destination filter addresses.
It was stated that “when referring to filters” because these fields can be overloaded by special input and output filters to refer to other resources. In general sources and outputs should ALWAYS be specified as a single or multiple comma delimited URIs, that is address strings that begin with a scheme like http://, rtsp://, file:// or in the case of referring to other filters in the pipeline to connect to, they are specified as either tcp:// or ipc://.
From this point on this document will refer to sources and outputs in the context of connecting to other filters in the pipeline as the other uses of these (such as video input or output) should be relatively self-explanatory.
outputs are the simpler of the two to explain. Simply, outputs just defines locations where other filters can connect TO in order to talk to this filter, for example:
tcp://* - Listen for connections on all network interfaces on the default port 5550.
tcp://192.168.1.12:5552 - Listen for connections only on 192.168.1.12 port 5552.
ipc://my_pipe - Listen for connections on a pipe created in the current directory named my_pipe.
These examples demonstrate the two methods of communication supported by filter_runtime - TCP and IPC pipes. filter_runtime uses ZeroMQ for all its communication in order to keep things as nimble as possible. When images are sent, for example, they are sent as pure binary (usually jpg encoded) to avoid the overhead of any other ASCII encodings.
‘ipc://’ connections are slightly faster and lower latency than ‘tcp://’ but they are limited to the local machine and in fact to a directory which must be mutually accessible by both the source and output filters (so you have to map directories or volumes when running in docker for this). TCP on the other hand allows connecting to any machine anywhere accessible on your network.
There is one idiosyncrasy with TCP connections to keep in mind, TWO ports are used, the port number you specify and that port number + 1. This is because of ZeroMQ limitations on data flow but also allows any ZeroMQ client outside the filter ecosystem to be able to plug into a filter pipeline and receive messages (the messages are sent on a standard ZeroMQ PUB socket so any ZeroMQ client SUB socket can just subscribe to this).
The fact that two ports are used WILL absolutely 100% at some point bite you in the ass. Whether it will be forgetting to expose a second port in a docker-compose.yaml, or using the next sequential port + 1 for another filter on your local machine, you will run into this in the future so try to remember these wise words in that moment.
sources and topics
Filters communicate by sending Frame objects to each other. Each filter “publishes” zero or more Frame objects each with its own topic name. Downstream filters can “subscribe” to individual topics and receive only Frames on those topics. Using ZeroMQ this is handled in a way that if a downstream filter does not subscribe to a given topic that an upstream filter publishes, that data is never sent on that connection. For this reason it makes sense to subscribe only to upstream topics your filter actually uses or will need to pass on downstream.
By default if nothing is specified in a source connection string to an upstream filter, all topics are subscribed to (with the exception of the special system topic _metrics, explained below). This means that you don’t have to know what the upstream filter publishes, you will get all of it. Such a sources string might take the format of tcp://upstream_filter:5554.
If you wish to subscribe to a specific topic, or more, you can add these topics after the address in the sources string separated by semicolons, e.g. tcp://upstream_filter:5554;my_topic;other_topic will subscribe to just my_topic and other_topic. Anything else the upstream filter publishes will not be sent to your filter and if the upstream filter does not publish one or both of your subscribed topics you will simply not receive them in the frames dictionary passed to process(), with the extreme case being if the upstream filter does not publish anything you subscribe to you will get an empty {} frames dictionary.
It is possible to subscribe to more than one upstream filter at the same time, for example ipc://filter1, ipc://filter2. In this case if the filters publish any duplicated topics you will get a runtime error. In order to avoid this you should explicitly subscribe to topics and in the case of duplicate upstream topics you should remap them to new local topic names. You do this via the > destination topic specifier after the topic name after the ;. For example, if you subscribe to two upstream filters that both publish a single topic on main, you can remap them in the following way: ipc://filter1;main>mymain1, ipc://filter2;main>mymain2, and in this way your process() function will receive a frames object of the form {'mymain1': Frame from filter1, 'mymain2': Frame from filter2}.
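A small sketch of the remapping described above (the filter and pipe names are illustrative):

from filter_runtime import FilterConfig

config = FilterConfig(
    id      = 'combine',
    sources = 'ipc://filter1;main>mymain1, ipc://filter2;main>mymain2',
)
# This filter's process() will receive frames keyed as 'mymain1' and 'mymain2'.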
In addition to the normal topics filters publish, there is a system topic that (unless explicitly turned off with config options) is published individually by each filter and available for downstream filters to subscribe to explicitly only - the _metrics topic. This is a topic with no image but with performance metrics for the filter you subscribed to. There is also a way to subscribe to EVERYTHING from upstream as if you didn't specify any explicit topics at all, which will include the _metrics topic: simply subscribe to * as in tcp://upstream_filter:5556;*. Don't subscribe to anything else, just this, and you will get EVERYTHING from the upstream filter, including any future system level topics that might start with _ and would otherwise be undelivered, like the _metrics topic.
Frame objects
A Frame object encapsulates a single image (possibly None) and a (possibly empty) dictionary of key/value pairs associated with that image. It is perfectly fine to pass data without passing an image, but if there is data to be passed which relates to an image then it should be passed in the dictionary of the Frame containing that image.
When dealing with Frame objects there is no preset format (RGB, BGR or GRAY). The Frame itself knows what format the image is in and when you wish to use the image you request it in the format you want: frame.rgb, frame.bgr or frame.gray. These properties are smart and if the image in the frame is already in the requested format then they do nothing and just return the current Frame. Likewise, when creating a Frame object for passing downstream you should specify the format you are sending to avoid ambiguity, either by specifying it directly as in Frame(image, data, format) or by using the incoming frame as a template to copy data and format information, Frame(new_image, old_frame).
A Frame will also normally come from upstream with a readonly image. For technical reasons this is more optimal than always creating a writable image when all you need to do is read the image (like for inference). JPG encoding is normally also used for passing the images between components, and if a readonly frame does not have its image modified and passes the image downstream in the proper way then the relatively expensive reencoding of the JPG image does not have to take place again, as it will be passed through cached in the Frame object, even if you create a new frame from the incoming one, e.g. Frame(old_frame, new_data).
If you need a writable image because you will be modifying it or passing it to something which needs a writable image then you need to convert the frame first. This is done in a manner similar to format conversion, frame.rw will give you a new Frame with a writable copy of the incoming image. If the frame object was already writable then nothing happens and you get back the same frame. A new writable Frame object will not preserve any JPG encoding that came in from the network though as it is writable and any changes which will be done to the image would invalidate the encoding anyway.
If you need an image from an incoming frame in a specific format / writable state you should always assume the incoming frame is in an unknown state and access it in the way you will want the image. frame.rw_rgb, frame.rw_bgr, frame.ro_rgb and frame.ro_bgr will give you a Frame object with the kind of image you want in the most optimal manner from what the Frame currently is, including returning the same exact Frame object if it is already in the desired format. Likewise, if possible, it will preserve any JPG encoding to pass on downstream.
One caveat about Frames and JPG encoding. The underlying image library used is OpenCV. This library uses BGR image color format as the default. This means that figuring out what format your image is in and what format it should be in may become confusing. To maintain speed, BGR<->RGB conversion is kept to a minimum, including when automatically encoding JPG for network transmission. This is fine if you work with and pass around BGR images as is the default, but if for some reason you use RGB images as your default, the JPGs encoded for network transmission will be backwards. In order to convert to a proper JPG for writing out to a file you MUST make sure that the source format is BGR, so you need to do frame.bgr.jpg in order to have the properly formatted output JPG data, which will ignore any cached JPG encoding that came in from the network and reencode.
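A defensive sketch putting these rules together, inside your Filter subclass; run_rgb_model is a stand-in, and the 'RGB' format literal and the .jpg bytes property are assumed from the description above:

def process(self, frames):
    frame = frames['main']
    rgb = frame.rw_rgb                          # writable RGB, converted only if necessary
    result = run_rgb_model(rgb.image)           # hypothetical model that expects RGB input
    with open('/tmp/frame.jpg', 'wb') as f:     # illustrative path
        f.write(frame.bgr.jpg)                  # force BGR first so the JPG is correct
    return Frame(rgb.image, {**frame.data, 'result': result}, 'RGB')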
Ephemeral sources
Normally when a filter sends frames downstream it doesn’t send anything until all the downstream filters have signaled that they are ready for the next frame. This is done to maintain synchronization and make sure that all frames from a given moment in time stay together. However it is possible that there is a filter which takes a long time to process its frames that may be triggered only occasionally. If this filter subscribes to an upstream in the normal way it may hold up the rest of the pipeline while it is doing its processing unnecessarily. For this reason the “ephemeral” method of subscribing was added.
You can subscribe to an upstream in a way which is not synchronized with the rest of the pipeline and will not hold it up if you do not request the next frame in time with the rest of the pipeline. This is done by simply appending a ? to the end of the source address as such: ipc://upstream?. What this does is tell the upstream to ignore this filter for the purposes of synchronization and send the next frame when all other subscribed filters have requested it, regardless of whether this filter is ready for the next frame or not.
In practice this is meant for filters which will carry out their expensive processing on selected inputs. This input can be selected in many ways but the intended method is that the expensive filter will subscribe to a topic on the upstream which will only be present when there is a special Frame to process; in other cases this topic will simply not be present and the expensive filter will simply receive an empty frames dictionary. However, when it does receive a frame and starts working on it, it will not hold up the rest of the pipeline while it completes this processing.
There are other uses for this type of source connection but the above described was the one that necessitated the creation of this connection type. In terms of properties of an ephemeral connection, it is a connection that will not hold up the normal connection pipeline if it is not ready for new frames. It is not guaranteed to get all frames like other connections, some may be dropped, but it is guaranteed to get them in order (older frames will never arrive before newer). In practice, unless there are a lot of frames being sent to an ephemeral receiver no frames will be dropped because they will queue up in the communication buffers and be delivered when the receiver is ready for them.
Ephemeral receivers should also not be rejoined with a normal stream of frames in the normal way; rather, a filter that subscribes to an ephemeral part of the pipeline as well as a synchronized part must subscribe to the ephemeral part in an ephemeral way, with a ? source connection string.
There are also “doubly” ephemeral source connections, these are specified with two ?? at the end of a source string like tcp://localhost:5552??. This is everything that a normal ephemeral connection is and more. The doubly ephemeral source only connects with the SUB socket so it only listens for new messages and can not send messages TO the upstream filter at all, which means that it can not affect the state of the filters it is listening to (for example by sending out-of-band exit messages). This is meant for when you want to passively monitor the output of a filter but not interact with it in any other way. Needless to say it is not guaranteed in this mode that you will get ALL messages, but unless you are doing some heavy processing with these received messages you should actually get them all.
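Illustrative source strings for the two ephemeral modes described above (addresses are hypothetical):

sources = 'ipc://upstream?'            # ephemeral: never holds up the rest of the pipeline
sources = 'tcp://localhost:5552??'     # doubly ephemeral: passive, SUB-only listener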
Out-of-band messages
These exist and are used to pass filter exit or error messages up and down the pipeline. When one filter exits in a way which policy allows it to signal, it sends an out-of-band exit message to all its outputs as well as sources (even if it is an ephemeral connection, but not a doubly-ephemeral one). This in turn will cause those filters to send exit messages to everything THEY are connected to and exit themselves. In this way, when a filter exits either in the normal course of processing (a video file ends), or as an error (if it is configured to send this exit message) then the rest of the pipeline can exit as well.
This is especially useful when running filters in docker containers, as otherwise a filter might exit but the rest of the pipeline might keep running regardless and never exit. The exit propagation, and whether to obey those messages or not, are configurable via environment variables and are normally set to filters propagating a clean exit but not an error exit, the idea being that you correct the error with the docker container and re-run that filter.
Logging, metrics and telemetry
Filters automatically plug in to the standard Python logging mechanism and extend its functionality. Basically, if you are familiar with the logging module and how that works, you can do filter logging and telemetry. The filter_runtime component plugs into the python log stream and writes all logs out to a log directory specific to the running filter identified by its id from the config. These logs are stored as serialized JSON objects so they can be read back later preserving all stored information, including time with timezone, log severity, process id, thread id and the actual log message itself.
The log manager takes care of ensuring that any individual log file does not exceed a preset maximum size and that the total size of all logs for this filter also does not exceed another preset maximum size. When this happens, the log manager will start to delete older log files to keep the total below this maximum.
Metrics are automatically generated for each filter and include: CPU usage percentage (this process and all its children recursively), memory usage (this process and all its children recursively), FPS framerate (rate at which the process() function is called), latency in (time from when the incoming frame image was generated to when you receive it) and latency out (time from image generation to when it is output). GPU usage and memory is also available if a GPU is present for up to 8 GPUs.
In addition to these autogenerated metrics a user filter can also include any number of its own custom metrics by simply returning them from the process() function in a topic called _metrics. These user-generated metrics will be combined with the autogenerated ones and published to either all outputs, a specific output, or not published to other filters at all depending on configuration.
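A sketch of returning a custom metric, assuming a Frame can be constructed with no image; detect() and num_detections are illustrative stand-ins:

def process(self, frames):
    frame = frames['main']
    detections = self.detect(frame.image)    # hypothetical inference call
    return {
        'main':     Frame(frame.image, {**frame.data, 'detections': detections}, frame),
        '_metrics': Frame(None, {'num_detections': len(detections)}),
    }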
Load balancing
Disclaimer! As of the time of writing this document, load balancing works but is considered semi-experimental and has one caveat: if the downstream filters from a load balanced section run slower than the load balanced section itself then frames may be dropped. This should not normally happen, as you will load balance filters precisely because they are slower than downstream, but it is still an issue that needs to be fixed. For now, please be aware of this limitation.
This is intended to allow a workload that normally runs too slow executing on a single filter in a pipeline to be parallelized across multiple copies of the same identical filter or section of pipeline. This will allow each of the N copies of the filter to handle 1/N of the work thus speeding up the effective processing by N. The restriction is that the processing needs to be something that does not keep state and does not require the result of the current frame to depend on the previous or any number of previous frames. So detection and classification yes, tracking and counting no.
In order to load balance a filter stream, the last filter before the balanced section needs to specify outputs_balance=True in its config and this will cause all outputs to be treated individually and be used in a round-robin manner to send alternate frames to. So for example if there are three outputs then frame 1 will be sent to output 1, frame 2 to output 2, frame 3 to output 3 and then frame 4 back to output 1 and so on. In reality it is a bit more involved in that if the next output is still busy then the frame will be sent to another output if one is available, but the basic idea is that the outputs are rotated.
Needless to say this is a special way of sending messages, and for the load balanced parts of the pipeline the internal message ids will not be sequential; special considerations need to be taken so that this all works properly (so no ephemeral channels within the balanced section).
After the load balanced section, there will be a filter which receives input from all the load balanced workers and joins the output back into a single stream. This filter needs to specify sources_balance=True in its config which will tell it to treat all its sources in a special manner and that they are coming from a balanced section and to join them together into a single stream. This filter’s process() function itself will receive the frames in sequential order as if they had come from a single faster upstream filter. And of course if the filter then outputs its data it will be a single stream.
The splitter filter, the one that has outputs_balance=True, can receive its data from any number of upstream filters as sources or it can generate the frames itself like VideoIn does. Any number of topics can be balanced and they will all be sent together round-robin to all the balanced workers. Remember that the splitter filter will treat ALL its outputs as worker filters so there will not be a normal sequential stream of frames on any output.
The joiner filter, the one that has sources_balance=True, will likewise treat all its sources as balanced workers and should not be plugged into any other normal sequential stream of frames. It can output in a normal manner on any number of outputs and those outputs will have the joined stream of frames, but this filter is a special node in the topology just like the splitter filter.
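A sketch of a balanced section with two copies of a hypothetical Detect filter; names and addresses are illustrative:

from filter_runtime import Filter

Filter.run_multi([
    (VideoIn, dict(id='vidin', sources='file:///video.mp4',
                   outputs='tcp://*:5550, tcp://*:5560',
                   outputs_balance=True)),                   # splitter: outputs used round-robin
    (Detect,  dict(id='worker1', sources='tcp://localhost:5550', outputs='tcp://*:5552')),
    (Detect,  dict(id='worker2', sources='tcp://localhost:5560', outputs='tcp://*:5562')),
    (Webvis,  dict(id='webvis',
                   sources='tcp://localhost:5552, tcp://localhost:5562',
                   sources_balance=True)),                   # joiner: rejoins a single ordered stream
])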
Circular pipelines
Disclaimer! Like load balancing this should be considered semi-experimental, but it is less technically involved and works well enough.
Normally a pipeline can take any number of splits and rejoins but must be completely acyclic or chicken-and-egg problems can arise. In the simplest case it is a filter that listens to itself. It will never move on to the process() function because it never receives anything because its process() function is never executed to send something because it never received anything, ad infinitum…
This can be resolved with a few config options which technically do not result in a “circular” pipeline per se but rather splits the specified filter into a receiving filter and a sending filter, with the latter not depending on the former to receive data.
The first of the two options you need to specify in order to be able to send and receive to/from the same pipeline in your filter is sources_timeout=100, or some other similar value but 100 milliseconds is the standard polling timeout used in other places so it is suggested to use this. This option tells the filter that if no messages come in from upstream in the given time then it should call the process() function anyway with an empty frames object so it can at least get control and maybe do some processing to send something out. This option is necessary because the filter sends data out into the pipeline by returning that data from the process() function so this function MUST be called at some point. If there is nothing received from sources which may be dependent on you sending something out to outputs from process() then you will never get a chance to send something in the first place.
The second option is mq_msgid_sync=false. This option is needed to turn off message id sharing between the pipeline sender (to outputs) and receiver (from sources). Normally internal message ids are passed between the two so that if a downstream filter jumps ahead for some reason, upstream filters do not continue to send already outdated frames and instead jump ahead to send the requested frames. Likewise the message ids are passed from upstream to downstream in case upstream jumped ahead, to inform downstream that it should not continue to request older messages and should synchronize with what is being sent.
In a circular pipeline this would cause a problem because the same message id that was sent out would eventually return to the sources input making the filter think that upstream is repeating older messages. In order to avoid this, mq_msgid_sync=false turns off this synchronization and the message IDs are just validated to be sequential (which they will be) and not that they match up in terms of consistency between sources and outputs.
This mode of operation is meant for things like servers that wait for some request then need to pass it on to a filter pipeline for processing and then wait for the result from this filter pipeline. There are other perfectly valid ways of doing the same thing, but this is the simplest.
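A sketch of this split send/receive configuration; RequestServer and Worker are hypothetical filters:

from filter_runtime import Filter

Filter.run_multi([
    (RequestServer, dict(
        id              = 'server',
        outputs         = 'tcp://*:5550',           # sends requests into the pipeline
        sources         = 'tcp://localhost:5552',   # receives the results back
        sources_timeout = 100,                      # call process() even with no input
        mq_msgid_sync   = False,                    # don't match msg ids around the loop
    )),
    (Worker, dict(id='worker', sources='tcp://localhost:5550', outputs='tcp://*:5552')),
])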
Plugging into the pipeline from outside
The filter pipeline does all its communication via ZeroMQ PUB/SUB and PUSH/PULL sockets. This is set up in a way to make it easy for applications outside the Filter architecture to plug in and read messages directly from a running pipeline using just ZeroMQ. In order to do this, all the external app needs to do is create a ZeroMQ SUB socket (ZeroMQ clients are available for a wide range of languages and platforms) and then subscribe to a pipeline filter endpoint as specified in that filter's outputs parameter.
Once subscribed, the application will start to receive all messages being published by that filter on that output in a format that is relatively easy to parse. The topic is the first sub-message of a multipart message (for individual topic subscribe purposes). This is followed by a JSON metadata envelope which is followed immediately by a binary JPG encoded (or raw) image (if an image is present at all) with dimensions specified in the envelope. Following this is the JSON data object for the given frame.
This is meant to allow easy access for web UIs, for example, to display an image stream directly from a pipeline, or for some other kind of application not written on top of the Filter base class, or even not in Python, to be able to access pipeline data.
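A minimal sketch of such an external listener in Python using pyzmq, assuming the filter's outputs is tcp://*:5550 and that the PUB socket lives on the port given there:

import json
import zmq

ctx = zmq.Context()
sub = ctx.socket(zmq.SUB)
sub.connect('tcp://localhost:5550')
sub.setsockopt_string(zmq.SUBSCRIBE, '')     # subscribe to all topics

while True:
    parts = sub.recv_multipart()             # [topic, JSON envelope, (binary image,) JSON data]
    topic = parts[0].decode()
    envelope = json.loads(parts[1])          # metadata, including image dimensions if present
    data = json.loads(parts[-1])             # the frame's data dict
    print(topic, envelope, data)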