src/core/messenger/Messenger.cpp

Implementation of messenger. More…

Functions

Name
bool check_send(Module * source, BaseMessage * message, BaseDelegate * delegate)

Detailed Description

Implementation of messenger.

Copyright: Copyright (c) 2017-2024 CERN and the Allpix Squared authors. This software is distributed under the terms of the MIT License, copied verbatim in the file “LICENSE.md”. In applying this license, CERN does not waive the privileges and immunities granted to it by virtue of its status as an Intergovernmental Organization or submit itself to any jurisdiction. SPDX-License-Identifier: MIT

Functions Documentation

function check_send

static bool check_send(
    Module * source,
    BaseMessage * message,
    BaseDelegate * delegate
)

Source code


#include "Messenger.hpp"

#include <memory>
#include <stdexcept>
#include <string>
#include <typeindex>

#include "Message.hpp"
#include "core/module/Module.hpp"
#include "core/utils/log.h"
#include "core/utils/type.h"
#include "delegates.h"

using namespace allpix;

Messenger::Messenger() = default;
#ifdef NDEBUG
Messenger::~Messenger() = default;
#else
Messenger::~Messenger() { assert(delegate_to_iterator_.empty()); }
#endif

// Check if the detectors match for the message and the delegate and that we don't have self-dispatch
static bool check_send(Module* source, BaseMessage* message, BaseDelegate* delegate) {
    if(delegate->getDetector() != nullptr &&
       (message->getDetector() == nullptr || delegate->getDetector()->getName() != message->getDetector()->getName())) {
        return false;
    }
    if(delegate->getUniqueName() == source->getUniqueName()) {
        return false;
    }
    return true;
}

bool Messenger::hasReceiver(Module* source, const std::shared_ptr<BaseMessage>& message) {
    std::lock_guard<std::mutex> lock(mutex_);

    const BaseMessage* inst = message.get();
    std::type_index type_idx = typeid(*inst);

    // Get the name of the output message
    auto name = source->get_configuration().get<std::string>("output");

    // Check if a normal specific listener exists
    for(auto& delegate : delegates_[type_idx][name]) {
        if(check_send(source, message.get(), delegate.get())) {
            return true;
        }
    }
    // Check if a normal generic listener exists
    for(auto& delegate : delegates_[type_idx]["*"]) {
        if(check_send(source, message.get(), delegate.get())) {
            return true;
        }
    }
    // Check if a base message specific listener exists
    for(auto& delegate : delegates_[typeid(BaseMessage)][name]) {
        if(check_send(source, message.get(), delegate.get())) {
            return true;
        }
    }
    // Check if a base message generic listener exists
    for(auto& delegate : delegates_[typeid(BaseMessage)]["*"]) {
        if(check_send(source, message.get(), delegate.get())) {
            return true;
        }
    }

    return false;
}

bool Messenger::isSatisfied(BaseDelegate* delegate, Event* event) const {
    auto* local_messenger = event->get_local_messenger();
    return local_messenger->isSatisfied(delegate);
}

void Messenger::add_delegate(const std::type_info& message_type,
                             Module* module,
                             const std::shared_ptr<BaseDelegate>& delegate) {
    std::lock_guard<std::mutex> lock(mutex_);

    // Register generic or specific delegate depending on flag
    std::string message_name;
    if((delegate->getFlags() & MsgFlags::IGNORE_NAME) != MsgFlags::NONE) {
        message_name = "*";
    } else if((delegate->getFlags() & MsgFlags::UNNAMED_ONLY) != MsgFlags::NONE) {
        message_name = "?";
    } else {
        message_name = module->get_configuration().get<std::string>("input");
    }

    // Register delegate internally
    delegates_[std::type_index(message_type)][message_name].push_back(delegate);
    auto delegate_iter = --delegates_[std::type_index(message_type)][message_name].end();
    delegate_to_iterator_.emplace(delegate_iter->get(),
                                  std::make_tuple(std::type_index(message_type), message_name, delegate_iter));

    // Add delegate to the module itself
    module->add_delegate(this, delegate_iter->get());
}

void Messenger::remove_delegate(BaseDelegate* delegate) {
    std::lock_guard<std::mutex> lock(mutex_);

    auto iter = delegate_to_iterator_.find(delegate);
    if(iter == delegate_to_iterator_.end()) {
        throw std::out_of_range("delegate not found in listeners");
    }
    delegates_[std::get<0>(iter->second)][std::get<1>(iter->second)].erase(std::get<2>(iter->second));
    delegate_to_iterator_.erase(iter);
}

