SObjectizer Tales – 2. Can you call me back?

Posted: October 19, 2023 in SObjectizer Tales
Tags: , ,

Last time we left with a new requirement:

A camera vendor has just released a new SDK that works through callbacks. Basically, the user sets a function that will get called by the SDK on every new available frame. How can we integrate this into a producer agent?

As usual, this post is also available on dev.to.

If you have some experience with camera drivers, you know that SDKs usually work in two ways:

  • polling/pull model: to retrieve the next available frame, we must explicitly call a GetNextFrame-like function;
  • callback-based/push model: every time a new frame is available, the SDK calls a function we have set in advance.

In our simple scenario where we just continuously extract frames from the camera, they are interchangeable, at least from a functional point of view. However, each has its own peculiarities that might be more suitable than the other in some scenarios. Indeed, some SDKs support both the flavors, others only one. We have already covered the pull model in the previous post:

class image_producer final : public so_5::agent_t
{
public:
	image_producer(so_5::agent_context_t ctx, so_5::mbox_t channel, std::stop_token st)
		: agent_t(std::move(ctx)), m_channel(std::move(channel)), m_stop(std::move(st))
	{
	}

	void so_evt_start() override
	{
		cv::VideoCapture cap(0);
		if (!cap.isOpened())
		{
			throw std::runtime_error("Can't connect to the webcam");
		}
		cv::Mat image;
		while (!m_stop.stop_requested())
		{
			cap >> image; // we pull here the next frame
			so_5::send<cv::Mat>(m_channel, std::move(image));
		}
	}

private:
	so_5::mbox_t m_channel;
	std::stop_token m_stop;
};

In this post, we are going to cover the push model, but unfortunately OpenCV’s VideoCapture does not support a callback-based API. How can we do? Well, we can emulate it by resorting to the classical reader thread approach:

class observable_videocapture
{
public:
	observable_videocapture()
		: m_capture(0, cv::CAP_DSHOW)
	{		
	}

	void on_next_image(std::stop_token st, auto on_next)
	{
        if (!m_capture.isOpened())
		{
			throw std::runtime_error("Can't connect to the webcam");
		}
		
		// reader thread
		m_worker = std::jthread{ [this, st, f = std::move(on_next)] {
			cv::Mat image;
			while (!st.stop_requested())
			{
				m_capture >> image;				
				f(std::move(image)); // callback
			}
		} };
	}
private:
	cv::VideoCapture m_capture;
	std::jthread m_worker;
};

observable_videocapture is a minimal callback-based replacement for VideoCapture. Using this into an agent is smooth as glass:

class image_producer_callback final : public so_5::agent_t
{
public:
	image_producer_callback(so_5::agent_context_t ctx, so_5::mbox_t channel, std::stop_token st)
		: agent_t(std::move(ctx)), m_channel(std::move(channel)), m_stop(std::move(st))
	{
	}

	void so_evt_start() override
	{
		m_device.on_next_image(m_stop, [this](cv::Mat image) {
			so_5::send<cv::Mat>(m_channel, std::move(image));
		});
	}
private:
	so_5::mbox_t m_channel;
	observable_videocapture m_device;
	std::stop_token m_stop;
};

on_next_image does not engage image_producer_callback in a tight loop, instead it returns immediately after spawning an internal worker thread. If the webcam can’t be open, on_next_image throws (and so does so_evt_start, as before). In case you are wondering, VideoCapture manages a configurable buffer of frames that prevents frame loss in case the caller is not fast enough (it’s a common practice).

An interesting observation: image_producer_callback can be slightly changed in order to automatically stop the acquisition on shutdown. Indeed, as we have seen in the previous episode, agents can execute some code before stopping by overriding so_evt_finish() that is a special event handler installed on our behalf by SObjectizer. We can simply keep a stop_source whose stop_token is passed to on_next_image and call request_stop() from that is the shutdown handler:

class image_producer_callback final : public so_5::agent_t
{
public:
	image_producer_callback(so_5::agent_context_t ctx, so_5::mbox_t channel)
		: agent_t(std::move(ctx)), m_channel(std::move(channel))
	{
	}

	void so_evt_start() override
	{
		m_device.on_next_image(m_stop_source.get_token(), [this](cv::Mat image) {
			so_5::send<cv::Mat>(m_channel, std::move(image));
		});
	}
	
	void so_evt_finish() override
	{
		m_stop_source.request_stop();
	}
private:
	so_5::mbox_t m_channel;
	observable_videocapture m_device;
	std::stop_source m_stop_source;
};

This is a minimal implementation and some details are missing. Bear in mind that observable_videocapture is conceptually similar to some real camera SDKs. For example, Spinnaker SDK spawns an image event processing thread that polls an internal GetNextImage-like function, and calls the supplied callback(s) on other threads (using boost::thread_group).

