SObjectizer Tales – 17. Limiting message flow

Posted: February 1, 2024 in SObjectizer Tales
Tags: , ,

After the recent stand-up meeting, Paolo, our lead architect, started a discussion concerning a potential out of memory issue. In essence, he simulated an excessive message flow directed towards an instance of face_detector that can’t dequeue messages fast enough, leading to a rapid increase in memory usage:

To simulate this behavior, you can change our virtual_image_producer by lowering the delay time between two frames (e.g. 1ms):

st_started.event([this](so_5::mhood_t<grab_image>) {
	const auto tic = std::chrono::steady_clock::now();
	cv::Mat mat;
	m_current_it = std::find_if(m_current_it, {}, [](const auto& e) {
		return e.is_regular_file();
	});
	so_5::send<cv::Mat>(m_channel, cv::imread(m_current_it->path().string()));
	so_5::send_delayed<grab_image>(*this, 1ms); // much faster!!!
	if (++m_current_it == std::filesystem::directory_iterator{})
	{
		m_current_it = std::filesystem::directory_iterator{ m_path };
	}
}

Although we have equipped face_detector with the capability to interrupt ongoing tasks during shutdown, Paolo highlighted that it remains susceptible to receiving an excessive number of messages that it can’t process fast enough.

This topic is crucial in message passing-styled applications and it pertains to protecting against excessively intensive – or just limiting – message flow. In this new article we’ll explore some facilities provided by SObjectizer to effectively regulate message flow and we’ll also discuss how to use them within agents.

Premise: evidently, the most straightforward method to limit message flow involves sending less data. This could include, for example, filtering messages in advance or reducing the sample rate of data acquisition. These approaches operate at the source level and may not always be applicable. However, in situations where an excess of data reaches an agent in any case, the tools presented here can come into play.

Introducing message limits

In general, crafting a suitable overload control mechanism for specific issues is a challenging task, contingent on various problem-specific considerations. Determining how to handle messages exceeding size-limited event queues – whether to discard, log, store, or take alternative actions – makes the ideal tool highly problem-specific.

Moreover, determining what constitutes an intense message flow depends on the context of the application. Consider a server that transmits data to numerous clients. In such scenarios, if the client struggles to process the incoming data quickly enough, the server typically implements some backpressure strategies. These strategies may involve setting a maximum limit on the raw amount of data buffered in the queue (e.g. 20 MB), time-related limits (e.g. 5 minutes), or even a certain number of messages allowed in the queue regardless of their dimension. When the threshold is surpassed, the server can take countermeasures. However, which threshold to apply typically depends on the use case.

For all these reasons, SObjectizer only offers a simple and fundamental tool for limiting message flow: a way to restrict the number of messages within an agent’s event queue. This feature is called message limits. In essence, an agent can accept one or more message limit settings when created. By default, this feature is disabled, allowing the agent to continuously receive messages without any restrictions (as we have done so far).

Once a message limit is defined, we are required to define a limit for every message the agent intends to subscribe to, otherwise an exception will be thrown (e.g. from so_subscribe()). Message limits cannot be modified at any other point after their creation so we must define them all in the constructor. To define a message limit, we need to provide three information:

  • the type of message to limit;
  • the maximum number of messages allowed (the “threshold”);
  • the reaction SObjectizer should do when the limit is exceeded.

The latter means that we configure a sort of “reaction strategy” that SObjectizer will execute for us if the limit for a certain message type is surpassed in an agent. Four kind of reactions are provided:

  • drop exceeding messages silently;
  • abort the application;
  • redirect exceeding messages to another message box;
  • transform exceeding messages and send them to a message box.

At this point, we start practicing with message limits by introducing one to the original implementation of face_detector:

class face_detector final : public so_5::agent_t
{
public:
	face_detector(so_5::agent_context_t ctx, so_5::mbox_t input, so_5::mbox_t output)
		: agent_t(std::move(ctx)), m_input(std::move(input)), m_output(std::move(output))
	{
	}
		
	void so_define_agent() override
	{
		so_subscribe(m_input).event([this](const cv::Mat& src) {
			// ... core logic
			so_5::send<cv::Mat>(m_output, std::move(cloned));
		});
	}
	
	void so_evt_start() override
	{
		if (!m_classifier.load("haarcascade_frontalface_default.xml"))
		{
			throw std::runtime_error("Can't load face detector classifier 'haarcascade_frontalface_default.xml'");
		}
	}	
private:
	so_5::mbox_t m_input; so_5::mbox_t m_output;
	cv::CascadeClassifier m_classifier;
};

To introduce a message limit, we act on the agent’s constructor:

face_detector(so_5::agent_context_t ctx, so_5::mbox_t input, so_5::mbox_t output)
	: agent_t(std::move(ctx)), m_input(std::move(input)), m_output(std::move(output))
{
}

To define a message limit, we add a “tuning option” to agent_context_t that supports operator+ to “add” settings to it:

face_detector(so_5::agent_context_t ctx, so_5::mbox_t input, so_5::mbox_t output)
	: agent_t(std::move(ctx) + limit_then_drop<cv::Mat>(100)), m_input(std::move(input)), m_output(std::move(output))
{
}

Here above, we have added a limit of 100 messages on the type cv::Mat, configuring the “drop” reaction. In other words, at most 100 cv::Mat are allowed to stay in the agent’s message queue before being handled and processed by the agent. Extra messages will be silently discarded and won’t reach the agent’s queue. All message limits are in the form limit_then_something. Adding more limits is straightforward as operator+ naturally chains, for example:

some_agent(so_5::agent_context_t ctx)
	: agent_t(std::move(ctx) + limit_then_drop<cv::Mat>(100) + limit_then_abort<some_message_type>(10))
{
}

However, Paolo raised an issue on all the implementations of the face_detector, including those mentioned in the preceding post. Setting limits in those cases doesn’t help. Can you guess why?

The reason behind is that those implementations of face_detector don’t buffer images within the agent’s queue; instead, they add data to other data structures that aren’t governed by message limits, such as std::queue. In practice, we separated the act of “handling” images from their subsequent “processing”. In other words, buffering happens outside of the agent’s queue.

What to do in this situation?

The short and obvious answer is that we should implement some strategies on top of the data structure we adopt for buffering, such as std::queue. For example, in face_detector, if the queue already contains 100 images, we might silently drop the next one:

so_subscribe(m_input).event([this](so_5::mhood_t<cv::Mat> m) {		
	if (m_buffer.size() < 100)
	{
		m_buffer.push(m.make_holder());
		if (m_buffer.size() == 1)
		{
			so_5::send<process_one_buffered_image>(*this);
		}
	}			
});

This small change addresses Paolo’s problem, but it might leave us feeling somewhat dissatisfied. Indeed, while implementing all the “standard” strategies is relatively straightforward, it’s conceivable that this “manual” process could become tedious and repetitive (it might be a viable option to refactor these details into a more generic agent, streamlining and abstracting these common functionalities for reuse and simplification).

Actually, SObjectizer provides an alternative but it necessitates the utilization of a message chain. This feature will be elaborated on in the upcoming installment.

Instead, the rest of this episode will focus on other aspects of message limits – perhaps less obvious – that play an important role not only in restricting message flow but also in designing agents effectively.

The effectiveness of message limits

Message limits are a natural and effective means of control when we need to limit the number of messages in an agent’s queue. Typically, the efficacy of this basic tool for limiting message flow becomes evident when handling and processing a message are combined within the same operation, as said before. For example, this happens in the first implementation of face_detector, in image_viewer, and in image_resizer, just to mention a few.

In all such cases, “buffering” is done in the agent’s queue and only here message limits can restrict incoming flow. Then we can configure some predefined reactions and eventually craft more advanced strategies, such as redirecting exceeding messages to other – specialized – agents, logging such extra data, or whatever else. As said, most of the time these strategies are application-specific.

In addition to these obvious considerations, message limits demonstrate their effectiveness in two other noteworthy aspects:

  • defensive design
  • invariant expression

First of all, a prevalent use case for message limits involves designing agents defensively. Often, we might proactively set limits on agents within the system due to uncertainty regarding the volume of messages they might receive – having these limits configurable could be also beneficial. This cautious approach helps mitigate the risk of potential out-of-memory issues. In this case, it’s essential to take decisions on exceeding messages, as silently dropping them might be undesirable.

The second aspect of message limits is their natural ability to express agent’s invariants. For example, recalling the recent implementation of face_detector:

class face_detector final : public so_5::agent_t
{
	struct process_one_buffered_image final : so_5::signal_t {};
public:
	face_detector(so_5::agent_context_t ctx, so_5::mbox_t input, so_5::mbox_t output)
		: agent_t(std::move(ctx)), m_input(std::move(input)), m_output(std::move(output))
	{
	}

	void so_define_agent() override
	{
		so_subscribe(m_input).event([this](so_5::mhood_t<cv::Mat> m) {		
			m_buffer.push(m.make_holder());
			if (m_buffer.size() == 1)
			{
				so_5::send<process_one_buffered_image>(*this);
			}
		});
		
		so_subscribe_self().event([this](so_5::mhood_t<process_one_buffered_image>) {
			const auto mat = m_buffer.front();
			// ... core logic ...
			so_5::send<cv::Mat>(m_output, std::move(cloned));
			m_buffer.pop();
			if (!m_buffer.empty())
			{
				so_5::send<process_one_buffered_image>(*this);
			}
		});
	}

private:
	so_5::mbox_t m_input;
	so_5::mbox_t m_output;
	std::queue<so_5::message_holder_t<cv::Mat>> m_buffer;
};

From a logic standpoint, we are sure the agent can’t have more than one instance of process_one_buffered_image in the message queue otherwise our effort to decouple handling and processing images would be in vain. This invariant can be effectively expressed with message limits:

face_detector(so_5::agent_context_t ctx, so_5::mbox_t input, so_5::mbox_t output)
	: agent_t(std::move(ctx) + limit_then_abort<process_one_buffered_image>(1) + limit_then_drop<cv::Mat>(100)), // ... as before
{
}

limit_then_abort<process_one_buffered_image>(1) clearly states that having more than one process_one_buffered_image in the agent’s queue breaks an invariant. Clearly, aborting the application is a fatal decision and other options are viable (such as transforming this into an error condition and sending the result to another message box), but as Yauheni told me in a private conversation ”limit_then_abort may play the role similar to the noexcept keyword in C++. The noexcept doesn’t guarantee that a function can’t throw. It just says that if the function throws then there is no way to recover. limit_the_abort expresses something similar: there is no guarantee that there won’t be an overflow, but if it happens then there is no way to recover and it’s better to crash early”.

Bear in mind that message limits incur costs only when used. Therefore, in this scenario, we’re activating this feature solely to express an invariant. Is it worth it? Perhaps, or perhaps not. In this particular case, the “cost” will be close to zero, in practice. If you want to know a bit more of how message limits work, check out this section of the documentation.

Before concluding, it’s worth exploring another feature of message limits.

Setting default limits

As said, whenever a limit is set for a particular message type similar limits must be defined for all others. This requirement can become bothersome and error-prone. Additionally, because this validation isn’t conducted at compile-time, it becomes crucial to ensure that every programmer modifying the agent in the future keeps limits up to date.

For all such reasons, SObjectizer provides a special type any_unspecified_message that can be used to set a “default” message limit for any unspecified message (or signal) type. This type can’t be used with limit_then_transform but it can with:

  • limit_then_drop
  • limit_then_abort
  • limit_then_redirect

As an example, let’s add a default limit to stream_heartbeat_with_detector from a previous post:

stream_heartbeat_with_detector(so_5::agent_context_t ctx, so_5::mbox_t detector_channel)
	: agent_t(std::move(ctx) + limit_then_drop<any_unspecified_message>(100)), m_channel(std::move(detector_channel))

The use of any_unspecified_message is also commonly employed for designing agents defensively. It can be applied to restrict the incoming message flow of agents without specific consideration for message type, serving as a general mechanism to control the overall message intake.

In addition, the role of any_unspecified_message is crucial for addressing scenarios where an agent indirectly subscribes to message types that are concealed within SObjectizer’s internals.

Limiting unknown message types

image_viewer (and image_viewer_live) suffer the same issues as face_detector since imshow involves a non-trivial code path. Hence, sending an excessive number of messages to these agents could quickly result in high buffering. Thus, setting a message limit on cv::Mat can make some sense. Since the agent subscribes also to call_waitkey, we must define an additional limit for this type. The updated constructor is here below:

image_viewer(so_5::agent_context_t ctx, so_5::mbox_t channel)
	: agent_t(std::move(ctx) + limit_then_drop<cv::Mat>(100) + limit_then_drop<call_waitkey>(100)), // ... as before
{
}

However, this small change would lead to brutally aborting the application…Can you guess why?

The underlying reason pertains to a “hidden” message type that the agent indirectly subscribes to, yet for which we did not establish a limit. Let’s see the full implementation of so_define_agent():

void so_define_agent() override
{
	st_handling_images
		.event(m_channel, [this](const cv::Mat& image) {
			imshow(m_title, image);
			cv::waitKey(25);
			st_handling_images.time_limit(500ms, st_stream_down);
		});

	st_stream_down
		.on_enter([this] {
			so_5::send<call_waitkey>(*this);
		})
		.event([this](so_5::mhood_t<call_waitkey>) {
			cv::waitKey(25);
			so_5::send<call_waitkey>(*this);
		}).transfer_to_state<cv::Mat>(m_channel, st_handling_images);

	st_handling_images.activate();
}

When Yauheni reviewed this code some time ago, he replied: “the problem is that the call to st_handling_images.time_limit() creates a new subscription to SObjectizer’s internal message msg_timeout“. In essence, when a state has a time limit then on_enter() tries to make a unique subscription for an internal msg_timeout message. But on_enter() is marked noexcept and, you know, leaving a noexcept function with an exception leads to terminating the program.

The proposed solution is to use any_unspecified_message to specify a limit for this “unknown” (and potentially subject to change in the future) internal message type:

image_viewer(so_5::agent_context_t ctx, so_5::mbox_t channel)
	: agent_t(std::move(ctx) + 
		limit_then_drop<cv::Mat>(100) + 
		limit_then_drop<call_waitkey>(100) + 
		limit_then_drop<any_unspecified_message>(100)), // ... as before
{
}

An alternative way to write this involves integrating the limit for call_waitkey within the scope of any_unspecified_message:

image_viewer(so_5::agent_context_t ctx, so_5::mbox_t channel)
	: agent_t(std::move(ctx) + 
		limit_then_drop<cv::Mat>(100) + 
		limit_then_drop<any_unspecified_message>(100)), // ... as before
{
}

Takeaway

In this episode we have learned:

  • SObjectizer provides message limits as a basic tool for limiting the number of messages (and signals) in an agent’s message queue;
  • by default, message limits for an agent are deactivated, but, if a limit is defined for a specific message, it becomes mandatory to define limits for every other message the agent subscribes to;
  • a limit consists of three information: the type of message, the maximum number of messages allowed, the reaction strategy;
  • SObjectizer provides four reaction strategies: dropping extra messages, aborting the application, redirecting extra messages, transforming and sending extra messages;
  • message limits are an effective control mechanism when limiting the number of messages in the agent’s queue is essential. This typically happens when handling and processing data are combined in the same operation;
  • message limits are also effective tools for defensive design and expressing invariants;
  • setting a limit for “any” message type is possible through the special type any_unspecified_message.

As usual, calico is updated and tagged.

What’s next?

While we’ve resolved Paolo’s issue, in the next segment, we’ll delve into an alternative method for limiting excessive message flow that revolves around message chains, opening doors to new usages of this fundamental data structure.


Thanks to Yauheni Akhotnikau for having reviewed this post.

Leave a comment