SObjectizer Tales – 9. Splitting work

Posted: December 7, 2023 in SObjectizer Tales

Last time we left off with a new feature request: saving images to the hard drive. The specification is quite vague, but we can imagine that the user requires an agent that persists compressed frames (e.g. jpg) to disk.

OpenCV provides imwrite, which compresses a specified image and writes the resulting bytes to a specified path. The compression format is chosen from the filename extension (e.g. c:/saved/frame_1.jpg means jpg). Additional details are not important here; let’s assume we pass only image types that this function is able to handle.

Here is a working implementation of that agent:

class save_agent final : public so_5::agent_t
{
public:
	save_agent(so_5::agent_context_t ctx, so_5::mbox_t input, std::filesystem::path root_folder)
		: agent_t(std::move(ctx)), m_input(std::move(input)), m_root_folder(std::move(root_folder))
	{
	}
	
	void so_define_agent() override
	{
		so_subscribe(m_input).event([this](const cv::Mat& image) {
			imwrite((m_root_folder / std::format("image_{}.jpg", m_counter++)).string(), image);
		});
	}
	
	void so_evt_start() override
	{
		std::error_code ec;
		if (create_directories(m_root_folder, ec); ec)
		{
			throw std::runtime_error(std::format("save_agent can't create root folder: {}", ec.message()));
		}		
	}

private:
	so_5::mbox_t m_input;
	std::filesystem::path m_root_folder;
	int m_counter = 0;
};

Throwing an exception in case the directory tree can’t be created is just an option and other choices are possible, but this is not the point here.
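As an example of one of those other choices, here is a minimal sketch that reports the failure to the caller instead of throwing. The name ensure_directory is hypothetical (it is not part of calico or SObjectizer), and the sketch only relies on the standard filesystem library:

```cpp
#include <filesystem>
#include <system_error>

// Hypothetical helper (not part of calico or SObjectizer): tries to create
// the whole directory tree and reports failure through the returned
// error_code instead of throwing. A "falsy" error_code means success.
std::error_code ensure_directory(const std::filesystem::path& root_folder)
{
	std::error_code ec;
	// create_directories returns false (and leaves ec cleared) when the
	// directory already exists, so only ec tells us about real failures.
	std::filesystem::create_directories(root_folder, ec);
	return ec;
}
```

With this variant the caller decides what to do on failure, for example logging ec.message() and skipping the save instead of terminating the agent.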

Images are individually saved as they are received, following a typical pattern of executing a relatively resource-intensive operation for each message. Specifically, the imwrite function utilizes both CPU power for compression and the disk for storage. Most likely, employing this agent wouldn’t significantly impact the performance of your machine.

Here is an example of usage:

int main()
{
	auto ctrl_c = get_ctrlc_token();

	const so_5::wrapped_env_t sobjectizer;
	auto main_channel = sobjectizer.environment().create_mbox("main");
	auto commands_channel = sobjectizer.environment().create_mbox("commands");
			
	auto dispatcher = so_5::disp::active_obj::make_dispatcher(sobjectizer.environment());
	sobjectizer.environment().introduce_coop(dispatcher.binder(), [&](so_5::coop_t& c) {
		c.make_agent<image_producer>(main_channel, commands_channel);
		c.make_agent<remote_control>(commands_channel);
		c.make_agent<responsive_image_viewer>(main_channel);
		c.make_agent<save_agent>(main_channel, "c:/images");
	});
		
	wait_for_stop(ctrl_c);
}

However, giving the agent too much work could quickly make performance worse. For example, if we enlarge the images significantly, such as adding an additional image_resizer with a scale factor greater than 2, it could lead to a substantial increase in memory consumption. This is because imwrite would require more time, consequently delaying the dequeuing of the subsequent image and causing noticeable message buffering.
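To get a feeling for how quickly such buffering adds up, here is a back-of-the-envelope sketch. The helper names queued_frames and frame_bytes are made up for this illustration, the rates are assumed to be constant, and the figures in the note below are hypothetical, not measurements:

```cpp
#include <cstdint>

// Frames still waiting in the agent's queue after `seconds`, assuming
// constant production and saving rates. Returns 0 when the saver keeps up.
constexpr std::int64_t queued_frames(std::int64_t seconds,
	std::int64_t produced_per_sec, std::int64_t saved_per_sec)
{
	const auto surplus = produced_per_sec - saved_per_sec;
	return surplus > 0 ? surplus * seconds : 0;
}

// Bytes of pixel data in an uncompressed image with 8 bits per channel.
constexpr std::int64_t frame_bytes(std::int64_t width, std::int64_t height,
	std::int64_t channels)
{
	return width * height * channels;
}
```

For instance, if 30 frames per second were produced and only 20 were saved, ten seconds would leave queued_frames(10, 30, 20) = 100 frames waiting. At frame_bytes(1920, 1080, 3) = 6,220,800 bytes each, that is roughly 622 MB of pending pixel data.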

As these operations are self-contained and do not rely on one another, there’s a chance to distribute these tasks among several workers, potentially harnessing the computational capabilities of our machine. For instance, while worker1 handles image1, worker2 can simultaneously process image2, and this sequence continues. Given that we assume CPU-dependent compression, processing multiple images concurrently might yield some advantages and efficiencies.

Bear in mind that this article is not about finding the best way to compress and save images to disk nor the ideal topology to distribute data. Rather, it serves as an introduction to demonstrate how SObjectizer can be utilized to divide tasks among multiple agents.

Can we have contention on message boxes?

This was the very first question I asked the SObjectizer folks back in 2021.

You have learned that message boxes (often called “channels” throughout the series) are carriers for delivering messages and signals to agents or, in general, to subscribers. Any messages dispatched to a message box will be distributed to all the subscribers connected to it.

However, when distributing work, we have a very different need: every message must be delivered to exactly one agent for processing. For example, if agent1 and agent2 take work from a certain buffer, only one of them must receive message1, only one of them must receive message2, and so on. Ideally, while agent1 is working on message1, agent2 is working on message2.

In general, assigning tasks to workers can follow sophisticated logic: tasks might have priorities, and things can get extremely complicated. After all, scheduling is one of the main topics in computer science.

Returning to our simple example, it’s clear that workers shouldn’t directly subscribe to message boxes as they would receive all the messages intended for that subscription. Hence, we can introduce a “scheduler” agent in between that subscribes to pertinent messages and allocates them to workers based on specific logic. For instance, the logic might be “send the next message to the first available agent”. In this arrangement, the scheduler needs to be aware of which workers are currently occupied.
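Just to make the bookkeeping concrete, here is a minimal sketch of the “first available worker” policy in plain C++. The class first_available_scheduler is hypothetical and contains no SObjectizer code; in a real agent, acquire would be called when a message arrives and release when a worker reports completion:

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <vector>

// Hypothetical bookkeeping for a "send the next message to the first
// available worker" policy. It only tracks which workers are busy.
class first_available_scheduler
{
public:
	explicit first_available_scheduler(std::size_t worker_count)
		: m_busy(worker_count, false)
	{
		assert(worker_count > 0);
	}

	// Marks the first idle worker as busy and returns its index,
	// or std::nullopt when every worker is occupied.
	std::optional<std::size_t> acquire()
	{
		for (std::size_t i = 0; i < m_busy.size(); ++i)
		{
			if (!m_busy[i])
			{
				m_busy[i] = true;
				return i;
			}
		}
		return std::nullopt;
	}

	// To be called when a worker reports that it has finished its task.
	void release(std::size_t worker)
	{
		assert(worker < m_busy.size());
		m_busy[worker] = false;
	}

private:
	std::vector<bool> m_busy;
};
```

Since an agent handles one message at a time, such state would need no locking if it lived inside the scheduler agent. When acquire returns std::nullopt, the scheduler has to decide whether to queue the message, drop it, or wait.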

While this is achievable, there’s no urgency to rush into the implementation right now. Fortunately, SObjectizer already offers a fundamental structure that facilitates this pattern in just a few lines of code. This concept is somewhat analogous to a message box and is known as a message chain.

Message chains share similarities with asynchronous queues and do not provide a subscription-based interface like message boxes. Similar to message boxes, chains accept data via the function so_5::send. However, unlike message boxes, chains cannot be directly subscribed to; instead, we need to explicitly use the so_5::receive function to extract data. The so_5::receive function can be invoked from an agent or any other part of the application, even outside of SObjectizer. Because of this flexibility, message chains were primarily introduced to facilitate data exchange between non-SObjectizer components. It’s feasible for multiple threads to concurrently retrieve data from the same chain.

The unique aspect of message chains lies in how messages are extracted from them: when a message is received and handled, it’s essentially extracted from the chain. This implies that if one thread extracts the next message and another thread attempts to receive from the chain immediately afterward, that specific message won’t be extracted again by the second thread. You’ve likely understood our point: message chains embody the contention pattern we require.

In practice, we can give workers the same chain and let them receive from it. When one extracts a message and starts working on it, the others will have a chance of extracting the next message.
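The property we rely on can be modeled with the standard library alone. The following sketch is not SObjectizer’s implementation: competing_queue and handle_counts are hypothetical names, and the queue carries plain integers instead of images. It only shows that, when several threads receive from the same queue, every item is handled exactly once:

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <thread>
#include <vector>

// Toy "competing consumers" queue: a model of the contention pattern,
// not SObjectizer's mchain.
class competing_queue
{
public:
	void send(int value)
	{
		{
			std::lock_guard lock(m_mutex);
			m_items.push(value);
		}
		m_ready.notify_one();
	}

	// No more items will arrive: receivers drain the leftovers, then stop.
	void close()
	{
		{
			std::lock_guard lock(m_mutex);
			m_closed = true;
		}
		m_ready.notify_all();
	}

	// Blocks until an item is available or the queue is closed and empty.
	std::optional<int> receive()
	{
		std::unique_lock lock(m_mutex);
		m_ready.wait(lock, [this] { return m_closed || !m_items.empty(); });
		if (m_items.empty())
			return std::nullopt;
		const int value = m_items.front();
		m_items.pop(); // extracted: no other receiver will see this item
		return value;
	}

private:
	std::mutex m_mutex;
	std::condition_variable m_ready;
	std::queue<int> m_items;
	bool m_closed = false;
};

// Sends 0..item_count-1, lets worker_count threads compete for them and
// returns how many times each item was handled.
std::vector<int> handle_counts(int item_count, int worker_count)
{
	assert(item_count >= 0 && worker_count > 0);
	competing_queue queue;
	std::vector<int> counts(item_count, 0);
	std::mutex counts_mutex;

	std::vector<std::thread> workers;
	for (int w = 0; w < worker_count; ++w)
	{
		workers.emplace_back([&] {
			while (const auto item = queue.receive())
			{
				std::lock_guard lock(counts_mutex);
				++counts[*item];
			}
		});
	}

	for (int i = 0; i < item_count; ++i)
		queue.send(i);
	queue.close();

	for (auto& worker : workers)
		worker.join();
	return counts;
}
```

Calling handle_counts(100, 3) should yield a vector of one hundred ones, regardless of how the three threads interleave. Which worker gets which item is not deterministic, and that is exactly the point.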

Message chains are even more sophisticated than a message queue, and provide other outstanding features we won’t get into at this time. First of all, there are several overloads to create a chain but we can just use the following one for the moment:

auto chain = create_mchain(so_environment());

Unlike message boxes, message chains can be closed explicitly:

auto chain = create_mchain(so_environment());
// ...
// possibly on another thread
close_drop_content(so_5::terminate_if_throws, chain);

Closing a chain can either drop leftovers (close_drop_content) or not (close_retain_content). This choice has effects on receivers, as we’ll see in a moment. Ignore terminate_if_throws for now, it just means that if closing a chain throws an exception, the program will be terminated (like throwing from a destructor).

At this point, one or more threads can receive from the chain by using the receive/from/handle idiom:

receive(from(chain).handle_n(1), [](const cv::Mat& image){
	// ... handle one image
});

In the snippet above, receive blocks until either the chain is closed or exactly one message of type cv::Mat is received, extracted and handled. As you can see, receive accepts a message handler, which is a function object.

As you might expect, the choice between close_retain_content and close_drop_content matters if a receiver is still waiting for new messages. Both eventually interrupt the wait; however, the former lets the remaining messages be handled first, while the latter discards them. To receive all the messages until a chain is closed we use handle_all():

receive(from(chain).handle_all(), [](const cv::Mat& image){
	// ... handle one image
});

In this case, receive does not return until the chain is closed.
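The difference between the two closing policies can be illustrated with a tiny single-threaded model. This is a sketch under simplifying assumptions, not SObjectizer code: closable_chain_model and close_mode are hypothetical names, messages are plain integers, and handle_all here only models what a receiver would still handle once the chain has been closed:

```cpp
#include <cassert>
#include <deque>
#include <vector>

// Toy model of the two closing policies (not SObjectizer code).
enum class close_mode { retain_content, drop_content };

class closable_chain_model
{
public:
	void send(int value)
	{
		m_items.push_back(value);
	}

	// Closing always stops further waiting; only drop_content also
	// throws away the messages that nobody has handled yet.
	void close(close_mode mode)
	{
		m_closed = true;
		if (mode == close_mode::drop_content)
			m_items.clear();
	}

	// Returns the messages a receiver would still handle after the close.
	// A real handle_all() would block until the chain is closed; this
	// single-threaded model simply requires that it already is.
	std::vector<int> handle_all()
	{
		assert(m_closed);
		std::vector<int> handled(m_items.begin(), m_items.end());
		m_items.clear();
		return handled;
	}

private:
	std::deque<int> m_items;
	bool m_closed = false;
};
```

If three messages are pending, closing with retain_content lets handle_all return all three, whereas closing with drop_content leaves nothing to handle. For our saver, retaining means that frames already queued are still written to disk during shutdown.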

What a worker looks like

At this point, we have enough information to sketch a save agent worker:

class image_save_worker final : public so_5::agent_t
{
public:
	image_save_worker(so_5::agent_context_t ctx, so_5::mchain_t input, std::filesystem::path root_folder)
		: agent_t(std::move(ctx)), m_input(std::move(input)), m_root_folder(std::move(root_folder))
	{
	}

	void so_evt_start() override
	{		
		receive(from(m_input).handle_all(), [this](const cv::Mat& image) {
			imwrite((m_root_folder / std::format("image_{}_{}.jpg", m_worker_id, m_counter++)).string(), image);
		});
	}

private:
	so_5::mchain_t m_input;
	std::filesystem::path m_root_folder;
	int m_counter = 0;
	static inline int global_id = 0;
	int m_worker_id = global_id++;
};

A few details:

  • mchain_t represents the message chain type (it’s actually a more abstract type);
  • m_worker_id is used to generate unique image names across multiple agents;
  • as usual, we have not defended against specific errors (e.g. imwrite fails) to keep things simple;
  • the worker processes messages until the chain is closed, otherwise it never terminates.
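The naming scheme can be isolated in a couple of small functions. This is only a sketch: make_image_path and next_worker_id are hypothetical helpers, std::to_string replaces std::format to keep the example within C++17, and the atomic counter is a defensive variation that would matter only if workers were ever constructed from different threads (in our program they are all created by the same one):

```cpp
#include <atomic>
#include <filesystem>
#include <string>

// Hypothetical helper mirroring the "image_{worker}_{counter}.jpg" scheme
// used by the worker: two workers never produce the same file name
// because their ids differ.
std::filesystem::path make_image_path(const std::filesystem::path& root_folder,
	int worker_id, int counter)
{
	return root_folder / ("image_" + std::to_string(worker_id) + "_"
		+ std::to_string(counter) + ".jpg");
}

// Hypothetical thread-safe alternative to the static id counter:
// every call returns a distinct, increasing id starting from 0.
inline int next_worker_id()
{
	static std::atomic<int> id{0};
	return id.fetch_add(1);
}
```

For example, the eighth image saved by worker 1 would be named image_1_7.jpg.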

The last point is essential: someone has to close the chain at the end of the program. We can do it after CTRL+C is detected.

Before getting to that, there is another question: who is sending images to the message chain? The question is on-point since we currently send images to the main channel only. Again, here we can leverage the power of message sinks. Before SObjectizer 5.8, the only way was to craft a proxy agent as discussed in a previous episode. Instead, we can now introduce a binding that takes images from the main channel and sends them to the chain:

auto binding = c.take_under_control(std::make_unique<so_5::single_sink_binding_t>());
binding->bind<cv::Mat>(resized, wrap_to_msink(chain->as_mbox()));

Actually, a similar use case motivated the introduction of message sinks and bindings. It was partially inspired by an issue I opened and discussed with Yauheni in 2021. Once more, it’s worth emphasizing that the SObjectizer community is eager and willing to assist as well as incorporate new features if necessary.

A relevant detail: message chains can be represented as message boxes through as_mbox() but, in this case, they can’t accept subscriptions (if we try adding a subscription, an exception will be thrown at runtime).

At this point, we have collected all the pieces of the puzzle (we resize the images beforehand to increase the chances of stressing the CPU):

int main()
{
	const auto ctrl_c = get_ctrlc_token();

	const wrapped_env_t sobjectizer;
	const auto main_channel = sobjectizer.environment().create_mbox("main");
	auto commands_channel = sobjectizer.environment().create_mbox("commands");
	auto save_buffer = create_mchain(sobjectizer.environment());
	sobjectizer.environment().introduce_coop(disp::active_obj::make_dispatcher(sobjectizer.environment()).binder(), [&](coop_t& c) {
		c.make_agent<image_producer_recursive>(main_channel, commands_channel);
		c.make_agent<remote_control>(commands_channel);
		c.make_agent<image_viewer_live>(main_channel);
		const auto resized = c.make_agent<image_resizer>(main_channel, 3.0)->output();

		const auto binding = c.take_under_control(make_unique<single_sink_binding_t>());
		binding->bind<cv::Mat>(resized, wrap_to_msink(save_buffer->as_mbox()));

		c.make_agent<image_save_worker>(save_buffer, "c:/images");
		c.make_agent<image_save_worker>(save_buffer, "c:/images");
	});

	wait_for_stop(ctrl_c);
	close_retain_content(terminate_if_throws, save_buffer);
}

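Here is a picture of the situation: image_resizer publishes the resized frames to its output channel, the binding forwards every cv::Mat to save_buffer, and the two image_save_worker agents compete for the messages in that chain.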

In addition, SObjectizer provides a tiny RAII wrapper to guarantee that chains are automatically closed at the end of the scope. It supports both closing policies. For example:

int main()
{
	auto ctrl_c = get_ctrlc_token();

	//... as before
	auto chain = create_mchain(sobjectizer.environment());
	auto closer = auto_close_retain_content(chain);
	
	// ... as before
		
	wait_for_stop(ctrl_c);	
} // chain closed automatically here
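The benefit of closing through RAII is that the chain gets closed on every exit path, including exceptions. The following sketch models the idea with the standard library only: fake_chain and scoped_closer are hypothetical stand-ins and do not reproduce SObjectizer’s actual wrapper:

```cpp
#include <stdexcept>

// Hypothetical stand-in for a chain: it only remembers whether it was closed.
struct fake_chain
{
	bool closed = false;
	void close() { closed = true; }
};

// Closes the chain when the guard goes out of scope, however that happens.
class scoped_closer
{
public:
	explicit scoped_closer(fake_chain& chain) : m_chain(&chain) {}
	~scoped_closer() { m_chain->close(); }

	scoped_closer(const scoped_closer&) = delete;
	scoped_closer& operator=(const scoped_closer&) = delete;

private:
	fake_chain* m_chain;
};

// Normal exit from the scope: the chain ends up closed.
bool closed_after_scope()
{
	fake_chain chain;
	{
		scoped_closer closer(chain);
		// ... do some work with the chain
	}
	return chain.closed;
}

// Exit through an exception: the chain ends up closed as well.
bool closed_after_exception()
{
	fake_chain chain;
	try
	{
		scoped_closer closer(chain);
		throw std::runtime_error("simulated failure");
	}
	catch (const std::runtime_error&)
	{
		// the guard has already been destroyed during stack unwinding
	}
	return chain.closed;
}
```

Both functions return true. Without the guard, the second scenario would leave the chain open and any receiver blocked on it would wait forever.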

Have you noticed that we forgot one subtle detail in this implementation? We didn’t invoke create_directories…where is the best place to put this responsibility?

A little step forward

Often, data distribution is an implementation detail hidden underneath another entity. For instance, a “root” node might take on the responsibility of spawning, managing, and overseeing workers, potentially gathering their results at the conclusion. Similarly, the previously mentioned “scheduler” represents another instance of such an entity that manages the allocation of tasks.

In our scenario, things are a bit simpler but we can take the opportunity to introduce a sort of “saver agent” that creates the destination directory tree, hides data distribution, and spawns workers. In particular, it should be responsible for:

  • invoking create_directories;
  • making worker agents;
  • creating and closing the message chain;
  • creating the binding object that forwards data to the message chain.

Clearly, new agents have to be added to the cooperation. However, we can’t add new agents to “running cooperations” (it wouldn’t make sense because they are “transactional”, remember?). Thus, the workers need their own cooperation.

We might be tempted to create a child cooperation, and this wouldn’t be a problem in itself. However, we would miss an opportunity. Parent-child relationships add a constraint: parents won’t be destroyed until their children are done. In our case, the children (workers) will be perpetually waiting for new images on the chain, so someone else must close that chain when it’s time to terminate. Well, we can take advantage of the coordinator’s so_evt_finish(), can’t we? Unfortunately not, because the parent-child relationship would make it impossible for the coordinator’s so_evt_finish() to be called before the workers are done… Deadlock!

On the other hand, without any relationships, the coordinator would be destroyed independently of the workers at some point (strictly speaking, this is true only if they have a different worker thread – that is the case). Deal!

Here is the code:

class image_saver final : public so_5::agent_t
{
public:
	image_saver(so_5::agent_context_t ctx, so_5::mbox_t input, std::filesystem::path root_folder)
		: agent_t(std::move(ctx)), m_input(std::move(input)), m_root_folder(std::move(root_folder))
	{
		m_chain = create_mchain(so_environment());
	}

	void so_evt_start() override
	{
		std::error_code ec;
		if (create_directories(m_root_folder, ec); ec)
		{
			throw std::runtime_error(std::format("image_saver can't create root folder: {}", ec.message()));
		}
		
		so_environment().introduce_coop(so_5::disp::active_obj::make_dispatcher(so_environment()).binder(), [this](so_5::coop_t& c) {
			const auto binding = c.take_under_control(std::make_unique<so_5::single_sink_binding_t>());
			binding->bind<cv::Mat>(m_input, wrap_to_msink(m_chain->as_mbox()));
			c.make_agent<image_save_worker>(m_chain, m_root_folder);
			c.make_agent<image_save_worker>(m_chain, m_root_folder);
		});
	}

	void so_evt_finish() override
	{
		close_retain_content(so_5::terminate_if_throws, m_chain);
	}

private:
	so_5::mbox_t m_input;
	so_5::mchain_t m_chain;
	std::filesystem::path m_root_folder;
};

In calico, you will find that image_save_worker has been hidden as image_saver’s implementation detail but, if needed, there is nothing wrong with keeping it public.

Then the whole initialization boils down to:

int main()
{
	const auto ctrl_c = get_ctrlc_token();

	const wrapped_env_t sobjectizer;
	const auto main_channel = sobjectizer.environment().create_mbox("main");
	auto commands_channel = sobjectizer.environment().create_mbox("commands");

	sobjectizer.environment().introduce_coop(disp::active_obj::make_dispatcher(sobjectizer.environment()).binder(), [&](coop_t& c) {
		c.make_agent<image_producer_recursive>(main_channel, commands_channel);
		c.make_agent<remote_control>(commands_channel);
		c.make_agent<image_viewer_live>(main_channel);
		const auto resized = c.make_agent<image_resizer>(main_channel, 3.0)->output();
		c.make_agent<image_saver>(resized, "c:/images");		
	});

	wait_for_stop(ctrl_c);	
}

As you can well imagine, the saver agent can be made as sophisticated as needed. Workers can be bound to a thread pool dispatcher and the assignment of tasks can be quite complex. For now, let’s just settle for the general pattern that lies at the foundation of splitting work among multiple agents.

Takeaway

In this episode we have learned:

  • message chains are advanced asynchronous queues;
  • message chains receive messages sent via the usual so_5::send;
  • message chains do not accept subscriptions; instead, any code – whether SObjectizer-based or not – can receive from them via the receive/from/handle idiom;
  • message chains can be closed to interrupt receive, either dropping or retaining leftovers.

What’s next?

The customer is happy with the new feature and takes some time to test it properly. In the meantime, Ekt from our R&D department would like to use the program for some top secret projects… However, he ran into some issues with the image viewers because his backend forbids invoking imshow & friends from any thread other than the main one.

In the next post we propose a SObjectizer-based solution to this problem!


Thanks to Yauheni Akhotnikau for having reviewed this post.
