SObjectizer Tales – 16. Interrupting work

Posted: January 25, 2024 in SObjectizer Tales
Tags: , ,

In this new episode, we’ll delve into another crucial topic that is at times overlooked or simply assumed. The heart of the matter was anticipated last time, following up on an email we got from our colleague Gerri who brought up a new problem: occasionally, when the program is requested to terminate, the actual shutdown takes too long.

After inquiring further with Gerri regarding his use of calico, we discover that he processes large images using a custom image processor he developed, eventually saving all these processed images to disk. When requesting calico to shut down by pressing CTRL+C, it appears that some frames are still undergoing processing through his algorithm, causing delays in the overall pipeline.

At this point, we wonder what the correct behavior should be. From our perspective, we could argue that losing data is undesirable. If frames have been acquired and dispatched for processing, it would be more logical for the program to conclude its work gracefully instead of abruptly disregarding the last set of data. Right?

Well, as always, the answer to this question depends on the context. Frankly speaking, when the user decides to shutdown a program, it might be bothersome to keep them waiting for more than a few seconds, otherwise they probably will kill the program anyway. In other scenarios, which are quite common nowadays, the program might function as a service or a containerized application managed by other programs – such as orchestrators or similar entities – that also do not wait too much. Typically, they have a configurable “graceful shutdown timeout” that is 10 or 15 seconds (for example, by default, docker stop sends SIGTERM and then waits for 10 seconds before killing the running container).

Hence, in certain situations, this particular aspect is non-negotiable.

The topic is complex and goes beyond the series but it’s perfectly on point here. As developers of calico, we must make a decision about what to do with pending work when the shutdown request is received. Often, real-world applications might adopt cautious strategies, such as storing the pending tasks in a persistent location (disk, cloud, etc) for potential resumption at a later time. And usually, this action is part of the normal application’s execution, not specifically executed during shutdown.

For the moment, we decide to apply a simple strategy: any pending work must be gracefully interrupted. In this context, “gracefully” means that every operation currently in progress (e.g. an individual disk write operation) will be waited for completion. On the other hand, new tasks must be dropped.

In this regard, calico‘s agents must be assessed to identify which tasks among them could potentially cause delays in the shutdown process. Finally, every agent in the “risk zone” should be fixed somehow. This final action will lead us to explore methods for interrupting pending work, that is the main topic of this article. While our primary focus will be on discarding tasks during shutdown, the strategies outlined here could be applied at any given time.

Can we drop pending messages?

A couple of years ago, I naively asked this question to the SObjectizer folks: “what is the best way for an agent to unsubscribe from all its pending messages on shutdown?”. The question refers to message boxes. For example, consider our face_detector agent’s subscription:

void face_detector::so_define_agent()
{
	so_subscribe(m_input).event([this](const cv::Mat& src) {
		// ... potentially intensive work for a single frame ...
		so_5::send<cv::Mat>(m_output, std::move(cloned));
	});
}

Given that its operations might require considerable time for each frame, we would like to discard all remaining frames when the shutdown occurs, without processing them. However, this is not so easy as is. Once frames have been dispatched to face_detector‘s message queue, only the agent and the dispatcher can ignore them. And the problem is how to change the agent state when the program is shutting down if the so_evt_finish() notification arrives in the form of another message that will be processed after all the pending frames?! In other words, suppose the agent has still to process frame1, frame2, frame3; if shutdown occurs, so_evt_finish() will be handled only after frame3, so our attempt would be futile.

A solution would be to use message priorities. In essence, a “stop” message would have higher priority than ordinary frames, enabling it to jump to the front of the queue. However, SObjectizer does not support message priorities (anymore).

The other entity that could intervene is the dispatcher which is responsible for extracting a message from the agent’s queue and for binding the corresponding handler to the proper worker thread. So, in theory, we might write a special “dispatcher decorator” that works on top of another dispatcher (that we selected for a particular agent or cooperation) just to drop all pending messages at some point. However, this approach is exceptionally challenging, if not entirely unfeasible, to implement (an ad-hoc dispatcher to handle “message priorities” was proposed by Yauheni, however this will be tailored to the particular use case and can’t reuse existing dispatchers).

Therefore, we should explore alternative strategies for structuring agents in a manner that permits discarding remaining messages upon a shutdown signal. These strategies should be practical, avoiding inefficiencies or overly complex logic.

Interrupting work with message chains

The first approach aligns with a concept we’ve already partially implemented in a previous episode while discussing the splitting of tasks. For your convenience, here is the current implementation of image_save_worker:

