SObjectizer Tales – 27. Design ideas

Posted: April 11, 2024 in SObjectizer Tales

After pairing with Ronnie, it became apparent that there is a need to delve into broader design considerations that could benefit inexperienced users of SObjectizer. This exploration might also be useful for designing message-passing styled applications in general.

Although the topic is vast, the goal here is to offer key insights and raise awareness about important aspects that have been relevant throughout my experience with SObjectizer over the years. This discussion aims to inspire new ideas and approaches to design applications using this paradigm.

There is no “interface”

Agents within SObjectizer offer the flexibility to handle any message type without requiring a traditional public interface. For instance, an agent subscribing to a channel like main for messages of type cv::Mat doesn’t need to know about the sender, and vice versa. Agents operate akin to (theoretical) microservices, decoupled from one another and interacting solely through message exchange.

This model inherently promotes the Open/Closed Principle, as components can adapt to evolving business needs without internal modifications. For example, in projects like calico, rearranging agents can yield vastly different workflows to address diverse business requirements. Even in the absence of a particular component, it can be seamlessly integrated into the system without disrupting existing ones.

However, the lack of an explicit interface doesn’t mean that agents can handle every possible message. The drawback is that an agent’s “intent” can only be inferred from its implementation details.

In essence, understanding which messages an agent can handle requires delving into its source code, often limited to functions like so_define_agent(). This contrasts with classical Object-Oriented Programming, where classes declare methods corresponding to the messages they handle. Moreover, Object-Oriented Programming further abstracts this aspect by introducing the concept of interface, which represents a list of methods that implementers guarantee to define. This distinction highlights unique considerations when designing systems with SObjectizer and with the actor model in general, as actors provide no interface at all.

The situation becomes more intricate when considering agent states. Indeed, agents not only process messages but also adapt their behavior based on their current state. Also, transitions between these states are not exposed at all.

Some actor model-based frameworks have tackled this common issue by introducing a “typed” actor concept, as seen in Akka. While SObjectizer doesn’t inherently include this concept, there have been initial efforts to explore whether SObjectizer users experience similar challenges and to propose a potential solution, as blogged in this initial draft (in Russian).

My personal experience with this issue is limited because my team and I usually have access to the source of agents. However, as a best practice, I tend to roughly document the “interface” of existing agents in the header files, as illustrated here below:

// handles [cv::Mat]
class image_tracer final : public so_5::agent_t
...

// handles [cv::Mat]
// outputs [cv::Mat]
class face_detector final : public so_5::agent_t
...

In more complex cases, I rely on common sense to ensure that the comments are clear and informative without becoming overly complicated.
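One way to make such comments checkable by the compiler is to move the list of handled types into the class itself. The following is only a sketch of that idea and not a SObjectizer feature: type_list, handles, outputs and can_handle_v are hypothetical names, and the empty image struct stands in for cv::Mat so that the snippet has no dependencies.

```cpp
#include <cassert>
#include <type_traits>

// Hypothetical helper: a compile-time list of message types.
template<typename... Ts>
struct type_list {};

// True when Msg appears in the given type_list.
template<typename Msg, typename List>
struct contains : std::false_type {};

template<typename Msg, typename... Ts>
struct contains<Msg, type_list<Ts...>>
	: std::bool_constant<(std::is_same_v<Msg, Ts> || ...)> {};

// Stand-ins for real message types (e.g. cv::Mat).
struct image {};
struct start_command {};

// Only the declared "interface" is shown; real agents would also derive from so_5::agent_t.
struct image_tracer
{
	using handles = type_list<image>;
	using outputs = type_list<>;
};

struct face_detector
{
	using handles = type_list<image>;
	using outputs = type_list<image>;
};

template<typename Agent, typename Msg>
inline constexpr bool can_handle_v = contains<Msg, typename Agent::handles>::value;

// The documented interface can now be verified at compile time.
static_assert(can_handle_v<image_tracer, image>);
static_assert(!can_handle_v<image_tracer, start_command>);
```

Nothing forces the declared list to match the subscriptions made in so_define_agent(), so this is still documentation, only harder to ignore than a comment.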

Another important but often underestimated tool for documenting agents is unit and integration testing, as discussed in another article. Tests should be designed to identify and showcase both typical and edge cases, offering runnable code that can be beneficial for other developers.

Effective agent organization

One key aspect we’ve learned about agents is that they are always registered and deregistered within a cooperation. Essentially, this means that we cannot simply create an agent and inject it into the environment; instead, we must add it to a cooperation first. Once a cooperation is established and running, however, there is no built-in mechanism to add new agents to it. This design decision aligns with the transactional nature of cooperations. But, in many scenarios, dynamic creation of agents is a common requirement, necessitating the creation of new cooperations to host these agents.

A straightforward approach to manage dynamically created agents in SObjectizer is to introduce a new cooperation for each new agent. The SObjectizer environment offers a function called register_agent_as_coop() precisely for this purpose. Typically, it’s a good practice to organize these one-agent cooperations as children of larger cooperations that are relevant to the system. For instance, in a system like calico, we might dynamically create all agents as children of a larger cooperation designated for each camera. This organizational structure simplifies the process of deregistering all agents associated with a specific device, as they are grouped within the same cooperation. For example, if a camera is removed from the system, we can easily deallocate all the agents associated with it by deregistering the cooperation that represents the camera’s “group”.
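To visualize why this grouping pays off, here is a toy model of the hierarchy, written with the standard library only. It is not SObjectizer code: toy_coop, add_agent_as_child and agents_in are hypothetical names, and agents are reduced to their names. It only shows that removing the camera’s node takes all of its descendants with it.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Toy model of a cooperation: it owns agents (just names here) and child cooperations.
struct toy_coop
{
	std::string name;
	std::vector<std::string> agents;
	std::vector<std::unique_ptr<toy_coop>> children;

	// Mimics the "one cooperation per agent" approach: each agent gets its own child node.
	toy_coop& add_agent_as_child(std::string agent_name)
	{
		auto child = std::make_unique<toy_coop>();
		child->name = name + "/" + agent_name;
		child->agents.push_back(std::move(agent_name));
		children.push_back(std::move(child));
		return *children.back();
	}
};

// Counts the agents that would go away if this node were removed.
inline std::size_t agents_in(const toy_coop& coop)
{
	std::size_t total = coop.agents.size();
	for (const auto& child : coop.children)
		total += agents_in(*child);
	return total;
}
```

In the real library the same effect comes from deregistering the parent cooperation, which also deregisters its children.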

Certainly, the same approach remains effective when multiple agents are logically part of the same cooperation. In such cases, we wouldn’t create individual cooperations for each agent but rather group them together within a single one. However, when we need to trigger deregistration for a single agent, as learnt in a previous post, opting for one cooperation per agent is usually preferred.

Another method we explored in the previous post involved organizing agents based on their functionality. For instance, we grouped all the “monitoring” agents into the same cooperation. In this case, we initially created all the involved agents statically at program startup, but the same can be achieved dynamically as discussed earlier.

An additional rationale for structuring cooperations in hierarchies is to facilitate the sharing and propagation of dispatchers. A recent update of SObjectizer includes new functionalities that allow access to both agent and cooperation dispatchers. This enhancement was prompted by feedback provided by a user and myself.

A compelling example is a workload distribution scheme featuring a root cooperation and several dynamically created worker agents. Typically, we create the cooperation with a specific binder and pass it to the root agent to generate workers accordingly. However, this approach not only adds boilerplate code but also fails to propagate to any children of the workers (again, we have to pass it explicitly). A cleaner solution is to leverage the new features introduced in SObjectizer 5.8.1:

void root_agent::so_evt_start()
{
	so_5::introduce_child_coop(*this, so_this_coop_disp_binder(), [](so_5::coop_t& c) {
		// ...
	});
}

In essence:

  • so_this_coop_disp_binder() gets the binder of the agent’s cooperation;
  • so_this_agent_disp_binder() gets the binder of the agent.

Domain-specific abstractions on top of SObjectizer

When adopting SObjectizer, there are three possible scenarios to consider:

  1. starting from scratch: this involves beginning a new project where SObjectizer is the chosen concurrency framework from the outset;
  2. starting from a codebase without existing concurrency: in this scenario, there is no prior concurrency layer in the codebase, making the integration of SObjectizer per se relatively straightforward (the most complicated part is how to redesign the system for concurrency);
  3. starting from a codebase with an existing concurrency layer: this scenario presents more complexity as there is already some concurrency code implemented using another library, whether it be a first-party or third-party solution.

In my case, I was in the third scenario as I had existing code based on PPL’s agents. I had previously encapsulated the existing library within a thin abstraction layer, which enabled me to work side-by-side without disrupting the existing codebase. The main task involved implementing another version of this layer, particularly to accommodate the addition of new “agents” specific to SObjectizer.

In essence, the new layer introduces an “agent manager” entity responsible for dynamically adding agents to the system. However, to better organize the interactions and accommodate the scenario similar to calico, where multiple cameras are involved, I decided to introduce an additional abstraction between the manager and its clients.

In this setup, clients interact with a “session” entity, which offers functionalities for introducing agents to the system. At the logic level, each session corresponds to a device. This is also beneficial for testing, as creating multiple sessions emulates using multiple devices.

But the real benefit of the session lies elsewhere: agents are not added randomly, instead, the session provides functions for adding agents to predefined logical groups (cooperations), such as “monitoring” or “high-priority”. Internally, these groups are bound to predetermined dispatchers but there is also the possibility to create new custom groups. All such groups are structured as children of a root cooperation corresponding – still at the logic level – to a specific device.

Additionally, the session facilitates the creation and referencing of message boxes and chains, streamlining the management process. It ensures that chains are automatically closed when the session is destroyed or when the environment is shut down.

Here below is a basic implementation of the agent manager, as a foundation for further development:

enum class chain_close_policy
{
	retain = 0,
	drop
};

using chain_action = std::function<void(const so_5::mchain_t&)>;

struct chain_closer
{
	using pointer = so_5::mchain_t;
	void operator()(const so_5::mchain_t& chain) const
	{
		closer(chain);
	}
	chain_action closer;
};

using chain_holder = std::unique_ptr<so_5::mchain_t, chain_closer>;

using coop_pair = std::pair<so_5::coop_handle_t, so_5::disp_binder_shptr_t>;

struct session_coops
{
	coop_pair root_coop;
	std::map<std::string, coop_pair, std::less<>> group_to_coop;
};

struct agent_session_state
{
	std::reference_wrapper<so_5::environment_t> env;
	std::string session_id;
	session_coops coops;
	std::function<so_5::mchain_t(const std::string&, chain_close_policy)> create_chain;
};

class agent_manager
{
public:
	[[nodiscard]] agent_session create_session();
	[[nodiscard]] agent_session create_session(const std::string& session_name);
	bool destroy_session(const std::string& session_id);
private:
	// ...

	so_5::wrapped_env_t m_sobjectizer;
	std::map<std::string, session_coops> m_session_to_coop;
	std::map<std::string, std::vector<chain_holder>> m_chains;
	int m_session_progressive_counter = 0;
};

Here is a brief description of the main entities involved:

  • chain_holder, implemented using unique_ptr, acts as an RAII-wrapper ensuring the closure of the chain upon its destruction. It utilizes chain_closer to customize the closure mechanism of the chain;
  • agent_session_state includes the data used by an agent session, as elaborated later. In addition to the environment, a unique identifier, and a function object for message chain creation (which calls back into the agent manager), it includes session_coops;
  • session_coops is a structure containing all “cooperation roots” currently available and their binders.
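The chain_holder trick relies on a lesser-known feature of std::unique_ptr: when the deleter declares a nested pointer type, unique_ptr stores that type instead of a raw pointer. Here is a dependency-free sketch of the mechanism, where toy_chain is a hypothetical stand-in for so_5::mchain_t and the closing action is whatever the caller provides:

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <memory>

// Stand-in for a handle type such as so_5::mchain_t (hypothetical, for illustration only).
// It provides what std::unique_ptr requires from Deleter::pointer (NullablePointer).
struct toy_chain
{
	int id = 0; // 0 means "no chain"

	toy_chain() = default;
	toy_chain(std::nullptr_t) {}
	explicit toy_chain(int chain_id) : id(chain_id) {}

	explicit operator bool() const { return id != 0; }
	friend bool operator==(const toy_chain& a, const toy_chain& b) { return a.id == b.id; }
	friend bool operator!=(const toy_chain& a, const toy_chain& b) { return !(a == b); }
};

struct toy_chain_closer
{
	using pointer = toy_chain; // unique_ptr stores this type instead of toy_chain*
	void operator()(const toy_chain& chain) const { closer(chain); }
	std::function<void(const toy_chain&)> closer;
};

// The closer runs when the holder is destroyed, unless the stored handle is "null".
using toy_chain_holder = std::unique_ptr<toy_chain, toy_chain_closer>;
```

Storing the closing action as a std::function lets the manager decide per chain how to close it (for instance, according to chain_close_policy) at the cost of a slightly heavier holder.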

In essence, the agent manager:

  • creates and destroys sessions;
  • manages message chains.

In general, create_session() functions might take an extra parameter containing the “recipe” of the predefined cooperations (session_coops::group_to_coop) to include in the session. Here below is an incomplete implementation of agent_session:

class agent_session
{
public:
	explicit agent_session(agent_session_state ctx)
		: m_ctx(std::move(ctx))
	{
	}

	template<typename T, typename... Args>
	void add_agent(std::string_view group_name, Args&&... args);

	template<typename T, typename... Args>
	void add_monitoring_agent(Args&&... args);

	template<typename T, typename... Args>
	void add_core_agent(Args&&... args);

	template<typename T, typename... Args>
	void add_dedicated_thread_agent(Args&&... args);

	[[nodiscard]] so_5::mbox_t get_channel() const;
	[[nodiscard]] so_5::mbox_t get_channel(const std::string& name) const;
	[[nodiscard]] so_5::mchain_t make_chain(chain_close_policy close_policy = chain_close_policy::drop) const;
	[[nodiscard]] const std::string& get_id() const;
	[[nodiscard]] so_5::environment_t& get_env() const;
private:
	// ...

	agent_session_state m_ctx;
};

In this example, there are no functions to create groups but, instead, the session provides domain-specific named functions for adding agents to existing groups. The available groups are added when the session is created by the manager.
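A detail worth noting is that add_agent() takes the group name as a std::string_view while group_to_coop is keyed by std::string. This works without temporary strings because the map is declared with std::less<>, which enables heterogeneous lookup. A minimal sketch of that lookup, with toy_group as a hypothetical stand-in for coop_pair and find_group as an illustrative helper:

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <string_view>

// Stand-in for coop_pair: here a group just carries the name of its dispatcher.
struct toy_group
{
	std::string dispatcher;
};

using toy_groups = std::map<std::string, toy_group, std::less<>>;

// Finds a group by name without building a temporary std::string:
// std::less<> is a transparent comparator, so find() accepts a string_view.
inline const toy_group* find_group(const toy_groups& groups, std::string_view group_name)
{
	const auto it = groups.find(group_name);
	return it != groups.end() ? &it->second : nullptr;
}
```

What to do when the group is missing (throw, fall back to a default group, create it on demand) is a design choice left open here.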

At this point, there’s no need to access the environment directly, as each group already provides its own binders. Here’s the example from the previous post rewritten in terms of the agent manager and session:

int main()
{
	const auto ctrl_c = utils::get_ctrlc_token();

	agent_manager manager;
	auto session = manager.create_session("webcam");
	
	const auto main_channel = session.get_channel("main");
	const auto commands_channel = session.get_channel("commands");
	const auto message_queue = session.make_chain();
	const auto waitkey_out = session.get_channel(constants::waitkey_channel_name);

	session.add_dedicated_thread_agent<image_producer>(main_channel, commands_channel);
	session.add_dedicated_thread_agent<service_facade>();

	session.add_monitoring_agent<image_tracer>(main_channel);
	session.add_monitoring_agent<fps_estimator>(std::vector{ main_channel });
	session.add_monitoring_agent<telemetry_agent>();
	session.add_monitoring_agent<stream_heartbeat>(main_channel);
	session.add_monitoring_agent<error_logger>(main_channel);
	
	session.add_core_agent<remote_control>(commands_channel, message_queue);
	
	auto resized = session.get_channel();
	session.add_core_agent<image_resizer>(main_channel, resized, 0.5);
	auto faces = session.get_channel();
	session.add_core_agent<face_detector>(resized, faces);
	
	session.add_core_agent<image_viewer>(faces, message_queue);
		
	do_gui_message_loop(ctrl_c, message_queue, waitkey_out);
}

As you see, since we don’t return created agents from add_agent-like functions, there is no way to get output from such agents as before. For this reason, we opted for using explicitly created channels.

By the way, these are just open design ideas and are not intended to cover all the scenarios. A full implementation of this “layer” is provided in the latest version of calico.

Handling “any” message type

In SObjectizer, an agent can only subscribe for messages of an explicitly defined type:

so_subscribe(channel).event([](const this_is_a_defined_type&) {
	// ...
});

It’s impossible to subscribe to a message type that hasn’t been specified. This differs from frameworks like CAF, where there is support for a default handler.

Why should we ever need this?

For example, in calico, suppose we add support for a new image type and we want image_tracer to handle it. We have two options to accommodate this change:

  1. subscribe to this additional image type within the agent;
  2. introduce a “converter” agent that converts the new image type into cv::Mat. Then, bind the output of this agent to the input of image_tracer using a channel.

However, a simpler approach could be to subscribe to a generic “unhandled” type, like so:

so_subscribe(channel).event([](so_5::<unhandled>) {
	osyncstream(cout) << "got a new message\n";
});

This approach may not provide the exact functionality because image_tracer currently prints the size of the image (information that wouldn’t be available in unhandled) and ignores non-image types. However, for troubleshooting purposes, it could be an acceptable trade-off. To be clear, this unhandled type does not exist in SObjectizer.

Another hypothetical solution proposed by Yauheni would involve leveraging inheritance to handle message types. In this approach, the event-handler searching procedure of SObjectizer would be modified. If an event handler for a specific type is not found, SObjectizer would then search for any handler of its base type:

class image_base : public so_5::message_with_fallback_t {...};
class image_vendor_A : public image_base {...};
class image_vendor_B : public image_base {...};
...
void first_agent::so_define_agent() {
  so_subscribe_self()
    .event([this](mhood_t<image_vendor_A> cmd) {...})
    .event([this](mhood_t<image_vendor_B> cmd) {...})
    .event([this](mhood_t<image_base> cmd) {...})
    ...
}

void second_agent::so_define_agent() {
  so_subscribe_self()
    .event([this](mhood_t<image_base>) { ++m_counter; });
}
...

Here above, if second_agent receives an instance of image_vendor_B, the handler would be executed. However, we clarify again that this behavior is not supported at the moment and, likely, won’t be in the future.
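To make the lookup rule concrete, here is a standard-library-only sketch of “try the exact type first, then fall back to the base type”. It has nothing to do with SObjectizer internals: toy_dispatcher, subscribe, dispatch and fallback_type are hypothetical names, and the fallback is limited to a single level.

```cpp
#include <cassert>
#include <functional>
#include <typeindex>
#include <typeinfo>
#include <unordered_map>
#include <utility>

struct image_base
{
	virtual ~image_base() = default;
	// Each message reports the type to fall back to; derived types inherit this answer.
	virtual std::type_index fallback_type() const { return typeid(image_base); }
};
struct image_vendor_A : image_base {};
struct image_vendor_B : image_base {};

class toy_dispatcher
{
public:
	using handler = std::function<void(const image_base&)>;

	template<typename Msg>
	void subscribe(handler h)
	{
		m_handlers[typeid(Msg)] = std::move(h);
	}

	// Returns true if a handler (exact or fallback) was invoked.
	bool dispatch(const image_base& msg) const
	{
		// typeid on a polymorphic reference yields the dynamic type of the message.
		for (const std::type_index key : { std::type_index(typeid(msg)), msg.fallback_type() })
		{
			if (const auto it = m_handlers.find(key); it != m_handlers.end())
			{
				it->second(msg);
				return true;
			}
		}
		return false;
	}
private:
	std::unordered_map<std::type_index, handler> m_handlers;
};
```

With this rule, a dispatcher that only subscribes to image_base still receives image_vendor_B, while one that subscribes to both gets the exact-type handler.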

Message proliferation

Another question related to the above-mentioned topic concerns the “fatness” of agents. Should an agent be responsible for handling a wide variety of messages, or is it preferable, whenever feasible, to limit its scope to specific types of messages? In general, there is a tendency to exploit the versatility of message handlers to manage multiple types within a single agent. However, there are situations where supporting an excessive number of types becomes cumbersome.

For example, in calico we have developed more than a dozen agents all handling cv::Mat only. What if we need to introduce and handle another image type? The first two options have already been discussed: either handle this new message type in all current agents or introduce a “converter” agent (or a message sink equipped with the new bind_and_transform()) that will be the only agent coupled with this new image type.

As usual, it depends. Some possible reasons to prefer handling the new type explicitly:

  • functionalities: the new type introduces functions and properties that we are required to use;
  • performance: the conversion incurs a cost we are not allowed to pay;
  • it’s not possible to do otherwise (e.g. the conversion is not possible).

However, if none of these conditions are met, opting for a conversion might be a more lightweight choice. In this case, a radical approach involves designing a “generic image type” that can be created from any image type within the system. Then, all agents would use this type instead of cv::Mat. This approach resembles the idea of “anywhere a constant string is required, use string_view”. However, it entails trade-offs, as there may be scenarios in the future where this generic type cannot effectively represent a new hypothetical type, or where the cost of creating it is still not sustainable.
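As a rough illustration of the string_view analogy, a generic image type could be a non-owning view that any concrete image converts to. Everything in this sketch is an assumption made for the example: vendor_a_image and vendor_b_image are invented layouts, and image_view only carries a pointer and the dimensions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Two hypothetical vendor image types with different layouts.
struct vendor_a_image
{
	int width = 0;
	int height = 0;
	std::vector<std::uint8_t> pixels;
};

struct vendor_b_image
{
	std::vector<std::uint8_t> buffer;
	int rows = 0;
	int cols = 0;
};

// Non-owning "generic image": cheap to create, valid only while the source image is alive.
struct image_view
{
	const std::uint8_t* data = nullptr;
	int width = 0;
	int height = 0;

	image_view(const vendor_a_image& img)
		: data(img.pixels.data()), width(img.width), height(img.height) {}
	image_view(const vendor_b_image& img)
		: data(img.buffer.data()), width(img.cols), height(img.rows) {}

	std::size_t pixel_count() const
	{
		return static_cast<std::size_t>(width) * static_cast<std::size_t>(height);
	}
};

// Agent-side logic written once against the generic type.
inline std::size_t traced_size(image_view img)
{
	return img.pixel_count();
}
```

A non-owning view is only safe while the original image outlives it, which needs care when messages cross threads. An owning generic type avoids that problem but reintroduces the conversion cost mentioned above.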

Finally, a hybrid approach could be adopted, where only specific agents use the new type when necessary. For instance, the image_tracer might log additional information, or the image_saver_worker might utilize native-type save operations.

Another aspect to consider is the dilemma of choosing between signals and messages. In principle, what distinguishes a signal from a message is the absence of any data. However, there are scenarios where additional data might be needed in the future. For instance, let’s say we need to add a timestamp to camera start and stop commands for storage purposes. Would the code handling these commands need to change to accommodate this new requirement?

In practice, the code handling signals is often resilient to such changes. This is because signals are handled using mhood_t<signal_type>, and a handler written this way continues to work seamlessly even if the signal evolves into a message. However, the reverse is not true: ordinary messages can be handled without wrapping the type into a mhood_t (for example, by const reference), and such handlers break if the message turns into a signal.

As discussed in previous articles, the most future-proof approach to handling types in SObjectizer is by using mhood_t. However, my personal preference is to break code if a message transitions to a signal, as this would likely indicate a significant change in the system’s behavior.

One agent per operation?

When designing service_facade, we initially chose to spawn “one agent per client,” with each agent subscribing to the client’s requests and managing gRPC streaming by writing images back to the client when available. While this approach is straightforward and works well for a small number of clients, it presents scalability challenges.

Firstly, we observed that having too many subscribers can impact performance, as the sending operation is influenced by the number of receivers. For example, sending an integer from an agent on a dedicated thread to 10,000 “no-op” agents in a thread pool with 4 threads takes approximately 2 milliseconds on my machine. While this delay might be negligible in some scenarios, it can become significant in others. Secondly, managing a large number of agents can become cumbersome in terms of debugging, profiling, and observability. Finally, there may be missed optimization opportunities. For instance, when multiple clients are subscribed to the same channel (which is very common in our use case), each service worker compresses the image to JPEG before sending. The same compression is therefore performed redundantly and simultaneously by multiple agents, which is inefficient because compression is typically a CPU-bound operation.

Hence, we often adopt an alternative approach where agents represent stages of a pipeline, minimizing agent proliferation. For example, an alternative implementation of the “calico service” could involve the following components:

  1. only one agent is tasked with subscribing to channels on behalf of clients, maintaining a mapping of channels to clients;
  2. another agent equipped with thread-safe handlers (or a group of multiple agents) handles the compression of frames from one channel and forwards the compressed results to the next stage;
  3. another agent equipped with thread-safe handlers (or a group of multiple agents) is responsible for writing each compressed frame back to the specific client, ensuring the correct order of writes.
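The core of this design, compressing each frame once and fanning the result out to every subscribed client, can be sketched without any framework. In the snippet below the stages are collapsed into a single function for brevity, and frame, subscriptions, outgoing_write and process_frame are hypothetical names; the compressor is passed in, so no real JPEG library is assumed.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

using frame = std::vector<unsigned char>;

// Stage 1 state: channel name -> ids of the clients subscribed to it.
using subscriptions = std::map<std::string, std::vector<int>>;

// What stage 3 would have to write, and to whom.
struct outgoing_write
{
	int client_id;
	frame compressed;
};

// Stage 2 plus the fan-out: the frame is compressed once,
// then the same result is queued for every client of the channel.
template<typename Compressor>
std::vector<outgoing_write> process_frame(
	const subscriptions& subs, const std::string& channel, const frame& raw, Compressor compress)
{
	std::vector<outgoing_write> writes;
	const auto it = subs.find(channel);
	if (it == subs.end() || it->second.empty())
		return writes; // nobody is listening: skip the compression entirely

	const frame compressed = compress(raw); // one CPU-bound operation per frame
	for (const int client : it->second)
		writes.push_back(outgoing_write{ client, compressed });
	return writes;
}
```

Compared with “one agent per client”, the number of compressions no longer depends on the number of clients. The sketch copies the compressed frame for each client; sharing it instead would avoid those copies.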

For agents responsible for steps 2 and 3, we may choose to utilize two different thread pools. For step 2, where compression of frames occurs, we should consider the guidelines for CPU-bound tasks. This means sizing the thread pool according to the number of available CPU cores (or slightly fewer to leave room for other system tasks, or +1 as others recommend). On the other hand, for step 3, which involves writing compressed frames back to specific clients, we should apply the guidelines for I/O-bound tasks. In this case, it’s beneficial to have more threads than the number of CPU cores to maximize CPU resource utilization during wait times for I/O operations.
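These rules of thumb can be written down as two small helpers. This is a sketch under assumptions: the function names are hypothetical, and the oversubscription factor of 4 for I/O-bound work is just a starting point to be validated by measurement, not a recommendation.

```cpp
#include <algorithm>
#include <cassert>
#include <thread>

// std::thread::hardware_concurrency() may return 0 when the value is unknown.
inline unsigned detected_cores()
{
	return std::max(1u, std::thread::hardware_concurrency());
}

// CPU-bound stage (e.g. JPEG compression): about one thread per core.
inline unsigned cpu_bound_pool_size(unsigned cores = detected_cores())
{
	return std::max(1u, cores);
}

// I/O-bound stage (e.g. writing to clients): oversubscribe by a tunable factor.
inline unsigned io_bound_pool_size(unsigned cores = detected_cores(), unsigned factor = 4)
{
	return std::max(1u, cores) * std::max(1u, factor);
}
```

The resulting values would then be used as the thread counts of the two thread pool dispatchers hosting the agents of steps 2 and 3.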

While this approach shares similarities with Staged Event-Driven Architecture (SEDA), the flexibility inherent in SObjectizer enables us to overcome some of the challenges associated with that pattern, including thread pool management.

Takeaway

In this episode we have learned:

  • although agents provide significant flexibility, their lack of explicit interfaces can make it challenging to discern which messages they handle and in what state they operate;
  • organizing cooperations in hierarchies can help manage the system more effectively, such as deregistering a group of related agents;
  • creating a manager, either tailored to specific needs or more generic, to encapsulate SObjectizer functionality could be beneficial, particularly for simplifying the dynamic creation and management of agents;
  • SObjectizer does not permit subscriptions for unspecified types, meaning it’s not feasible to establish “default handlers” or similar mechanisms;
  • handling multiple message and signal types from an agent is common practice, but excessive type proliferation can become inconvenient. In such scenarios, opting for conversion to a common type or exploring hybrid approaches can be beneficial;
  • proliferation of agents often leads to scalability and observability issues; therefore, the preferred design strategy involves creating agents that represent stages of a processing pipeline, as in SEDA (Staged Event-Driven Architecture).

As usual, calico is updated and tagged.

What’s next?

After a productive day of pair programming with Ronnie, as we head back home, we run into Dan, an experienced developer on our team, who is eager to share some feedback on SObjectizer.

In the next installment, we’ll delve into what Dan dislikes the most about the library and explore what he would change if he had a magic wand.


Thanks to Yauheni Akhotnikau for having reviewed this post.
