SObjectizer Tales – 18. Limiting message chains

Posted: February 8, 2024 in SObjectizer Tales

In the previous episode we explored message limits and discussed their impact on agent design. We understood that this tool is effective only when we need to limit an agent’s message queue, which typically happens when “handling” and “processing” a message are combined into a single operation.

Conversely, in scenarios where an agent’s design separates these tasks, the usual practice involves buffering data in an external data structure rather than within the agent’s message queue. In this case, controlling the message flow is only attainable by implementing specific countermeasures within that particular data structure. For example, in our current implementation of face_detector, we regulate the incoming flow by verifying that the queue contains fewer than 100 images:

so_subscribe(m_input).event([this](so_5::mhood_t<cv::Mat> m) {		
	if (m_buffer.size() < 100)
	{
		m_buffer.push(m.make_holder());
		if (m_buffer.size() == 1)
		{
			so_5::send<process_one_buffered_image>(*this);
		}
	}			
});

Since this manual check can be annoying and error-prone, in this installment we explore a feature of SObjectizer that offers an alternative solution to the issue. The feature presented is actually broader and isn’t confined solely to limiting message flow.

Before we begin, it’s essential to provide a necessary preamble that applies to the last two articles as well. We discussed approaches that mix a strategy for buffering and dequeuing messages with the rest of the agent’s logic. Indeed, sending internal messages like process_one_buffered_image is essentially a means to advance to the next buffered message, and that is not domain logic. As said several times, the beauty of the actor model is that messaging and similar concerns are delegated to the framework. Thus, formally, these approaches violate the Single Responsibility Principle, as classes (agents) should ideally have only one responsibility. A slightly better approach would involve refactoring to create a generic agent responsible for the middleware operations, while keeping its logic customizable. This is not done in this article but might be explored in the future.

The primary objective of this discussion is to restrict incoming messages to this implementation of face_detector based on message chains, introduced in a previous post:

class face_detector final : public so_5::agent_t
{
	struct process_one_buffered_image final : so_5::signal_t {};
public:
	face_detector(so_5::agent_context_t ctx, so_5::mbox_t input, so_5::mbox_t output)
		: agent_t(std::move(ctx)), m_input(std::move(input)), m_output(std::move(output))
	{
	}

	void so_evt_start() override
	{
		if (!m_classifier.load("haarcascade_frontalface_default.xml"))
		{
			throw std::runtime_error("Can't load face detector classifier 'haarcascade_frontalface_default.xml'");
		}
	}

	void so_define_agent() override
	{
		m_buffer = so_environment().create_mchain(so_5::make_unlimited_mchain_params().not_empty_notificator([this] {			
			so_5::send<process_one_buffered_image>(*this);
		}));
		m_binding.bind<cv::Mat>(m_input, wrap_to_msink(m_buffer->as_mbox()));

		so_subscribe_self().event([this](so_5::mhood_t<process_one_buffered_image>) {
			receive(from(m_buffer).handle_n(1).no_wait_on_empty(), [this](const cv::Mat& src) {
				cv::Mat gray;
				cvtColor(src, gray, cv::COLOR_BGR2GRAY);
				std::vector<cv::Rect> faces;
				m_classifier.detectMultiScale(gray, faces, 1.1, 4);
				auto cloned = src.clone();
				for (const auto& [x, y, w, h] : faces)
				{
					rectangle(cloned, { x, y }, { x + w, y + h }, { 255, 0, 0 }, 2);
				}
				so_5::send<cv::Mat>(m_output, std::move(cloned));
			});
						
			if (!m_buffer->empty())
			{
				so_5::send<process_one_buffered_image>(*this);
			}
		});
	}

private:
	so_5::mbox_t m_input;
	so_5::mbox_t m_output;
	so_5::mchain_t m_buffer;
	so_5::single_sink_binding_t m_binding;
	cv::CascadeClassifier m_classifier;
};

Limited message chains

Up to now, we have created message chains without worrying about their capacity, blindly using create_mchain(environment). When invoked without any additional parameters, this function generates a message chain devoid of any message limitations. However, it actually accepts several arguments to customize its capacity and a few other settings. For this reason, SObjectizer semantically distinguishes two types of message chains:

  • unlimited
  • limited

The difference is intuitive: the former permits the storage of an unlimited number of messages within the message chain (restricted only by the available memory), while the latter enforces a specific limit on the quantity of messages that can be stored. Once created, the type of message chain cannot be changed.

Limited message chains have a crucial distinction from unlimited ones: a limited message chain cannot accommodate more messages than its maximum capacity. Consequently, there must be a response or handling mechanism in place when an attempt is made to add another message to a full message chain. The initial reaction might involve waiting, on the sender’s side, for a specified duration, in the hope that some space within the chain becomes available. If this does not happen, or if the chain was created without such a waiting time, an overflow reaction is performed.

SObjectizer provides four possible “overflow” reactions:

  • dropping the new message,
  • removing the oldest message to make room for the new one,
  • throwing an exception (propagated up to the sender),
  • aborting the application.

You should notice some similarities with message limit reactions; however, message redirection and transformation are not provided.

It’s worth noting that, regardless of the reaction, it will be performed in the sender’s context. Consequently, sending a message to a message chain (e.g. via so_5::send) will always result in either the addition of the new message or the execution of the configured reaction.

Before delving into some more details about creating a message chain with different capacity settings, we must introduce another related aspect of this feature: memory usage. Indeed, limited message chains can be customized even further by specifying whether the underlying storage is fixed (preallocated) or dynamic. With the former setting, the message buffer is preallocated once and its capacity remains constant, unaffected by the chain’s expansion or reduction. Conversely, with dynamic memory usage, the underlying storage grows and shrinks as messages are added and removed.

How can message chains be created with all such customizations in place?

SObjectizer provides two ways. The first one still consists in using create_mchain that, in addition to unlimited message chains, enables us to create both waiting and non-waiting limited message chains. It has three main overloads:

  • create_mchain(env, timeout, max_size, memory_usage, overflow_reaction): makes a waiting limited chain,
  • create_mchain(env, max_size, memory_usage, overflow_reaction): makes a non-waiting limited chain,
  • create_mchain(env): makes an unlimited chain.

memory_usage and overflow_reaction are, respectively, enumerations of type:

  • mchain_props::memory_usage_t
  • mchain_props::overflow_reaction_t

For example:

auto chain = create_mchain(env,
      200,
      mchain_props::memory_usage_t::dynamic,
      mchain_props::overflow_reaction_t::drop_newest);

creates a non-waiting dynamically-allocated message chain with at most 200 messages that drops new ones in case of overflow. This instead:

auto chain = create_mchain(env,
      100ms,
      200,
      mchain_props::memory_usage_t::preallocated,
      mchain_props::overflow_reaction_t::drop_oldest);

makes a preallocated message chain with at most 200 messages that, when full, waits up to 100ms for a free slot and then drops the oldest message.
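
To see the overflow reaction in action, here is a minimal sketch (not part of calico, using int as a throwaway message type): with a non-waiting chain of capacity 1 configured with drop_newest, the second send returns immediately and its message is discarded right in the sender’s context:

auto chain = create_mchain(env,
      1,
      mchain_props::memory_usage_t::preallocated,
      mchain_props::overflow_reaction_t::drop_newest);

so_5::send<int>(chain, 1); // stored: the chain now holds one message
so_5::send<int>(chain, 2); // the chain is full: the overflow reaction is executed
                           // here, in the sender's context, and 2 is discarded
// chain->size() is still 1 at this point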

This method of creating message chains requires explicit specification of all the parameters. The other approach, instead, involves calling an environment’s member function:

env.create_mchain(params)

where params is an instance of type mchain_params_t, a parameter object that collects all the message chain creation settings. Actually, the two ways are equivalent in most cases, but the second is the only one that allows the installation of a non-empty notificator (as we already encountered in a previous post). SObjectizer also provides three free functions to easily make an instance of mchain_params_t corresponding to the three overloads of create_mchain() we have seen before:

  • make_limited_with_waiting_mchain_params(max_size, memory_usage, overflow_reaction, timeout)
  • make_limited_without_waiting_mchain_params(max_size, memory_usage, overflow_reaction)
  • make_unlimited_mchain_params()

For example:

auto chain = env.create_mchain(
		make_limited_without_waiting_mchain_params(
			200,
			mchain_props::memory_usage_t::dynamic,
			mchain_props::overflow_reaction_t::drop_newest)
	);

is functionally equivalent to the previous snippet based on the non-member create_mchain(): it makes a non-waiting dynamically-allocated message chain with at most 200 messages that drops new ones in case of overflow. However, using this approach, we can add a non-empty notificator:

auto chain = env.create_mchain(
		make_limited_without_waiting_mchain_params(
			200,
			mchain_props::memory_usage_t::dynamic,
			mchain_props::overflow_reaction_t::drop_newest
		).not_empty_notificator(...lambda...)
	);

Limited message chains and timers

Before applying this feature to solve our original problem, it’s worth spending a few words on a detail that pertains to limited message chains when used in combination with periodic and delayed messages. This aspect is covered in the documentation.

In essence, since SObjectizer’s timer thread cannot be blocked, if we send to a message chain from a delayed or periodic send function (e.g. send_periodic, send_delayed), SObjectizer silently bypasses two customization points that might have been set:

  • even if waiting is specified for the chain, if it becomes full, there won’t be any waiting;
  • if set, overflow_reaction_t::throw_exception is replaced by overflow_reaction_t::drop_newest. This means the timer thread never gets an exception when trying to push to a full message chain; the message is simply dropped.

These two details are important to keep in mind when sending to limited message chains from timer-based functions.
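
For instance, here is a minimal sketch of the second point (not part of calico; tick is a made-up signal): even though the chain is configured with throw_exception, a delayed signal that finds the chain full is silently dropped instead of causing an exception on the timer thread:

struct tick final : so_5::signal_t {};

auto chain = create_mchain(env,
      1,
      mchain_props::memory_usage_t::preallocated,
      mchain_props::overflow_reaction_t::throw_exception);

so_5::send<tick>(chain);                // fills the chain
so_5::send_delayed<tick>(chain, 100ms); // delivered later by the timer thread: if the chain
                                        // is still full, no exception is thrown and the
                                        // signal is dropped (throw_exception is treated
                                        // as drop_newest)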

Limiting message flow with chains

At this point, we can finally apply what we have learnt to the original problem. We need to limit face_detector’s buffer to at most 100 frames; when this limit is reached, new data is dropped. As you might imagine, we will act on this piece of code:

m_buffer = so_environment().create_mchain(make_unlimited_mchain_params().not_empty_notificator([this] {			
	// ... as before
}));

by replacing make_unlimited_mchain_params() with make_limited_without_waiting_mchain_params():

m_buffer = so_environment().create_mchain(
	make_limited_without_waiting_mchain_params(100, mchain_props::memory_usage_t::preallocated, mchain_props::overflow_reaction_t::drop_newest)
	.not_empty_notificator([this] {			
		// ... as before ...
}));

The choice between memory_usage_t::preallocated or memory_usage_t::dynamic depends on the specific requirements of the use case at hand. In our scenario, the buffer will quickly reach full capacity, hence preallocating it might be a reasonable choice.

We might consider limited chains as another reason in favor of the “fast shutdown” design approach based on message chains discussed in a previous article, since they support limits as a built-in feature.

Drop oldest: an opportunity

Even though limited chains do not provide reactions for redirection or transformation, we do have access to “drop oldest”, which is not offered by message limits. This opens the door to interesting use cases that call for a circular (ring) buffer.

A common scenario in video surveillance systems involves continuous frame capture by the camera. When a specific event occurs, the camera needs to retain a set number of frames (usually based on a duration, such as 1 minute) before and possibly after the event, because we likely need to investigate the event’s root cause. This functionality can be implemented with a circular (ring) buffer.

For example, dashcams (dashboard cameras) are onboard cameras that continuously record the view through a vehicle’s window. A fundamental feature of such devices is that the recording is not persisted continuously but only upon an automatic or manual trigger (such as the detection of another vehicle approaching or the driver pressing a button). A possible way to implement this feature consists in a circular buffer that keeps the frames within a certain time-based window without saturating the available memory.

Therefore, limited chains using the “drop oldest” reaction provide a basic implementation of a circular buffer, often adequate for various non-trivial use cases.
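
The following sketch illustrates the idea (a sketch only, not part of calico: last_minute, grabbed_frame and the numbers are made up): a drop_oldest chain sized for the desired time window retains only the most recent frames, and when the trigger fires the chain is drained and its content persisted:

// a ring buffer of roughly 1 minute of frames at 30 fps
auto last_minute = create_mchain(env,
      30 * 60,
      mchain_props::memory_usage_t::preallocated,
      mchain_props::overflow_reaction_t::drop_oldest);

// for every grabbed frame: when the chain is full, the oldest frame
// is discarded to make room for the newest one
so_5::send<cv::Mat>(last_minute, grabbed_frame);

// when the trigger fires: drain whatever is currently buffered
receive(from(last_minute).handle_all().no_wait_on_empty(), [](const cv::Mat& frame) {
	// persist the frame (e.g. append it to the incident's video file)
});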

Why message boxes are not inherently limited

Having gained a more comprehensive understanding of limiting message flow within SObjectizer, you might have observed a significant distinction between message boxes and message chains. Message boxes are not inherently limited but are constrained solely in relation to an agent, whereas message chains are inherently designed with a concept of limitation that implicitly influences any receivers.

Why is that?

The answer to this question is associated with the primary role of message chains. As said in a previous article, message chains have been created to make non-SObjectizer code interoperate with SObjectizer-based components. When a message is pushed to a message chain, SObjectizer doesn’t know when or if that message will be received and handled. It simply tries “buffering” the message within the chain.

On the other hand, message boxes are subscribed to by SObjectizer components, primarily agents, and more generally by message sinks. This means that when a message is dispatched to a message box, its reception is entirely contingent on the subscribers, managed by SObjectizer. The concept of “saturating” a message box isn’t applicable: it merely acts as a conduit, routing messages to their actual destinations, such as agents’ message queues. It’s only at that level that a limit can be enforced.

As a result, if a message box has no subscribers, it behaves like a “null” sink, dropping every message sent to it. In contrast, an unlimited message chain without any recipients will expand indefinitely as messages are sent to it, until the process consumes all the available memory. This crucial difference highlights that message chains cannot ignore the concept of a limit; rather, this aspect is inherently integrated into their design, making message chains a comprehensive and versatile tool. Indeed, limited message chains serve purposes extending beyond solely controlling message flow, as exemplified by their use as a ring buffer and in various other sophisticated pipeline and agent designs.
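
This difference can be observed with just a few lines (a sketch, not part of calico):

auto box = env.create_mbox();    // a message box with no subscribers
so_5::send<int>(box, 42);        // silently dropped: nobody receives it

auto chain = create_mchain(env); // an unlimited chain nobody receives from
while (true)
	so_5::send<int>(chain, 42);  // every message gets buffered: the chain keeps
	                             // growing until the available memory is exhausted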

Moreover, as discussed before, another noteworthy feature of message chains is the ability to make so_5::send wait when the buffer reaches its capacity. This serves as a straightforward yet frequently effective backpressure mechanism, well-suited for numerous real-world scenarios. Conversely, blocking a send operation on message boxes would be impractical, as it could potentially lead to perpetual blocking of an agent.
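
As a sketch of this backpressure idea (not part of calico; frame stands for the image to deliver), a producer sending to a full waiting chain is suspended for up to the configured timeout before the overflow reaction kicks in:

auto chain = create_mchain(env,
      200ms,
      100,
      mchain_props::memory_usage_t::preallocated,
      mchain_props::overflow_reaction_t::drop_newest);

// when the chain is full, this call blocks the producer for up to 200ms;
// if no slot becomes free within that time, the new frame is dropped
so_5::send<cv::Mat>(chain, frame);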

Takeaway

In this episode we have learned:

  • a message chain is either limited or unlimited;
  • limited message chains have a maximum capacity, a memory allocation strategy, and an overflow reaction policy; additionally, they can be configured with the capability to wait for an available slot within a designated timeout period;
  • waiting and the overflow reaction are performed at the sender’s site;
  • SObjectizer provides four overflow reactions: dropping the new message, dropping the oldest message, throwing an exception, aborting the application;
  • the “drop oldest” reaction is a basic tool for implementing a ring buffer;
  • unlimited message chains might expand infinitely in the absence of any component receiving data; for this reason, message chains inherently incorporate the aspect of limitation into their design.

As usual, calico is updated and tagged.

What’s next?

As the team has gained more expertise in how to limit message flow, some colleagues will be focusing on existing agents to address potential associated issues.

In the meantime, during our monthly “design playground session”, Paolo has posed a new challenge: “are there alternative methods to distribute work among multiple workers spawning only a single agent?”. The question seems a bit cryptic but we might have heard something about this last week…

In the next post, we’ll explore another feature of SObjectizer that pertains to message handlers and thread safety!


Thanks to Yauheni Akhotnikau for having reviewed this post.