class image_save_worker final : public so_5::agent_t
{
public:
    image_save_worker(so_5::agent_context_t ctx, so_5::mchain_t input, std::filesystem::path root_folder)
        : agent_t(std::move(ctx)), m_input(std::move(input)), m_root_folder(std::move(root_folder))
    {
    }
 
    void so_evt_start() override
    {       
        receive(from(m_input).handle_all(), [this](const cv::Mat& image) {
            imwrite((m_root_folder / std::format("image_{}_{}.jpg", m_worker_id, m_counter++)).string(), image);
        });
    }
 
private:
    so_5::mchain_t m_input;
    std::filesystem::path m_root_folder;
    int m_counter = 0;
    static inline int global_id = 0;
    int m_worker_id = global_id++;
};

You should remember that a couple of instances of this agent are spawned by a sort of “coordinator” called image_saver. This agent is also responsible for closing the shared message chain on shutdown:

void image_saver::so_evt_finish()
{
	close_retain_content(so_5::terminate_if_throws, m_chain);
}

In this case, we used the “retain” strategy but if instead we use close_drop_content, the result would be to discard any pending messages. Task accomplished.

Hence, we can implement the same concept in other agents that are not “workers”. For example, let’s see how to rewrite the face_detector from a previous episode. This is the current implementation:

class face_detector final : public so_5::agent_t
{
public:
	face_detector(so_5::agent_context_t ctx, so_5::mbox_t input, so_5::mbox_t output)
		: agent_t(std::move(ctx)), m_input(std::move(input)), m_output(std::move(output))
	{
	}
		
	void so_define_agent() override
	{
		so_subscribe(m_input).event([this](const cv::Mat& src) {			
			// ... core logic ...
			so_5::send<cv::Mat>(m_output, std::move(cloned));
		});
	}
	
	void so_evt_start() override
	{
		if (!m_classifier.load("haarcascade_frontalface_default.xml"))
		{
			throw std::runtime_error("Can't load face detector classifier 'haarcascade_frontalface_default.xml'");
		}
	}	
private:
	so_5::mbox_t m_input; so_5::mbox_t m_output;
	cv::CascadeClassifier m_classifier;
};

The idea is to modify the core part in order to receive from a message chain. We have a few things to do:

  • forwarding the images from the input channel to the message chain,
  • turning the subscription into a receive operation,
  • closing the chain on shutdown.

The first point can be resolved by employing a message sink binding, similar to what we implemented for the save workers. The second point is also akin to save workers. The last point might require a separate agent responsible for closing the chain when a shutdown request is received.

Here is the new implementation of face_detector:

class face_detector final : public so_5::agent_t
{
public:
	face_detector(so_5::agent_context_t ctx, so_5::mbox_t input, so_5::mbox_t output)
		: agent_t(std::move(ctx)), m_input(std::move(input)), m_output(std::move(output))
	{
	}
			
	void so_evt_start() override
	{
		if (!m_classifier.load("haarcascade_frontalface_default.xml"))
		{
			throw std::runtime_error("Can't load face detector classifier 'haarcascade_frontalface_default.xml'");
		}
		
		single_sink_binding_t binding;
		const auto buffer = create_mchain(so_environment());
		binding.bind<cv::Mat>(m_input, wrap_to_msink(buffer->as_mbox()));

		so_environment().introduce_coop(so_5::disp::active_obj::make_dispatcher(so_environment()).binder(), [&](so_5::coop_t& c) {
			c.make_agent<chain_closer>(buffer);
		});

		receive(from(buffer).handle_all(), [this](const cv::Mat& src) {
			// ... core logic ...
			so_5::send<cv::Mat>(m_output, std::move(cloned));
		});
	}	
private:
	so_5::mbox_t m_input; so_5::mbox_t m_output;
	cv::CascadeClassifier m_classifier;
};

And here is the chain_closer agent:

class chain_closer final : public so_5::agent_t
{
public:
	chain_closer(so_5::agent_context_t ctx, so_5::mchain_t chain)
		: agent_t(std::move(ctx)), m_chain(std::move(chain))
	{
	}

	void so_evt_finish() override
	{
		close_drop_content(so_5::terminate_if_throws, m_chain);
	}

private:
	so_5::mchain_t m_chain;
};

Functionally speaking, the new face_detector is equivalent to the other one. From a developer perspective, introducing a binding should be acceptable (practically speaking, it’s like pushing a message to an additional queue) but we would like to shortly comment on the introduction of the extra agent to close the chain.

