SObjectizer Tales – 13. Routing images across the network

Posted: January 4, 2024 in SObjectizer Tales

In our last installment, we were on the cusp of something big. We had just cracked the code on receiving commands from the network, in our mission to develop a simplified gRPC service. Today, we’re ready to build upon that foundation and, as we pick up where we left off, let’s explore how to send images across the network, taking our journey to the next level.

At the coffee machine, waiting for that much-needed caffeine boost, Giulia, a colleague from another department, strikes up a conversation: “I’ve created a C# program for drone camera image analysis. Currently, it operates in offline mode, processing images in batches. However, I’m eager to enhance it for real-time image acquisition and analysis as the drone’s data stream comes in. Is it possible to leverage calico to achieve this goal?”

Now that we’ve made calico remotely accessible in the network, we can extend our usage of gRPC by introducing an additional method that Giulia can invoke from her program. In essence, we require a mechanism to facilitate the routing of the image stream to external clients. This way, Giulia can receive each image and execute the processing logic on her end. Ideally, Giulia should be able to seamlessly incorporate this new “data source” into her existing program.

First of all, we add another method to the proto file:

syntax = "proto3";

service calico_service {    
	rpc send_command(send_command_request) returns (send_command_response);
	rpc subscribe(subscribe_request) returns (stream subscribe_response);
}

message send_command_request {
    string channel_name = 1;
	string command_name = 2;
}

message send_command_response {    
}

message subscribe_request {
    repeated string channels = 1;
}

message subscribe_response {
    string channel_name = 1;
	image image = 2;	
}

message metadata {
	string name = 1;
	string value = 2;
}

message image {	
    bytes data = 1;
	repeated metadata metadata = 2;
}

The newly added method is quite straightforward: it accepts a list of the channel names the client wants to receive and returns a continuous “stream” of images. Essentially, the server keeps an ongoing dialogue with the client, delivering images as they arrive. Each response message includes both the name of the channel the image belongs to and the image itself. The image consists of two parts: the bytes of the compressed frame and a set of metadata pairs. The metadata is not essential, but it is useful for exchanging additional information without being tied to a specific message format. This way, we have room for any information, such as the image size, the format, or even domain-specific data.
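To make the shape of these messages concrete, here is a plain C++ sketch that mirrors them with ordinary structs. The structs (metadata_t, image_t, subscribe_response_t) and the find_metadata helper are hypothetical and exist only for illustration; the real classes are the ones protoc generates from the proto file, and the metadata keys used below (width, height, format) are examples, not part of the protocol.

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <vector>

// Plain stand-ins for the proto messages (hypothetical, NOT protoc output).
struct metadata_t
{
    std::string name;
    std::string value;
};

struct image_t
{
    std::string data;                  // a proto3 bytes field maps to std::string in C++
    std::vector<metadata_t> metadata;  // free-form name/value pairs
};

struct subscribe_response_t
{
    std::string channel_name;  // the channel the frame comes from
    image_t image;             // compressed frame plus metadata
};

// Returns the value of the first metadata entry called `name`, if any.
std::optional<std::string> find_metadata(const image_t& img, const std::string& name)
{
    for (const auto& entry : img.metadata)
    {
        if (entry.name == name)
        {
            return entry.value;
        }
    }
    return std::nullopt;
}

// Builds a response carrying extra information without changing the schema.
subscribe_response_t make_example_response()
{
    subscribe_response_t response;
    response.channel_name = "main";
    response.image.data = "<jpeg bytes>";  // placeholder for the encoded frame
    response.image.metadata.push_back({"width", "640"});
    response.image.metadata.push_back({"height", "480"});
    response.image.metadata.push_back({"format", "jpg"});
    return response;
}
```

Since metadata is just a list of name/value strings, adding a new kind of information only requires server and client to agree on a key, without touching the proto file.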

Adding this method leads to the following high-level design:

[Figure: high-level design of calico with the new subscribe method]

One way or another, the entry point of our gRPC service must be able to subscribe to any channel and relay data to the connected clients. Since only agents can subscribe to channels, we need a dedicated agent. The idea is to spawn one such agent for each incoming client and delegate the work to it:

[Figure: one dedicated agent for each incoming client]

This design is not the only option, and it has its own pros and cons, but it gives us the opportunity to explore some additional features of SObjectizer.

Let’s focus on designing that agent. First of all, it needs to subscribe to the requested channels on the client’s behalf. We can do this by turning the channel names into the corresponding message boxes and passing them to the agent’s constructor. Since, as we saw in the previous post, the entry point of the service holds a reference to the environment, this is relatively easy:

vector<so_5::mbox_t> get_channels_from(so_5::environment_t& env, const subscribe_request& request)
{
	vector<so_5::mbox_t> channels(request.channels_size());
	ranges::transform(request.channels(), begin(channels), [&](const auto& name) {
		return env.create_mbox(name);
	});
	return channels;
}

Note that if we had C++23 available, we would write this as follows:

vector<so_5::mbox_t> get_channels_from(so_5::environment_t& env, const subscribe_request& request)
{
	return request.channels() | views::transform([&](const auto& name) {
		return env.create_mbox(name);
	}) | ranges::to<vector>;
}
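get_channels_from works because create_mbox(name) returns the same named message box every time it is called with the same name, so the client agent subscribes to the very mboxes the producers publish to. The following standard C++ sketch models that “get or create by name” behavior with a hypothetical mailbox_registry class; it illustrates the semantics only and is not SObjectizer’s implementation.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <mutex>
#include <string>

// Hypothetical stand-in for a named message box.
struct fake_mbox
{
    std::string name;
};

// Hypothetical registry modelling "get or create" by name.
class mailbox_registry
{
public:
    std::shared_ptr<fake_mbox> create_mbox(const std::string& name)
    {
        std::lock_guard lock{m_mutex};  // several threads may ask for mboxes at once
        auto& slot = m_mboxes[name];    // empty pointer on first use of this name
        if (!slot)
        {
            slot = std::make_shared<fake_mbox>(fake_mbox{name});
        }
        return slot;  // same object for the same name, every time
    }

private:
    std::mutex m_mutex;
    std::map<std::string, std::shared_ptr<fake_mbox>> m_mboxes;
};
```

In calico terms, the producer asking for “main” and the client agent asking for “main” end up with the same mbox, which is what makes the subscription work.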

The second aspect to consider is how gRPC streams data from the server to the client. In essence, we obtain a “writer” object that lets us send data to the client as soon as it becomes available. It sends data incrementally and continuously from the server to the client, which makes it suitable for real-time or continuous data transmission. This object must be handed to the agent responsible for transmitting data to the client (either as a pointer or as a reference). For a server-streaming method like subscribe, the gRPC-generated service function receives it as a grpc::ServerWriter<subscribe_response>* parameter.
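Conceptually, the writer is a sink that accepts one message at a time while the call is open. The sketch below models this with a hypothetical stream_writer interface whose write function returns bool, mirroring the fact that grpc::ServerWriter::Write returns false once the stream can no longer be written to. fake_writer and push_frames are illustrative names and involve no gRPC code.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical interface shaped like a gRPC writer: one message per call,
// false once the stream is no longer writable.
template <typename T>
class stream_writer
{
public:
    virtual ~stream_writer() = default;
    virtual bool write(const T& msg) = 0;
};

// In-memory fake that records messages until it is "closed".
class fake_writer final : public stream_writer<std::string>
{
public:
    bool write(const std::string& msg) override
    {
        if (m_closed)
        {
            return false;  // like a client that has gone away
        }
        m_sent.push_back(msg);
        return true;
    }

    void close() { m_closed = true; }
    const std::vector<std::string>& sent() const { return m_sent; }

private:
    bool m_closed = false;
    std::vector<std::string> m_sent;
};

// The component producing data (the agent, in calico) only needs a reference
// to the writer and pushes messages as they become available.
int push_frames(stream_writer<std::string>& writer, const std::vector<std::string>& frames)
{
    int delivered = 0;
    for (const auto& frame : frames)
    {
        if (!writer.write(frame))
        {
            break;  // stop at the first failed write
        }
        ++delivered;
    }
    return delivered;
}
```

The handlers in this post ignore the result of Write; checking it is precisely what becomes necessary once clients start disconnecting.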

At this point, we are ready to draft a first implementation of the agent:

class subscribe_client_agent : public so_5::agent_t
{
public:
	subscribe_client_agent(so_5::agent_context_t ctx, grpc::ServerWriter<subscribe_response>& writer, std::vector<so_5::mbox_t> channels)
		: agent_t(std::move(ctx)), m_writer(writer), m_channels(std::move(channels))
	{
	}

	void so_define_agent() override
	{
		for (const auto& channel : m_channels)
		{
			so_subscribe(channel).event([chan_name = channel->query_name(), this](const cv::Mat& image) {				
				subscribe_response response;
				response.set_channel_name(chan_name);
				// fill the response with data ...	
				m_writer.Write(response);				
			});
		}
	}
	
private:	
	grpc::ServerWriter<subscribe_response>& m_writer;
	std::vector<so_5::mbox_t> m_channels;
};

Essentially, the agent handles all the subscriptions on behalf of the client and, for each received message, sends the corresponding image through the writer. Since, as mentioned, we use the synchronous API, this call blocks until gRPC has accepted the message for sending (flow control can make this take a while if the client reads slowly). To complete the handler, we just need to convert the image into a sequence of bytes. We also decided to compress the image into the JPEG format beforehand:

so_subscribe(channel).event([chan_name = channel->query_name(), this](const cv::Mat& image) {				
	subscribe_response response;
	response.set_channel_name(chan_name);
	std::vector<uchar> raw_data;
	imencode(".jpg", image, raw_data, { cv::IMWRITE_JPEG_QUALITY, 95 });
	response.mutable_image()->set_data(raw_data.data(), raw_data.size());
	m_writer.Write(response);				
});

We use imencode to compress the image. This OpenCV function writes its output into a vector of bytes (uchar), which we then copy into the response. Finally, we send the response to the client. We don’t add any metadata for the moment.
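In the C++ code generated by protoc, a bytes field is stored as a std::string, so set_data(pointer, size) essentially copies the encoded buffer into that string. The round trip can be sketched with the standard library alone; to_bytes_field and from_bytes_field are hypothetical helpers, not protobuf API.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Server side: copy the encoded bytes into the string backing a bytes field.
std::string to_bytes_field(const std::vector<unsigned char>& encoded)
{
    return std::string(encoded.begin(), encoded.end());
}

// Client side: read the field's contents back as raw bytes.
std::vector<unsigned char> from_bytes_field(const std::string& field)
{
    return std::vector<unsigned char>(field.begin(), field.end());
}
```

A std::string stores embedded zero bytes without trouble, which makes it suitable for binary data. On the client side, the test program avoids a further copy by wrapping the string’s buffer in a cv::Mat header before calling imdecode.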

At this point, we need to spawn one such agent for every “subscribe” call directed to the service. For this purpose, we can simply introduce a new cooperation holding an agent instance:

grpc::Status subscribe(grpc::ServerContext* context, const subscribe_request* request, grpc::ServerWriter<subscribe_response>* writer)
{
	m_params.environment.introduce_coop([&, this](so_5::coop_t& coop) {
		coop.make_agent<subscribe_client_agent>(*writer, get_channels_from(m_params.environment, *request));
	});
	return grpc::Status::OK; // ???
}

However, the code above is not correct: subscribe must not return until the stream is complete. When the method returns, gRPC finishes the call, and the writer the agent holds a reference to is no longer valid. In other words, the RPC method (and, with it, the client’s call) must stay open as long as the agent is active. How can we achieve this? There are several possible approaches; one of them is to obtain from the agent a future that becomes ready when the agent’s work is finished. This lets the agent signal to the RPC method that the whole stream operation is over, and it provides a simple, synchronized way for the agent and the RPC method to coordinate:

grpc::Status subscribe(grpc::ServerContext* context, const subscribe_request* request, grpc::ServerWriter<subscribe_response>* writer)
{
	std::future<grpc::Status> ret;
	m_params.environment.introduce_coop([&, this](so_5::coop_t& coop) {
		ret = coop.make_agent<subscribe_client_agent>(*writer, get_channels_from(m_params.environment, *request))->get_status_future();
	});
	return ret.get();
}

Since introduce_coop returns whatever the lambda returns, SObjectizer allows us to rewrite this more compactly:

grpc::Status subscribe([[maybe_unused]] grpc::ServerContext* context, const subscribe_request* request, grpc::ServerWriter<subscribe_response>* writer)
{
	return m_params.environment.introduce_coop([&, this](so_5::coop_t& coop) {
		return coop.make_agent<subscribe_client_agent>(*writer, get_channels_from(m_params.environment, *request))->get_status_future();
	}).get();
}

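To make this work, we add two member functions (get_status_future, and an override of so_evt_finish, which SObjectizer calls when the agent is being deregistered) and a member variable (the promise) to the agent: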

class subscribe_client_agent : public so_5::agent_t
{
public:
	subscribe_client_agent(so_5::agent_context_t ctx, grpc::ServerWriter<subscribe_response>& writer, std::vector<so_5::mbox_t> channels)
		: agent_t(std::move(ctx)), m_writer(writer), m_channels(std::move(channels))
	{
	}
	
	void so_define_agent() override
	{
		for (const auto& channel : m_channels)
		{
			so_subscribe(channel).event([chan_name = channel->query_name(), this](const cv::Mat& image) {				
				subscribe_response response;
				response.set_channel_name(chan_name);
				std::vector<uchar> raw_data;
				imencode(".jpg", image, raw_data, { cv::IMWRITE_JPEG_QUALITY, 95 });
				response.mutable_image()->set_data(raw_data.data(), raw_data.size());
				m_writer.Write(response);				
			});
		}
	}

	std::future<grpc::Status> get_status_future()
	{
		return m_status_promise.get_future();
	}

	void so_evt_finish() override
	{
		m_status_promise.set_value(grpc::Status::OK);
	}

private:
	std::promise<grpc::Status> m_status_promise;
	grpc::ServerWriter<subscribe_response>& m_writer;
	std::vector<so_5::mbox_t> m_channels;
};
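The handshake between the RPC method and the agent is the standard promise/future pattern. The sketch below reproduces it with the standard library only, under a few simplifying assumptions: fake_agent is a hypothetical class running on a plain std::thread instead of a dispatcher, its finish function plays the role of so_evt_finish, and the status is a string rather than grpc::Status.

```cpp
#include <cassert>
#include <cstddef>
#include <future>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical agent: does its work, then reports completion through a promise.
class fake_agent
{
public:
    // A promise hands out its future only once.
    std::future<std::string> get_status_future() { return m_status.get_future(); }

    void run(const std::vector<std::string>& frames)
    {
        for (const auto& frame : frames)
        {
            m_handled.push_back(frame);  // stands in for "compress and write"
        }
        finish();
    }

    std::size_t handled() const { return m_handled.size(); }

private:
    void finish() { m_status.set_value(std::string{"OK"}); }  // role of so_evt_finish

    std::promise<std::string> m_status;
    std::vector<std::string> m_handled;
};

// Plays the RPC method: starts the agent on another thread and blocks until it is done.
std::string serve_one_client(fake_agent& agent, std::vector<std::string> frames)
{
    auto status = agent.get_status_future();
    std::thread worker([&agent, frames = std::move(frames)] { agent.run(frames); });
    const std::string result = status.get();  // blocks until finish() has run
    worker.join();
    return result;
}
```

Because set_value is called after the agent’s last piece of work, everything the agent did is visible to the thread that returns from get().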

To test the service, first we launch calico:

int main()
{
    const auto ctrl_c = get_ctrlc_token();
 
    const so_5::wrapped_env_t sobjectizer;
    const auto main_channel = sobjectizer.environment().create_mbox("main");
    const auto commands_channel = sobjectizer.environment().create_mbox("commands");
    const auto message_queue = create_mchain(sobjectizer.environment());
 
    sobjectizer.environment().introduce_coop(so_5::disp::active_obj::make_dispatcher(sobjectizer.environment()).binder(), [&](so_5::coop_t& c) {
        c.make_agent<image_producer_recursive>(main_channel, commands_channel);
        c.make_agent<maint_gui::remote_control>(commands_channel, message_queue);
        c.make_agent<service_facade>();
    });
 
    do_gui_message_loop(ctrl_c, message_queue, sobjectizer.environment().create_mbox(constants::waitkey_channel_name));
}

then we craft a simple program that invokes the new gRPC method, subscribing to the “main” channel:

int main()
{
	const auto stub = calico_service::NewStub(CreateChannel("0.0.0.0:50051", grpc::InsecureChannelCredentials()));

	grpc::ClientContext ctx;
	subscribe_request request;
	request.add_channels("main");
	subscribe_response response;
	const auto subscription = stub->subscribe(&ctx, request);
	while (subscription->Read(&response))
	{
		const auto bytes = response.mutable_image()->mutable_data();
		const auto frame = imdecode(cv::Mat(1, static_cast<int>(bytes->size()), CV_8UC1, bytes->data()), cv::IMREAD_UNCHANGED);
		imshow(response.channel_name(), frame);
		cv::waitKey(1);
	}	
}

This new program connects to calico via gRPC at 0.0.0.0:50051 and calls subscribe on the channel named “main”. It should display the stream of images. Remember to start the acquisition on calico (either manually or through the service’s send_command method).

A little step forward

Although the service does its job, before releasing a new version of calico and sending it to Giulia for some beta testing, we reflect a bit on the overall design. The attentive reader may have noticed that every subscribe_client_agent is bound to the default dispatcher, which is one_thread (all the agents bound to it are served by the very same worker thread). Is that a mistake? After all, the service works even when called by multiple clients.

To get a rough idea of what this means, let’s add a log line to the image handler:

class subscribe_client_agent : public so_5::agent_t
{

// ... as before	

	void so_define_agent() override
	{
		for (const auto& channel : m_channels)
		{
			so_subscribe(channel).event([chan_name = channel->query_name(), this](const cv::Mat& image) {				
				std::osyncstream(std::cout) << "handling image on thread " << std::this_thread::get_id() << "\n";
				// ... as before
			});
		}
	}
// ... as before

Then we launch calico again, followed by a few receiver programs. When we start the acquisition, we should see a bunch of lines like these:

handling image on thread 1361
handling image on thread 1361
handling image on thread 1361
handling image on thread 1361
handling image on thread 1361
handling image on thread 1361
...

In essence, every message handler is executed on the very same worker thread. This behavior is expected, since the agents are bound to the one_thread dispatcher.
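The effect of one_thread can be reproduced with a tiny executor. The sketch below is a hypothetical single_worker class (not SObjectizer code) that runs every posted handler on one dedicated thread, in FIFO order; run_on_single_worker posts several handlers, as if frames for different clients arrived, and collects the ids of the threads they ran on.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <set>
#include <thread>
#include <utility>

// Hypothetical single-threaded executor: one worker, one FIFO queue.
class single_worker
{
public:
    single_worker() : m_thread([this] { loop(); }) {}

    ~single_worker()
    {
        {
            std::lock_guard lock{m_mutex};
            m_stop = true;
        }
        m_cv.notify_one();
        m_thread.join();  // the worker drains the queue before exiting
    }

    void post(std::function<void()> handler)
    {
        {
            std::lock_guard lock{m_mutex};
            m_queue.push(std::move(handler));
        }
        m_cv.notify_one();
    }

private:
    void loop()
    {
        for (;;)
        {
            std::function<void()> handler;
            {
                std::unique_lock lock{m_mutex};
                m_cv.wait(lock, [this] { return m_stop || !m_queue.empty(); });
                if (m_queue.empty())
                {
                    return;  // stop requested and nothing left to do
                }
                handler = std::move(m_queue.front());
                m_queue.pop();
            }
            handler();  // every handler runs on this thread, one at a time
        }
    }

    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::queue<std::function<void()>> m_queue;
    bool m_stop = false;
    std::thread m_thread;  // declared last: starts after the members above exist
};

// Posts `handlers` tasks and returns the distinct thread ids they ran on.
std::set<std::thread::id> run_on_single_worker(int handlers)
{
    std::mutex ids_mutex;
    std::set<std::thread::id> ids;
    {
        single_worker worker;
        for (int i = 0; i < handlers; ++i)
        {
            worker.post([&] {
                std::lock_guard lock{ids_mutex};
                ids.insert(std::this_thread::get_id());
            });
        }
    }  // destructor joins, so every handler has run here
    return ids;
}
```

No matter how many handlers are posted, the returned set contains a single id: handlers for different clients simply take turns.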

The question at hand is whether we can improve our current situation. However, it’s challenging to provide a definitive answer without a clear understanding of our specific requirements. Let’s attempt to refine our focus.

First and foremost, considering Giulia’s specific use case, we expect only one, or perhaps at most two, clients connected to our service, so the load on the system is minimal. Typically, lower load means less communication between the various components, which is advantageous. However, in a more general scenario, several clients may be connected to the service at the same time. In that situation, several client agents would be spawned, all bound to the same worker thread. For instance, if N clients are all interested in the “main” channel, while agent A serves client A, the other clients are left waiting for their data: in this setup, clients can’t practically receive data simultaneously.

The key operations carried out by a subscribe_client_agent are image compression (which is CPU-intensive) and network data transmission. When the agent’s thread blocks while sending data over the network, the operating system may switch to another task. Consequently, if we bound each agent to a dedicated worker thread, clients could receive data simultaneously. However, this approach could lead to a proliferation of threads, with issues such as increased context switching and CPU contention. What counts as a “large” number of threads depends on the hardware and the workload, but as a rough order of magnitude, a few hundred threads (say, around 500) is already a lot.
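A toy measurement helps illustrate the trade-off. In the sketch below, blocking_send is a hypothetical stand-in for a blocking network write, simulated with a 20 ms sleep; serving four clients one after another on a single thread takes at least four times as long, whereas one thread per client lets the blocked sends overlap. Real send times depend on the network and on the clients.

```cpp
#include <cassert>
#include <chrono>
#include <thread>
#include <vector>

using namespace std::chrono_literals;

// Hypothetical stand-in for a blocking network write.
void blocking_send()
{
    std::this_thread::sleep_for(20ms);
}

// Serves `clients` clients one after another on the calling thread.
std::chrono::milliseconds serve_sequentially(int clients)
{
    const auto start = std::chrono::steady_clock::now();
    for (int i = 0; i < clients; ++i)
    {
        blocking_send();
    }
    return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start);
}

// Serves each client on its own thread, so the blocked sends overlap.
std::chrono::milliseconds serve_in_parallel(int clients)
{
    const auto start = std::chrono::steady_clock::now();
    std::vector<std::thread> threads;
    for (int i = 0; i < clients; ++i)
    {
        threads.emplace_back(blocking_send);
    }
    for (auto& t : threads)
    {
        t.join();
    }
    return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start);
}
```

On a typical machine, serve_in_parallel(4) returns roughly the duration of a single send, while serve_sequentially(4) always takes at least 80 ms; the price of the faster variant is one thread per client.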

Setting aside the fact that the overall “one agent per client” strategy may need to be reconsidered, one way to keep the number of threads under control is to introduce a thread pool shared by all the subscribe_client_agent instances. A pool bounds the number of active threads and can alleviate some of the problems associated with high thread counts.

The good news is that introducing a thread pool is easy with SObjectizer, and we don’t need to change much of what we have done so far. SObjectizer provides a thread_pool dispatcher designed precisely for this purpose. We can create one and use it to introduce each client agent:

class calico_service_impl : public calico_service::Service
{
public:
	explicit calico_service_impl(service_params params)
		: m_params(params), m_binder(so_5::disp::thread_pool::make_dispatcher(m_params.environment, 4).binder())
	{
	}
	
	grpc::Status subscribe(grpc::ServerContext* context, const subscribe_request* request, grpc::ServerWriter<subscribe_response>* writer) override
	{
		return m_params.environment.introduce_coop(m_binder, [&, this](so_5::coop_t& coop) {
			return coop.make_agent<subscribe_client_agent>(*writer, get_channels_from(m_params.environment, *request))->get_status_future();
		}).get();
	}
	
	grpc::Status send_command(grpc::ServerContext*, const send_command_request* request, send_command_response*) override
	{
		// ... as before
	}
private:
	service_params m_params;
	so_5::disp_binder_shptr_t m_binder;
};

The thread_pool dispatcher creates a pool of worker threads; in the code above, we created one with 4 threads. An agent bound to that dispatcher can handle its events on any of the worker threads of that pool.
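To picture what a pool does, here is a minimal fixed-size pool in standard C++: a hypothetical fixed_pool class whose N workers pop tasks from one shared queue, so any idle worker runs the next task. This is only a conceptual model; SObjectizer’s thread_pool dispatcher additionally schedules work per event queue, which matters for the cooperation rules discussed next.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <set>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical fixed-size pool: N workers pop tasks from one shared queue.
class fixed_pool
{
public:
    explicit fixed_pool(std::size_t threads)
    {
        for (std::size_t i = 0; i < threads; ++i)
        {
            m_workers.emplace_back([this] { work(); });
        }
    }

    ~fixed_pool()
    {
        {
            std::lock_guard lock{m_mutex};
            m_stop = true;
        }
        m_cv.notify_all();
        for (auto& w : m_workers)
        {
            w.join();  // workers drain the queue before exiting
        }
    }

    void post(std::function<void()> task)
    {
        {
            std::lock_guard lock{m_mutex};
            m_tasks.push(std::move(task));
        }
        m_cv.notify_one();
    }

private:
    void work()
    {
        for (;;)
        {
            std::function<void()> task;
            {
                std::unique_lock lock{m_mutex};
                m_cv.wait(lock, [this] { return m_stop || !m_tasks.empty(); });
                if (m_tasks.empty())
                {
                    return;  // stop requested and queue drained
                }
                task = std::move(m_tasks.front());
                m_tasks.pop();
            }
            task();  // runs on whichever worker grabbed it
        }
    }

    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::queue<std::function<void()>> m_tasks;
    bool m_stop = false;
    std::vector<std::thread> m_workers;
};

// Runs `tasks` short tasks on `threads` workers; returns the worker ids used.
std::set<std::thread::id> run_on_pool(std::size_t threads, int tasks)
{
    std::mutex ids_mutex;
    std::set<std::thread::id> ids;
    {
        fixed_pool pool{threads};
        for (int i = 0; i < tasks; ++i)
        {
            pool.post([&] {
                std::lock_guard lock{ids_mutex};
                ids.insert(std::this_thread::get_id());
            });
        }
    }  // pool destructor waits for every task
    return ids;
}
```

With 100 short tasks and 4 workers, the set of ids typically has several entries, never more than 4, and never the id of the thread that posted the tasks.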

An important detail that can be initially confusing is that, by default, agents belonging to the same cooperation and bound to the same thread pool don’t work on different threads simultaneously. This is because, by default, they share the same event queue. For instance, suppose agents A, B, and C belong to the same cooperation, are bound to the same thread pool with 4 threads, and are all subscribed to the same message box. When a message arrives at that message box, you might expect A’s handler to run on one thread in parallel with B’s handler on another thread and C’s handler on yet another. However, this assumption is incorrect. If A receives the message first, both B and C must wait until A’s handler completes. Then, if B receives the message next, C has to wait for B’s handler to finish. It’s also important to note that there is no guarantee that the handlers of A, B, and C will run on different threads, since they are executed serially in this setup.

This behavior has two primary consequences:

  • there is no thread parallelism,
  • there is no need to synchronize access to shared resources among A, B and C.

While this approach is useful in some cases, it may not fit our scenario, where we want to exploit thread parallelism. However, since we create each subscribe_client_agent in its own cooperation, the client agents don’t share an event queue and can run in parallel.

This behavior is actually controlled by a customization point of the thread_pool dispatcher, set when the binder is obtained: the type of event queue, which here is called fifo. It can be either cooperation (the default, fifo_t::cooperation) or individual (fifo_t::individual). For example:

.binder(so_5::disp::thread_pool::bind_params_t{}.fifo(so_5::disp::thread_pool::fifo_t::individual))

Since this parameter is specified when a binder is obtained, each agent can have its own fifo strategy, depending on the binder it is bound with. As said, in our case we don’t need to customize it at all, because each client agent belongs to its own cooperation.
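The difference between the two fifo strategies can be modelled in a few lines. In the sketch below, three handlers (agents A, B, and C reacting to the same message) are each started on their own thread, but handlers that share an event queue are serialized through that queue’s mutex. peak_parallel_handlers is a hypothetical function, and guarding a queue with a mutex is a simplification: the real dispatcher simply doesn’t schedule a busy queue instead of blocking a worker.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <future>
#include <mutex>
#include <thread>
#include <vector>

// Returns the peak number of handlers that ran at the same time.
// one_queue_per_cooperation == true models fifo_t::cooperation,
// false models fifo_t::individual.
int peak_parallel_handlers(bool one_queue_per_cooperation)
{
    constexpr int agent_count = 3;                // agents A, B, C
    std::vector<std::mutex> queues(agent_count);  // at most one queue per agent
    std::atomic<int> running{0};
    std::atomic<int> peak{0};

    std::vector<std::future<void>> handlers;
    for (int agent = 0; agent < agent_count; ++agent)
    {
        // cooperation-style: everybody shares queue 0; individual: own queue
        std::mutex* queue = &queues[one_queue_per_cooperation ? 0 : agent];
        handlers.push_back(std::async(std::launch::async, [queue, &running, &peak] {
            std::lock_guard lock{*queue};
            const int now = ++running;
            int seen = peak.load();
            while (seen < now && !peak.compare_exchange_weak(seen, now))
            {
                // retry: another handler updated the peak concurrently
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(30));  // "handler work"
            --running;
        }));
    }
    for (auto& handler : handlers)
    {
        handler.get();  // wait for all three handlers
    }
    return peak.load();
}
```

The cooperation-style call always reports 1, matching the “no thread parallelism” consequence listed above; the individual-style call usually reports 3 on a multi-core machine, though the OS scheduler doesn’t strictly guarantee it.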

Restarting calico and a couple of instances of the receiver test program may produce a different outcome than before:

handling image on thread 1878
handling image on thread 1445
handling image on thread 1878
handling image on thread 1878
handling image on thread 1445
...

Indeed, since each client agent has its own event queue, the two receivers can be served by different threads of the pool. And given that image compression and network transmission typically take only a few milliseconds, SObjectizer may end up using just two threads from the pool for the two distinct receiver clients.

After implementing this minor enhancement in calico, we release a beta version for Giulia to test. She appears satisfied with the API, and we dedicate some time to gather her feedback and make any necessary adjustments.

For the sake of completeness, note that at this stage the subscribe_client_agent’s handler can be adjusted to optimize memory usage by avoiding the reallocation of the vector of compressed bytes every time a new image arrives. The simplest way is to declare raw_data as a member variable of subscribe_client_agent (m_raw_data): each agent keeps its own buffer, which reduces unnecessary reallocations, and since an agent’s ordinary event handlers are never run concurrently, the buffer needs no synchronization. However, the effectiveness of this optimization should be measured carefully to make sure the performance gain is worth the memory each buffer keeps allocated. In other words, weigh the convenience of the change against its actual benefits:

class subscribe_client_agent : public so_5::agent_t
{

// ... as before	

	void so_define_agent() override
	{
		for (const auto& channel : m_channels)
		{
			so_subscribe(channel).event([chan_name = channel->query_name(), this](const cv::Mat& image) {				
				subscribe_response response;
				response.set_channel_name(chan_name);			
				imencode(".jpg", image, m_raw_data, { cv::IMWRITE_JPEG_QUALITY, 95 });
				response.mutable_image()->set_data(m_raw_data.data(), m_raw_data.size());
				m_writer.Write(response);				
			});
		}
	}
private:
	std::promise<grpc::Status> m_status_promise;
	std::vector<uchar> m_raw_data;
	grpc::ServerWriter<subscribe_response>& m_writer;
	std::vector<so_5::mbox_t> m_channels;
};
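The saving relies on a standard-library property: clear() keeps a vector’s capacity, and refilling it up to that capacity doesn’t reallocate. The sketch below demonstrates this with a hypothetical encode_frame function standing in for imencode and a hypothetical frame_sender holding the buffer like m_raw_data; whether imencode itself reuses the capacity of the vector it receives is an assumption worth checking before relying on it.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical encoder stand-in: writes `size` bytes into `out`,
// reusing whatever capacity `out` already has.
void encode_frame(std::size_t size, std::vector<unsigned char>& out)
{
    out.clear();             // size becomes 0, capacity is kept
    out.resize(size, 0xAB);  // no reallocation while size <= capacity
}

// Hypothetical agent-like holder for the reusable buffer.
struct frame_sender
{
    std::vector<unsigned char> raw_data;  // plays the role of m_raw_data

    const unsigned char* encode(std::size_t size)
    {
        encode_frame(size, raw_data);
        return raw_data.data();  // where the bytes live after this call
    }
};
```

Frames larger than the current capacity still trigger a reallocation, after which the bigger buffer is kept for subsequent frames.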

Takeaway

In this episode we have learned:

  • grpc::ServerWriter is a generic abstraction used to stream data from the server to the client (it comes with gRPC methods returning stream messages);
  • one approach to enable communication from an agent to another component that needs to be informed when the agent has completed its task is to utilize the standard promise/future mechanism;
  • the thread_pool dispatcher keeps a pool with multiple worker threads and executes agent handlers on any of these threads;
  • by default, agents belonging to the same cooperation and bound to the same thread pool don’t process events on different threads simultaneously;
  • to exploit parallelism within the same cooperation, we must obtain a binder for the thread pool dispatcher that gives each agent its own “individual” event queue, using disp::thread_pool::fifo_t::individual.

What’s next?

While Giulia is testing the new version of our program, we uncover a significant issue in the implementation: when a client disconnects, we keep trying to send data to it, even though it’s no longer reachable.

What we need to do instead is actively monitor write failures and, when one occurs, promptly reclaim the agent along with all its associated resources and subscriptions. The question is: how can we achieve this?

In the upcoming segment, we will explore the process of deregistering a running agent, addressing this critical aspect of our system’s functionality.


Thanks to Yauheni Akhotnikau for having reviewed this post.
