SObjectizer Tales – 12. Receiving commands from the network

Posted: December 28, 2023 in SObjectizer Tales

Welcome back to SObjectizer Tales! In this installment, we are going to introduce something huge: external communication. Ekt, a colleague from the R&D Department, submitted a new request: he needs to command the camera remotely. Ekt is going to install calico in different areas of the company and he’s working on a dashboard to control all such instances.

There exist several approaches to accommodate this request and we have decided to add a simple gRPC service to calico. This episode is just the first of several on this topic. However, we won’t cover gRPC itself in the series; we’ll only introduce the few notions we need. In case you want to learn more about gRPC, this is the official starting point. That said, the integration of gRPC into calico should be easy enough to follow even without prior exposure to it.

gRPC is an open source framework – initially created by Google – for connecting programs over the network. It’s portable, efficient and supports a variety of languages. Basically, we declare a service interface, much like declaring the methods of a class, and then we obtain some generated code to integrate into our program in order to implement (server) or call (client) such methods. By default, this workflow is powered by Protocol Buffers.

This repository provides some examples from a talk (in Italian) I gave in the past. For instance, take the service below:

syntax = "proto3";

service HelloService {
  rpc SayHello (HelloRequest) returns (HelloResponse);
}

message HelloRequest {
  string name = 1;
}

message HelloResponse {
  string message = 1;
}

exposes a method SayHello that takes a message HelloRequest containing a string and returns a message HelloResponse containing a string. In gRPC, every method has a single input message and a single output message. Then, we give this file to protoc (aka the Protocol Buffers compiler) to generate code stubs that we include in our program, also linking the gRPC core libraries.

From now on, calico depends on gRPC. In case you are pulling the latest version, ensure gRPC is available and visible to CMake (e.g. vcpkg install grpc). If you are using the calico-builder image, gRPC is already there.

A simple service for receiving commands

First of all, we need to think about how to accommodate this from a design point of view. As said, there exist several ways to do that. Just to mention a couple:

  1. calico hosts and implements the service. This means other programs communicate with calico directly;
  2. calico does not implement the service; instead, it just communicates with another program that in turn exposes the service to others. In this case, there is another program in the middle, a sort of message broker.

For the moment, we go with the first option.

In addition to that, C++ gRPC comes in three flavors: synchronous, asynchronous, and (more recently) callback-based. The first is the simplest and manages threads automatically; however, it’s “blocking” (meaning that a thread waits until each rpc operation is completed). The second, instead, gives full control over threading, but it’s clearly more complicated. The callback API is still asynchronous, built on top of the reactor model. For now, we’ll go with the first one, which is the simplest.

Ekt’s request is clear enough for us to sketch this straightforward service definition:

syntax = "proto3";

service calico_service {    
	rpc send_command(send_command_request) returns (send_command_response);
}

message send_command_request {
    string channel_name = 1;
	string command_name = 2;
}

message send_command_response {    
}

Basically, send_command takes the information needed to deliver a certain command to a specific channel. Clearly, command_name should be mapped somehow to the corresponding so_5::signal_t. You might wonder why channel_name is part of the request instead of hiding such a detail inside the service. Well, it’s because things are a bit more flexible this way. For example, imagine we host two cameras and we want to command both through the same channel.
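Here is a minimal sketch of that scenario, reusing the image_producer_recursive agent from previous episodes (env is assumed to be a reference to the SObjectizer environment, and channel names are illustrative):

// hypothetical setup: two camera agents share the same command channel,
// while each one publishes images to its own output channel
env.introduce_coop(so_5::disp::active_obj::make_dispatcher(env).binder(), [&](so_5::coop_t& c) {
	const auto shared_commands = env.create_mbox("both_cameras");
	c.make_agent<image_producer_recursive>(env.create_mbox("left_camera"), shared_commands);
	c.make_agent<image_producer_recursive>(env.create_mbox("right_camera"), shared_commands);
});

// a single call like send_command("both_cameras", "start_acquisition") now reaches both cameras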

This way, send_command already covers the scenario above without any additional effort.

Now it’s time to generate the service stubs. calico uses CMake to regenerate the stubs during the build every time the proto file changes, and the generated files are automagically incorporated into the project. Anyway, doing this by hand is simply explained in the official tutorial.
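Just to give an idea, a generation step of this kind might be wired in CMake more or less as follows (a sketch, not necessarily calico’s actual build scripts; target and file names are illustrative):

find_package(Protobuf CONFIG REQUIRED)
find_package(gRPC CONFIG REQUIRED)

set(PROTO_FILE ${CMAKE_CURRENT_SOURCE_DIR}/calico_service.proto)
set(GENERATED_SOURCES
    ${CMAKE_CURRENT_BINARY_DIR}/calico_service.pb.cc
    ${CMAKE_CURRENT_BINARY_DIR}/calico_service.grpc.pb.cc)

# re-run protoc whenever the proto file changes
add_custom_command(
    OUTPUT ${GENERATED_SOURCES}
    COMMAND protobuf::protoc
            --proto_path=${CMAKE_CURRENT_SOURCE_DIR}
            --cpp_out=${CMAKE_CURRENT_BINARY_DIR}
            --grpc_out=${CMAKE_CURRENT_BINARY_DIR}
            --plugin=protoc-gen-grpc=$<TARGET_FILE:gRPC::grpc_cpp_plugin>
            ${PROTO_FILE}
    DEPENDS ${PROTO_FILE})

# compile the generated files into the program and link the gRPC runtime
target_sources(calico PRIVATE ${GENERATED_SOURCES})
target_include_directories(calico PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
target_link_libraries(calico PRIVATE gRPC::grpc++ protobuf::libprotobuf)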

Implementing the service

The implementation of the service should be straightforward; however, there are a few details that gRPC requires us to deal with in order to work. Specifically, gRPC services are exposed through a server that can be reached at one or more endpoints. The server is just a class that we need to instantiate and keep alive. We’ll see that gRPC provides a very simple way to build a server instance.

Practically speaking, implementing the service boils down to inheriting from the generated base class and overriding the service functions. So we might opt for a class that inherits from both so_5::agent_t and the service skeleton:

class calico_server_agent : public so_5::agent_t, public calico_service::Service
{
public:
	calico_server_agent(so_5::agent_context_t ctx)
		: agent_t(std::move(ctx))
	{
	}
	
	grpc::Status send_command(grpc::ServerContext* context, const send_command_request* request, send_command_response* response) override
	{
		// ...
	}
};

Although this solution is compact and simple, it conceals a subtle issue. Indeed, using the so-called gRPC synchronous API has a consequence: when the service receives a remote method request, gRPC picks a thread from a pool to handle and execute it (the thread is blocked until the operation completes). This ultimately leads to calling send_command from a thread that is not managed by SObjectizer. This should be avoided because changing the state of the agent from there would require synchronizing with the agent’s ordinary message handling. As extensively discussed by Yauheni here, “SObjectizer’s user usually assumes that an agent works on just one thread and there is no need for protection of the agent’s state”. Even though our implementation of send_command does not change the agent state, calling agent methods from non-SObjectizer threads is unnatural and should be avoided as much as possible, also to express intent clearly through a more bulletproof design. As if that were not enough, some agent functions can’t be called at all from non-SObjectizer threads, including so_subscribe() and so_change_state().

So you see that gRPC’s threading clashes with SObjectizer’s. The only way to get more control over threading would be the gRPC “asynchronous API”, which is more convoluted.

So, we end up with two classes:

  • service_facade, the agent that brings calico to gRPC by managing the server and registering our service to it;
  • service_impl, the actual implementation of our service.

service_facade recalls one of the GoF patterns (Facade, indeed) and encapsulates the intricacies of gRPC within a familiar, simplified entry point, which is essentially an agent.

class service_impl : public calico_service::Service
{
public:
	grpc::Status send_command(grpc::ServerContext* context, const send_command_request* request, send_command_response* response) override;
	// ...
};

class service_facade final : public so_5::agent_t
{
public:
	service_facade(so_5::agent_context_t ctx);
	
	// ...
private:
	// ...
	service_impl m_service_impl;
};

The implementation of send_command is straightforward: we check if the command name is valid and, if so, we send the corresponding signal to the specified channel. Fancier things are possible, but we opt for a simple implementation. The only missing piece is how to obtain the channel (aka the message box). Well, several solutions are possible but we choose to inject a parameter object into service_impl holding a reference to the SObjectizer environment:

struct service_params
{
	so_5::environment_t& environment;
};

class service_impl : public calico_service::Service
{
public:
	explicit service_impl(service_params params)
		: m_params(params)
	{
	}
	
	grpc::Status send_command(grpc::ServerContext*, const send_command_request* request, send_command_response*) override
	{
		const auto& channel_name = request->channel_name();
		const auto& command_name = request->command_name();
		auto status = grpc::Status::OK;

		if (command_name == "start_acquisition")
		{
			so_5::send<start_acquisition_command>(m_params.environment.create_mbox(channel_name));
		}
		else if (command_name == "stop_acquisition")
		{
			so_5::send<stop_acquisition_command>(m_params.environment.create_mbox(channel_name));
		}
		else
		{
			status = grpc::Status{ grpc::StatusCode::INVALID_ARGUMENT, std::format("command {} not found", command_name) };
		}
		return status;
	}
private:
	service_params m_params;
};

class service_facade final : public so_5::agent_t
{
public:
	service_facade(so_5::agent_context_t ctx)
		: agent_t(std::move(ctx)), m_service_impl({so_environment()})
	{
	}
	
	// ...
private:
	// ...
	service_impl m_service_impl;
};

ServerContext is not important for now. send_command_request carries the message data, which we access through the generated getters. Nothing really new here if you are a bit familiar with Protocol Buffers.

Remember, we aim to add this agent to a cooperation and have everything work out of the box. For this reason, we need to create a server and register service_impl with it. so_5::agent_t‘s capabilities come in handy: we create and start the server in so_evt_start() and shut it down in so_evt_finish():

class service_facade final : public so_5::agent_t
{
public:
	service_facade(so_5::agent_context_t ctx)
		: agent_t(std::move(ctx)), m_service_impl({so_environment()})
	{
	}
	
	void so_evt_start() override
	{
		grpc::ServerBuilder builder;
		builder.AddListeningPort("0.0.0.0:50051", grpc::InsecureServerCredentials());
		builder.RegisterService(&m_service_impl);
		m_server = builder.BuildAndStart();
	}
	
	void so_evt_finish() override
	{
		m_server->Shutdown();
		m_server->Wait();
	}
private:
	std::unique_ptr<grpc::Server> m_server;
	service_impl m_service_impl;
};

As you can see, gRPC provides a handy ServerBuilder to build a grpc::Server. It supports functions like AddListeningPort to add an endpoint and RegisterService to add a service to the party.

The endpoint could be configured and injected from outside, but we leave it hardcoded for simplicity. Moreover, it’s interesting to note that, although m_server->Wait() blocks until the shutdown completes, since we’ll give this agent its own worker thread, it won’t block others.
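For completeness, such an injection might look like the sketch below (a possible variation, not calico’s actual code; only the parts that change are shown):

class service_facade final : public so_5::agent_t
{
public:
	// the endpoint is now a constructor parameter, e.g. make_agent<service_facade>("0.0.0.0:50051")
	service_facade(so_5::agent_context_t ctx, std::string endpoint)
		: agent_t(std::move(ctx)), m_endpoint(std::move(endpoint)), m_service_impl({so_environment()})
	{
	}

	void so_evt_start() override
	{
		grpc::ServerBuilder builder;
		builder.AddListeningPort(m_endpoint, grpc::InsecureServerCredentials());
		builder.RegisterService(&m_service_impl);
		m_server = builder.BuildAndStart();
	}

	// ... so_evt_finish() as before
private:
	std::string m_endpoint;
	std::unique_ptr<grpc::Server> m_server;
	service_impl m_service_impl;
};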

Finally, we can add service_facade to the main cooperation and run the program:

int main()
{
	const auto ctrl_c = get_ctrlc_token();

	const so_5::wrapped_env_t sobjectizer;
	const auto main_channel = sobjectizer.environment().create_mbox("main");
	const auto commands_channel = sobjectizer.environment().create_mbox("commands");
	const auto message_queue = create_mchain(sobjectizer.environment());

	sobjectizer.environment().introduce_coop(so_5::disp::active_obj::make_dispatcher(sobjectizer.environment()).binder(), [&](so_5::coop_t& c) {
		c.make_agent<image_producer_recursive>(main_channel, commands_channel);
		c.make_agent<maint_gui::remote_control>(commands_channel, message_queue);
		c.make_agent<maint_gui::image_viewer_live>(main_channel, message_queue);
		c.make_agent<maint_gui::image_viewer>(main_channel, message_queue);
		c.make_agent<service_facade>();
	});

	do_gui_message_loop(ctrl_c, message_queue, sobjectizer.environment().create_mbox(constants::waitkey_channel_name));
}

Playing with the service

At this point, we have a fully-fledged gRPC service hosted in calico. To call this service from other programs we clearly have to write some code, possibly in different programming languages. However, using a Postman-like application might be easier and quicker for smoke testing our task. Indeed, BloomRPC was created just for that – even though we have just discovered it has been recently deprecated (if needed, check this out for an alternative).
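Just to give an idea of the code-based route, a minimal C++ client might look like the following sketch (assuming the same generated stubs are available to the client program; "commands" matches the channel created in our main function above):

#include <iostream>
#include <grpcpp/grpcpp.h>
#include "calico_service.grpc.pb.h"

int main()
{
	// connect to calico's endpoint and build a synchronous stub
	const auto channel = grpc::CreateChannel("localhost:50051", grpc::InsecureChannelCredentials());
	const auto stub = calico_service::NewStub(channel);

	send_command_request request;
	request.set_channel_name("commands");
	request.set_command_name("start_acquisition");

	send_command_response response;
	grpc::ClientContext context;
	if (const auto status = stub->send_command(&context, request, &response); !status.ok())
	{
		std::cerr << status.error_message() << "\n";
	}
}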

We launch calico and BloomRPC. Then we add the service proto and make the call, for example sending start_acquisition:

calico starts acquiring images! We can then stop it by sending stop_acquisition.

Task accomplished!

Takeaway

In this episode we have learned:

  • gRPC is a framework for connecting programs over the network;
  • to design a gRPC-based service, we declare its methods in a proto file and then we generate code stubs through protoc;
  • to implement a gRPC-based service, we just inherit from a base class and override its methods;
  • when non-SObjectizer threads need to coexist with agents, we have to be careful.

What’s next?

We have just released a new version of calico and Ekt is going to install it on several machines. Then he will be busy for a few days with the control dashboard implementation. He tried to convince us to do some UI work, but we backed away!

While getting back to our desks, we grab a coffee at the break area and meet Giulia, a colleague from another team: “I developed a C# program that performs image analysis from drone cameras. However, my program works offline by taking a batch of images. It would be awesome to get and process such images in real-time, as acquisition progresses. Can I use calico for that purpose?”.

This conversation happens at the perfect time! In the next post, we’ll add another method to calico_service that distributes images out of process!


Thanks to Yauheni Akhotnikau for having reviewed this post.