This approach, reminiscent of what we discussed in the post concerning save workers, conceals specific details externally. In particular, chain_closer doesn’t necessitate foreknowledge that it will utilize a message chain internally. This makes its adoption quite straightforward. However, using an active_obj dispatcher, a new worker thread is introduced for every new chain_closer added by face_detector. Despite the fact that chain_closer essentially performs nothing apart from the asynchronous chain close operation during shutdown, the proliferation of new threads could be avoided by utilizing alternative methods.

Just to mention a couple, one approach involves introducing a “unified chain closer” agent responsible for managing all the chains that require automatic closure. In this scenario, agents like face_detector might either create the chain and “send” it to the unified chain closer, or retrieve the chain from the constructor (already managed by the unified closer). On the other hand, if our concern is only related to the proliferation of threads, multiple chain closers can be bound to the same active_group dispatcher that creates a single worker thread queue for a (named) group of agents.

However, this article won’t delve into such details, as they might be discussed in a separate post.

The public interface of the face_detector remains unchanged. To observe the impact of the new implementation, consider using it in debug mode as it may buffer, potentially revealing issues similar to those mentioned by Gerri. Following some duration, terminate the program using CTRL+C, and the shutdown should be quick.

The key benefit of interrupting work with message chains is the simplicity and the relatively minimal effort required to transition the agent logic from subscription-based to receive-based code. The disapproving frown stems from the additional internal work needed, specifically the forwarding of messages to the chain – although our example was simple, in general, an agent might subscribe to numerous message types – and the management of chain closure that is demanded to another agent.

Decoupling message reception and handling

The second approach is based on explicitly decoupling the receiving of the message from the execution of the handler (aka: the logic) associated with it. The idea comes from one simple idiom we have learned at the dawn of this series when we refactored a loop to use message passing: the agent processes incoming images only upon receiving a message sent to itself, rather than processing images upon their arrival. Thus, the reaction to an incoming image is just an enqueuing to an internal buffer. As long as the buffer contains at least one image, the agent keeps on sending a “do work” signal to itself. In this manner, the reception of images becomes a “quick” operation, while the processing of the “do work” signal remains the slower part.

As a result, when the agent is requested to shut down, despite having a potentially high number of images still to process, it will automatically stop receiving and processing further messages (including the “do work” signal).

Showing the code is likely simpler than explaining it:

class face_detector final : public so_5::agent_t
{
	struct process_one_buffered_image final : so_5::signal_t {};
public:
	face_detector(so_5::agent_context_t ctx, so_5::mbox_t input, so_5::mbox_t output)
		: agent_t(std::move(ctx)), m_input(std::move(input)), m_output(std::move(output))
	{
	}

	void so_evt_start()
	{
		if (!m_classifier.load("haarcascade_frontalface_default.xml"))
		{
			throw std::runtime_error("Can't load face detector classifier 'haarcascade_frontalface_default.xml'");
		}
	}

	void so_define_agent() override
	{
		so_subscribe(m_input).event([this](const cv::Mat& m) {		
			m_buffer.push(m);
			if (m_buffer.size() == 1)
			{
				so_5::send<process_one_buffered_image>(*this);
			}
		});
		
		so_subscribe_self().event([this](so_5::mhood_t<process_one_buffered_image>) {
			const auto src = m_buffer.front();
			// ... core logic ...
			so_5::send<cv::Mat>(m_output, std::move(cloned));
			
			m_buffer.pop();
			if (!m_buffer.empty())
			{
				so_5::send<process_one_buffered_image>(*this);
			}
		});
	}

private:
	so_5::mbox_t m_input;
	so_5::mbox_t m_output;
	std::queue<cv::Mat> m_buffer;
	cv::CascadeClassifier m_classifier;	
};

A few details it’s worth describing:

  • process_one_buffered_image expresses the response action to be taken,
  • the core logic handler can’t be executed if the buffer is empty, so we can safely dequeue one image from the queue,
  • when an image is received, the sending of process_one_buffered_image must occur only when the queue becomes non-empty (if the queue has other data, the signal is sent at the end of the core logic handler).

Just for your information, if the core logic operates slower than the rate of received frames, this agent may exhibit a form of “burst” behavior, characterized by multiple enqueues occurring in rapid succession, such as:

enqueue
enqueue
enqueue
...
enqueue
do work
enqueue
enqueue
enqueue
...
do_work
...
finish
maybe a final do work