std::vector<std::pair<std::shared_ptr<BaseMessage>, std::string>> Messenger::fetchFilteredMessages(Module* module,
                                                                                                   Event* event) {
    try {
        auto* local_messenger = event->get_local_messenger();
        return local_messenger->fetchFilteredMessages(module);
    } catch(const std::out_of_range& e) {
        // No messages available after filtering, return empty vector:
        return {};
    }
}

LocalMessenger::LocalMessenger(Messenger& global_messenger) : global_messenger_(global_messenger) {}

void LocalMessenger::dispatchMessage(Module* source, std::shared_ptr<BaseMessage> message, std::string name) { // NOLINT
    // Get the name of the output message
    if(name == "-") {
        name = source->get_configuration().get<std::string>("output");
    }

    bool send = false;

    // Send messages to specific listeners
    send = dispatchMessage(source, message, name, name) || send;

    // Send to generic listeners
    send = dispatchMessage(source, message, name, "*") || send;

    // Send to listeners of unnamed messages
    if(name.empty()) {
        send = dispatchMessage(source, message, name, "?") || send;
    }

    // Display a TRACE log message if the message is send to no receiver
    if(!send) {
        const BaseMessage* inst = message.get();
        LOG(TRACE) << "Dispatched message " << allpix::demangle(typeid(*inst).name()) << " from " << source->getUniqueName()
                   << " has no receivers!";
    }

    // Save a copy of the sent message
    sent_messages_.emplace_back(message);
}

bool LocalMessenger::dispatchMessage(Module* source,
                                     const std::shared_ptr<BaseMessage>& message,
                                     const std::string& name,
                                     const std::string& id) {
    bool send = false;

    // Create type identifier from the typeid
    const BaseMessage* inst = message.get();
    std::type_index type_idx = typeid(*inst);

    // Retrieve listeners for the given message type and name
    const auto msg_type_iterator = global_messenger_.delegates_.find(type_idx);
    if(msg_type_iterator != global_messenger_.delegates_.end()) {
        const auto msg_name_iterator = msg_type_iterator->second.find(id);
        if(msg_name_iterator != msg_type_iterator->second.end()) {
            // Send messages only to their specific listeners
            for(const auto& delegate : msg_name_iterator->second) {
                if(check_send(source, message.get(), delegate.get())) {
                    LOG(TRACE) << "Sending message " << allpix::demangle(type_idx.name()) << " from "
                               << source->getUniqueName() << " to " << delegate->getUniqueName();
                    // Construct BaseMessage where message should be stored
                    auto& dest = messages_[delegate->getUniqueName()][type_idx];

                    delegate->process(message, name, dest);
                    send = true;
                }
            }
        }
    }

    // Dispatch to base message listeners
    assert(typeid(BaseMessage) != typeid(*inst));
    const auto base_msg_type_iterator = global_messenger_.delegates_.find(typeid(BaseMessage));
    if(base_msg_type_iterator != global_messenger_.delegates_.end()) {
        const auto msg_name_iterator = base_msg_type_iterator->second.find(id);
        if(msg_name_iterator != base_msg_type_iterator->second.end()) {
            for(const auto& delegate : msg_name_iterator->second) {
                if(check_send(source, message.get(), delegate.get())) {
                    LOG(TRACE) << "Sending message " << allpix::demangle(type_idx.name()) << " from "
                               << source->getUniqueName() << " to generic listener " << delegate->getUniqueName();
                    auto& dest = messages_[delegate->getUniqueName()][typeid(BaseMessage)];
                    delegate->process(message, name, dest);
                    send = true;
                }
            }
        }
    }

    return send;
}

std::vector<std::pair<std::shared_ptr<BaseMessage>, std::string>> LocalMessenger::fetchFilteredMessages(Module* module) {
    const std::type_index type_idx = typeid(BaseMessage);
    return messages_.at(module->getUniqueName()).at(type_idx).filter_multi;
}

bool LocalMessenger::isSatisfied(BaseDelegate* delegate) const {
    // check our records for messages for this module
    const std::string name = delegate->getUniqueName();
    auto messages_iter = messages_.find(name);
    if(messages_iter == messages_.end()) {
        return false;
    }

    // check if this delegate is in our records
    auto delegate_iter = global_messenger_.delegate_to_iterator_.find(delegate);
    if(delegate_iter == global_messenger_.delegate_to_iterator_.end()) {
        throw std::out_of_range("delegate not found in listeners");
    }

    // check our records for messages for this delegate
    auto iter = messages_iter->second.find(std::get<0>(delegate_iter->second));
    if(iter == messages_iter->second.end()) {
        return false;
    }

    return true;
}

Updated on 2024-12-13 at 08:31:37 +0000