Anyway, our task was not to implement observable_videocapture, instead it was to provide an effective way to use a callback-based API from an agent. observable_videocapture is just a fake implementation that allows us to emulate this case.

Thus, we end up with two scenarios:

  • if we need/want to use polling (e.g. VideoCapture) then we use image_producer;
  • if we need/want to use callbacks (e.g. observable_videocapture) then we use image_producer_callback.

Possibly, some more code is needed to use a real SDK. For example, it might be required to configure the camera or to call some initialization functions. Such details can be hidden into so_define_agent or so_evt_start.

This new agent is a drop-in replacement for image_producer, so we can add it to the cooperation as before:

sobjectizer.environment().introduce_coop(dispatcher.binder(), [&](so_5::coop_t& c) {
	c.make_agent<image_producer_callback>(main_channel, ctrl_c);
    // ... other agents
}

The task has been accomplished, yet it’s interesting to make a small step further that is preparatory to the next post.

The tight loop problem

image_producer_callback has an advantage compared to image_producer: it is not stuck in a tight loop, instead it can handle other messages while the camera is streaming. After all, image_producer is not a “pure” agent but it’s more of an execution thread. Using agent_t here is not superior to jthread.

The question is: how can image_producer handle other requests in between? It’s reasonable to assume that both image_producer and image_producer_callback are the only classes that are coupled with the device class. Suppose, just for a moment, that we use another more realistic device class – let’s call it SomeRealLifeDevice – that provides a function GetBatteryLevel() that returns the amount of available battery. Maybe, the application needs to call this function from time to time.

image_producer_callback could simply subscribe to some message box and invoke that function when a certain message is received:

class image_producer_callback final : public so_5::agent_t
{	
	// ...	
	void so_define_agent() override
	{
		so_subscribe(..somewhere..).event([this](const PrintBatteryLevel&){
			std::cout << m_device.GetBatteryLevel();
		});
	}
	//...
private:
	so_5::mbox_t m_channel;
	SomeRealLifeDevice m_device;
	std::stop_token m_stop;
};

Here above, we assume that SomeRealLifeDevice allows calling GetBatteryLevel() while the device is grabbing frames. We can assume, also, that SomeRealLifeDevice supports both callbacks and polling. These are both fair assumptions, considering how real SDKs often work.

What about image_producer?

Well, there exist more than one solution to this problem, but one consists in introducing an extra level of indirection. After all, which computer problems can’t be solved this way?! Since image_producer can’t really react to messages then another agent will do it on its behalf. Superior to a thread-based solution, image_producer and this broker can be tied in a special way, a sort of “relationship by birth”.

Suppose we use this SomeRealLifeDevice into image_producer:

class image_producer final : public so_5::agent_t
{
public:
	image_producer(so_5::agent_context_t ctx, so_5::mbox_t channel, std::stop_token st)
		: agent_t(std::move(ctx)), m_channel(std::move(channel)), m_stop(std::move(st))
	{
	}

	void so_evt_start() override
	{	
		while (!m_stop.stop_requested())
		{		
			so_5::send<cv::Mat>(m_channel, m_device.GetNextImage());
		}
	}

	void print_battery_level()
	{
		std::osyncstream(std::cout) << m_device.GetBatteryLevel() << "\n";
	}

private:
	so_5::mbox_t m_channel;
	std::stop_token m_stop;
	SomeRealLifeDevice m_device;
};

Naively, we might think that print_battery_level can be called this way:

image_producer* ptr = nullptr;
sobjectizer.environment().introduce_coop(dispatcher.binder(), [&](so_5::coop_t& c) {
	ptr = c.make_agent<image_producer>(main_channel, ctrl_c);
    // ... other agents
}
// even from another thread
ptr->print_battery_level();

However, this is a bad idea because SObjectizer manages the lifetime of agents in an unspecified way, therefore ptr can be dangling anytime.

Instead, SObjectizer provides one guarantee regarding agent pointers: an agent can safely store and use pointers to other agents as long as they belong to its parent cooperation. This introduces cooperation parent-child relationships: a cooperation can be created as a child of another cooperation. In this case, SObjectizer guarantees that all child cooperations will be deregistered and destroyed before their parent cooperation. This means, agents of the parent cooperation will be destroyed after agents of the child cooperation.

Thus, image_producer can introduce a child cooperation holding a sort of broker agent that calls print_battery_level when a certain message arrives (somewhere that is not important now).

class image_producer final : public so_5::agent_t
{
	class image_producer_broker : public agent_t
	{
	public:
		image_producer_broker(so_5::agent_context_t c, image_producer* parent)
			: agent_t(std::move(c)), m_parent(parent)
		{
		}

		void so_define_agent() override
		{
			so_subscribe(...somewhere...).event([this](const PrintBatteryLevel&) {
				m_parent->print_battery_level();
			});
		}

	private:
		image_producer* m_parent;
	};
public:
	image_producer(so_5::agent_context_t ctx, so_5::mbox_t channel, std::stop_token st)
		: agent_t(std::move(ctx)), m_channel(std::move(channel)), m_stop(std::move(st))
	{
	}

	void so_evt_start() override
	{
        // create an image_producer_broker inside a new child cooperation
		introduce_child_coop(*this, so_5::disp::active_obj::make_dispatcher(so_environment()).binder(), [this](so_5::coop_t& c) {
			c.make_agent<image_producer_broker>(this);
		});

		while (!m_stop.stop_requested())
		{		
			so_5::send<cv::Mat>(m_channel, m_device.GetNextImage());
		}
	}

private:
	so_5::mbox_t m_channel;
	std::stop_token m_stop;
	SomeRealLifeDevice m_device;

	void print_battery_level()
	{
		std::osyncstream(std::cout) << m_device.GetBatteryLevel() << "\n";
	}
};

A few details here: there exist three ways to introduce child cooperations. Here we used the handy introduce_child_coop:

  • the first parameter may be either the parent cooperation or an agent of the parent cooperation;
  • the second parameter is the binder to use;
  • the third parameter is the usual lambda that fills the cooperation with agents;

Note that the second parameter can be omitted (e.g. introduce_child_coop(*this, lambda)) and in that case the default dispatcher will be used. In our case, we want to be sure the broker handlers will be executed in a dedicated thread and then we simply used an active_obj.

The only difference with introduce_coop is the parent-child relationship.

Semantically, it makes sense that image_producer_broker dies before image_producer, otherwise it would be possible that it receives a message when image_producer is dead.

Cooperation parent-child relationships are useful in other scenarios and we’ll use them again in the future. Just for completeness, consider that we can take advantage of the parent-child relationship between image_producer and its child to handle the producer stop without injecting a stop_token:

class image_producer final : public so_5::agent_t
{
	class image_producer_broker : public agent_t
	{
	public:
		image_producer_broker(so_5::agent_context_t c, image_producer* parent, std::stop_source stop_source)
			: agent_t(std::move(c)), m_parent(parent), m_stop_source(std::move(stop_source))
		{
		}

		void so_define_agent() override
		{
			so_subscribe(...somewhere...).event([this](const PrintBatteryLevel&) {
				m_parent->print_battery_level();
			});
		}

		void so_evt_finish() override
		{
			m_stop_source.request_stop();
		}
		
	private:
		image_producer* m_parent;
		std::stop_source m_stop_source;
	};
public:
	image_producer(so_5::agent_context_t ctx, so_5::mbox_t channel)
		: agent_t(std::move(ctx)), m_channel(std::move(channel))
	{
	}

	void so_evt_start() override
	{
        // create an image_producer_broker inside a new child cooperation
		introduce_child_coop(*this, so_5::disp::active_obj::make_dispatcher(so_environment()).binder(), [this](so_5::coop_t& c) {
			c.make_agent<image_producer_broker>(this, m_stop_source);
		});
		
		const auto st = m_stop_source.get_token();
		while (!st.stop_requested())
		{		
			so_5::send<cv::Mat>(m_channel, m_device.GetNextImage());
		}
	}

private:
	so_5::mbox_t m_channel;
	std::stop_source m_stop_source;
	SomeRealLifeDevice m_device;

	void print_battery_level()
	{
		std::osyncstream(std::cout) << m_device.GetBatteryLevel() << "\n";
	}
};

Indeed, image_producer_broker::so_evt_finish() will be called strictly before finishing image_producer.

Even though this solution is not encouraged because it implies state sharing that is not consistent with the actor model, this pattern can make some sense to glue SObjectizer code with other components that work with a totally different paradigm. By the way, it’s essential that such borderline cases are well isolated.

An alternative solution that works with message passing only will be discussed in the next post.

Takeaway

In this episode we have learned:

  • camera SDKs often support a callback-based manner of grabbing frames;
  • agents are functionally easier to use together with callback-based SDKs;
  • cooperations parent-child relationships: we can introduce a cooperation that is child of another one, with the guarantee that children cooperations are destroyed before the parent;
  • keeping a pointer to an agent created with make_agent (or such) is a bad idea;
  • state sharing is not in line with the actor model paradigm but sometimes is difficult to avoid.

As usual, calico is updated and tagged.

What’s next?

In the next post we’ll rework image_producer in order to use message passing style only.


Thanks to Yauheni Akhotnikau for having reviewed this post.

Leave a comment