At this point, it’s worth asking a few questions. First of all, to enqueue the message we have to make a copy of it. With cv::Mat, this isn’t problematic as it functions similar to a shared pointer. However, in general, minimizing further copies might be desirable. Is there a potential solution or approach we can consider to address this concern?

Let’s delve deeper into a concept we introduced a while back: message holders.

You remember from this initial article that SObjectizer supports three ways for receiving message references:

(const cv::Mat& image)
(so_5::mhood_t<cv::Mat> image)
(const so_5::mhood_t<cv::Mat>& image)

Specifically, we previously mentioned that mhood_t is a smart reference that incorporates distinctive features, such as allowing us to participate in the ownership of the underlying object. That’s exactly what we need in this context.

The idiom here is to call a function of mhood_t that creates a message holder for the underlying message. In other words, it returns a sort of shared pointer to the message that SObjectizer utilizes internally for transmission. Message holders have some features that are not relevant here and won’t be discussed in this article.

The only thing we need here is to change the type of the queue to message_holder_t and handle the images in the proper way. The queue turns to:

std::queue<so_5::message_holder_t<cv::Mat>> m_buffer;

The handler turns to:

so_subscribe(m_input).event([this](so_5::mhood_t<cv::Mat> m) {		
	m_buffer.push(m.make_holder());
	// ... as before ...

make_holder() is the function to call on mhood_t to get a message holder pointing to the underlying data. This brings us to a pragmatic guideline: when specifying a message handler for the type T, using const T& is more natural and preferred. However, mhood_t<T> is suitable in particular scenarios, such as taking part in the ownership of the message instance as just demonstrated, and it’s also mandatory for handling signals. Moreover, mhood_t<T> is capable of automatically absorbing changes if T is originally a message and is subsequently changed to a signal, and vice versa. Thus, it serves as a practical option when we develop a templated agent on T that subscribes to T that might be either a message or a signal.

The second question is related to the number of message types we can handle. face_detector is simple and needs to handle only cv::Mat, however agents in general subscribe to an arbitrary number of message types and signals. As if there were not enough, message holders can’t be used with signals.

What can we do?

A first way to handle multiple message types consists in diversifying the number of queues. One queue per message type:

class another_agent final : public so_5::agent_t
{
	struct process_one_buffered_image final : so_5::signal_t {};
	struct process_one_buffered_other_type final : so_5::signal_t {};
	// ...
public:
	// ... as before

	void so_define_agent() override
	{
		so_subscribe(m_input).event([this](so_5::mhood_t<cv::Mat> m) {		
			m_image_buffer.push(m.make_holder());
			if (m_image_buffer.size() == 1)
			{
				so_5::send<process_one_buffered_image>(*this);
			}
		}).event([this](so_5::mhood_t<other_type> m) {
			m_other_type_buffer.push(m.make_holder());
			if (m_other_type_buffer.size() == 1)
			{
				so_5::send<process_one_buffered_other_type>(*this);
			}
		});
		
		so_subscribe_self().event([this](so_5::mhood_t<process_one_buffered_image>) {
			// ... dequeue and process one message from m_image_buffer
		}).event([this](so_5::mhood_t<process_one_buffered_other_type>) {
			// ... dequeue and process one message from m_other_type_buffer
		});
	}

private:
	// ...
	std::queue<so_5::message_holder_t<cv::Mat>> m_image_buffer;
	std::queue<so_5::message_holder_t<some_other_type>> m_other_type_buffer;
	// ...
};

In case we need to buffer signals, since message holders can’t be employed, we should turn them into another type or use alternative ways to buffer them.

However, there is another solution that might be taken in consideration and involves replacing all the std::queues with a single message chain. Indeed, message chains support any message types and signals too. The idea is to send all the messages and signals to the chain, and to react to a signal process_one_buffered_message by receiving one data from the chain. The implementation is here below:

class another_agent final : public so_5::agent_t
{
	struct process_one_buffered_message final : so_5::signal_t {};	
public:
	another_agent(so_5::agent_context_t ctx, so_5::mbox_t input)
		: agent_t(std::move(ctx)), m_input(std::move(input))
	{
	}

	void so_define_agent() override
	{
		m_buffer = create_chain(so_environment());
		
		so_subscribe(m_input).event([this](so_5::mhood_t<cv::Mat> m) {			
			so_5::send(m_buffer, m);
			if (m_buffer->size() == 1)
			{
				so_5::send<process_one_buffered_message>(*this);
			}
		}).event([this](so_5::mhood_t<other_type> m
			so_5::send(m_buffer, m);
			if (m_buffer->size() == 1)
			{
				so_5::send<process_one_buffered_message>(*this);
			}
		});		
		
		so_subscribe_self().event([this](so_5::mhood_t<process_one_buffered_message>) {
			receive(from(m_buffer).handle_n(1).no_wait_on_empty(), [](const cv::Mat& mat) {
				// ... process cv::Mat ...
			}, [](const other_type& o) {
				// ... process other_type ...
			});

			if (!m_buffer->empty())
			{
				so_5::send<do_work>(*this);
			}
		});
	}

private:
	so_5::mbox_t m_input;
	so_5::mchain_t m_buffer;	
};

However, you should have spotted some code duplication here above. In particular, we’ll potentially have several of these:

if (m_buffer->size() == 1)
{
	so_5::send<process_one_buffered_message>(*this);
}

SObjectizer provides an idiomatic way to mitigate this issue: non-empty notificators. This customization point allows to specify a function which will be called automatically every time the chain transitions from the “empty” to the “non-empty” state. This is precisely the place to send the process_one_buffered_message signal.

First of all, we create the chain in a slightly different way:

m_buffer = so_environment().create_mchain(so_5::make_unlimited_mchain_params().not_empty_notificator([this] {	
	so_5::send<process_one_buffered_message>(*this);
}));

We create the chain from the environment by using create_mchain(). This function requires “chain parameters” (of type mchain_params_t) that are just settings to create the chain. SObjectizer provides a few functions to create frequently used options such as make_unlimited_mchain_params() that corresponds to the settings we were using before, with so_5::create_mchain(). Finally, we call not_empty_notificator() to specify the function to call, as described before.

The first part of so_define_agent() simplifies to:

m_buffer = so_environment().create_mchain(so_5::make_unlimited_mchain_params().not_empty_notificator([this] {	
	so_5::send<process_one_buffered_message>(*this);
}));

so_subscribe(m_input).event([this](so_5::mhood_t<cv::Mat> m) {			
	so_5::send(m_buffer, m);			
}).event([this](so_5::mhood_t<other_type> m
	so_5::send(m_buffer, m);			
});

However, there’s an additional small step that will streamline things even more: message sink bindings. Indeed, there is no need to explicitly subscribe and reroute messages and signals to the message chain when we can simply declare bindings.

We add a multi_sink_binding_t as a member variable of the agent, and we declare the requested bindings (in case the agent needed only one binding, we would use single_sink_binding_t). The implementation boils down to:

void so_define_agent() override
{
	m_buffer = so_environment().create_mchain(so_5::make_unlimited_mchain_params().not_empty_notificator([this] {	
		so_5::send<process_one_buffered_message>(*this);
	}));
	
	m_binding.bind<cv::Mat>(m_input, wrap_to_msink(m_buffer->as_mbox()));
	m_binding.bind<other_type>(m_input, wrap_to_msink(m_buffer->as_mbox()));
	
	so_subscribe_self().event([this](so_5::mhood_t<process_one_buffered_message>) {
		// ... as before
	});
}

A convenient design aspect is that message holders are totally hidden. Everything is efficiently managed by SObjectizer in the most effective manner possible and we don’t even need to know such details.

An essential detail: the “not empty notificator” function is executed at the sender’s site (similar to delivery filters). You might wonder what this means when bindings are involved. The execution context of any binding is the sender of the input of that binding. In our scenario, it’s the image producer. Hence, the not-empty notificator is executed within the context of the image producer. Practically speaking, bindings are efficient since they correspond to the quick transfer of a message to another message queue – an operation handled swiftly in SObjectizer. Conversely, the not-empty notificator is under our control. Similar to delivery filters, it’s crucial for any not-empty notificator to be blazing fast. In our case, it’s just a sending of a message, an operation efficiently managed by SObjectizer. So we are on the safe side.

As an example of this approach, let’s reimplement face_detector:

class face_detector final : public so_5::agent_t
{
	struct process_one_buffered_image  final : so_5::signal_t {};
public:
	face_detector(so_5::agent_context_t ctx, so_5::mbox_t input, so_5::mbox_t output)
		: agent_t(std::move(ctx)), m_input(std::move(input)), m_output(std::move(output))
	{
	}

	void so_evt_start() override
	{
		if (!m_classifier.load("haarcascade_frontalface_default.xml"))
		{
			throw std::runtime_error("Can't load face detector classifier 'haarcascade_frontalface_default.xml'");
		}
	}

	void so_define_agent() override
	{
		m_buffer = so_environment().create_mchain(so_5::make_unlimited_mchain_params().not_empty_notificator([this] {			
			so_5::send<process_one_buffered_image >(*this);
		}));
		m_binding.bind<cv::Mat>(m_input, wrap_to_msink(m_buffer->as_mbox()));

		so_subscribe_self().event([this](so_5::mhood_t<process_one_buffered_image>) {
			receive(from(m_buffer).handle_n(1).no_wait_on_empty(), [this](const cv::Mat& src) {
				cv::Mat gray;
				cvtColor(src, gray, cv::COLOR_BGR2GRAY);
				std::vector<cv::Rect> faces;
				m_classifier.detectMultiScale(gray, faces, 1.1, 4);
				auto cloned = src.clone();
				for (const auto& [x, y, w, h] : faces)
				{
					rectangle(cloned, { x, y }, { x + w, y + h }, { 255, 0, 0 }, 2);
				}
				so_5::send<cv::Mat>(m_output, std::move(cloned));
			});
						
			if (!m_buffer->empty())
			{
				so_5::send<process_one_buffered_image >(*this);
			}
		});
	}

private:
	so_5::mbox_t m_input;
	so_5::mbox_t m_output;
	so_5::mchain_t m_buffer;
	so_5::single_sink_binding_t m_binding;
	cv::CascadeClassifier m_classifier;
};

Thoughts on selecting the approach

The choice of which approach to use could be a stylistic decision in certain scenarios, while in others, it might be driven by specific reasons. Some include, yet are not restricted to:

  • when the agent functions as a worker and naturally receives data via a message chain, it’s reasonable to consider implementing the first approach outlined (e.g. image_save_worker);
  • when ordering data is important, the second approach (even with another container such as priority_queue) can be convenient;
  • when the agent works with several message and signal types, the second approach with a message chain should offer sufficient flexibility;
  • If the agent isn’t solely engaged in a single “slow” operation, structuring it in the second manner feels more organic, allowing it to subscribe to multiple sources using a more natural syntax.

By the way, there exist even more sophisticated techniques or wisely designed agents where the aspect of interrupting work is inherently integrated. For instance, in fully-fledged task managers or job distributors. On the other hand, our down-to-earth methods are practical and suitable for most scenarios.

These approaches and their variations are attempts to make shutdown faster when pending work is causing delays in the termination of agents. This typically occurs when agents operate slower than the input rate. Hopefully this condition is temporary or, for any reason, the buffer size stays within certain limits. However, in scenarios where this discrepancy is constant and the input stream is extensive, buffering might lead the program to reach its memory limits. In this case, the proposed methods can’t help as is.

In the upcoming post, we’ll delve further into this aspect.

Takeaway

In this episode we have learned:

  • making an agent drop pending messages, once received, is not as easy as one might think;
  • closing a message chain with close_drop_content() allows to drop any pending messages still in the queue;
  • message holders allow us to participate in the ownership of messages delivered within SObjectizer. We get a message holder from a mhood_t by calling make_holder();
  • message holders do not support signals;
  • handling a message using mhood_t<T> is preferred when we need to take ownership of the underlying object, when we handle signals, or when we want message handlers to work both on messages and signals. In other cases, const T& is a more natural choice;
  • message chains support a non-empty notificator callback that will be invoked every time the chain transitions from the “empty” to the “non-empty” state;
  • non-empty notificators are executed on the sender’s site (aka: they should be blazing fast).

As usual, calico is updated and tagged.

What’s next?

After addressing our potentially slow agents, we do some pair programming with Gerri to apply a similar strategy to his image processor. It seems that Gerri hasn’t encountered any “out of memory” problems, considering his usage is consistently limited to running for a few minutes. Moreover, he’s currently working on a private branch and, considering his habits, he intends to optimize his agent to mitigate any prospective performance problems!

Meanwhile, Paolo, our lead architect, is reaching out for discussing a potential out of memory problem that he brought up during the last stand-up meeting. Essentially, he created a new image producer that sends an extensive number of frames, placing considerable pressure on the face_detector:

In the upcoming post, we’ll delve into another crucial aspect of message passing-styled applications, which pertains to protecting against excessively intensive message flow.


Thanks to Yauheni Akhotnikau for having reviewed this post.

Leave a comment