src/core/module/ModuleManager.cpp
Implementation of module manager. More…
Functions
Name | |
---|---|
Configuration & | add_instance_configuration(ConfigManager * config_manager, const ModuleIdentifier & identifier, const Configuration & config) |
std::string | nanoseconds_to_time(uint64_t nanoseconds) |
Defines
Name | |
---|---|
ALLPIX_MODULE_PREFIX | |
ALLPIX_GENERATOR_FUNCTION | |
ALLPIX_UNIQUE_FUNCTION |
Detailed Description
Implementation of module manager.
Copyright: Copyright (c) 2017-2024 CERN and the Allpix Squared authors. This software is distributed under the terms of the MIT License, copied verbatim in the file “LICENSE.md”. In applying this license, CERN does not waive the privileges and immunities granted to it by virtue of its status as an Intergovernmental Organization or submit itself to any jurisdiction. SPDX-License-Identifier: MIT
Functions Documentation
function add_instance_configuration
static inline Configuration & add_instance_configuration(
ConfigManager * config_manager,
const ModuleIdentifier & identifier,
const Configuration & config
)
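Calls config_manager->addInstanceConfiguration(identifier, config) and returns the resulting instance configuration. A ModuleIdentifierAlreadyAddedError raised by that call is caught and rethrown as an AmbiguousInstantiationError carrying the unique name of the identifier.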
function nanoseconds_to_time
static std::string nanoseconds_to_time(
uint64_t nanoseconds
)
Macros Documentation
define ALLPIX_MODULE_PREFIX
#define ALLPIX_MODULE_PREFIX "libAllpixModule"
define ALLPIX_GENERATOR_FUNCTION
#define ALLPIX_GENERATOR_FUNCTION "allpix_module_generator"
define ALLPIX_UNIQUE_FUNCTION
#define ALLPIX_UNIQUE_FUNCTION "allpix_module_is_unique"
Source code
#include "ModuleManager.hpp"
#include "Event.hpp"
#include <dlfcn.h>
#include <unistd.h>
#include <algorithm>
#include <chrono>
#include <cstring>
#include <filesystem>
#include <fstream>
#include <limits>
#include <set>
#include <stdexcept>
#include <string>
#include <TROOT.h>
#include <TSystem.h>
#include "core/config/ConfigManager.hpp"
#include "core/config/Configuration.hpp"
#include "core/config/exceptions.h"
#include "core/geometry/GeometryManager.hpp"
#include "core/messenger/Messenger.hpp"
#include "core/utils/log.h"
// Common prefix for all modules
// TODO [doc] Should be provided by the build system
#define ALLPIX_MODULE_PREFIX "libAllpixModule"
// These should point to the function defined in dynamic_module_impl.cpp
#define ALLPIX_GENERATOR_FUNCTION "allpix_module_generator"
#define ALLPIX_UNIQUE_FUNCTION "allpix_module_is_unique"
using namespace allpix;
ModuleManager::ModuleManager() : terminate_(false) {}
/**
 * Load the shared library of every configured module and instantiate the modules.
 *
 * For each module configuration the matching shared library is searched first in the directories listed under
 * the global key "library_directories" and afterwards via the default search of the dynamic loader. The module
 * instances are then created and appended to the run list. If two instances share the same unique name, the
 * one with the numerically lower priority value is kept; equal priority values raise an error.
 *
 * @param messenger Pointer to the messenger, stored and forwarded to the created modules
 * @param conf_manager Pointer to the configuration manager, stored for later use
 * @param geo_manager Pointer to the geometry manager forwarded to the module creation functions
 *
 * @throws RuntimeError If the main ROOT file exists while overwriting is denied, or if it cannot be created
 * @throws DynamicLibraryError If a module library cannot be loaded or misses the required interface function
 * @throws AmbiguousInstantiationError If two instances with the same unique name have equal priority
 */
void ModuleManager::load(Messenger* messenger, ConfigManager* conf_manager, GeometryManager* geo_manager) {
    // Store config manager and get configurations
    conf_manager_ = conf_manager;
    auto& configs = conf_manager_->getModuleConfigurations();
    Configuration& global_config = conf_manager_->getGlobalConfiguration();

    // Multithreading is requested by default unless the configuration states otherwise
    global_config.setDefault("multithreading", true);
    multithreading_flag_ = global_config.get<bool>("multithreading");

    // Set default for performance plot creation:
    global_config.setDefault("performance_plots", false);

    // Store the messenger
    messenger_ = messenger;

    // (Re)create the main ROOT file
    auto path = std::filesystem::path(gSystem->pwd()) / global_config.get<std::string>("root_file", "modules");
    path.replace_extension("root");

    if(std::filesystem::is_regular_file(path)) {
        if(global_config.get<bool>("deny_overwrite", false)) {
            throw RuntimeError("Overwriting of existing main ROOT file " + path.string() + " denied");
        }
        LOG(WARNING) << "Main ROOT file " << path << " exists and will be overwritten.";
        std::filesystem::remove(path);
    }
    modules_file_ = std::make_unique<TFile>(path.c_str(), "RECREATE");
    if(modules_file_->IsZombie()) {
        throw RuntimeError("Cannot create main ROOT file " + path.string());
    }
    modules_file_->cd();

    // Loop through all non-global configurations
    for(auto& config : configs) {
        // Load library for each module. Libraries are named (by convention + CMake) libAllpixModule<Name><suffix>
        std::string lib_name = std::string(ALLPIX_MODULE_PREFIX).append(config.getName()).append(SHARED_LIBRARY_SUFFIX);
        LOG_PROGRESS(STATUS, "LOAD_LOOP") << "Loading module " << config.getName();

        void* lib = nullptr;
        bool load_error = false;

        // Clear any pending error of the dynamic loader so that a later dlerror() refers to our own calls
        dlerror();
        if(loaded_libraries_.count(lib_name) == 0) {
            // If library is not loaded then try to load it first from the config directories
            if(global_config.has("library_directories")) {
                LOG(TRACE) << "Attempting to load library from configured paths";
                auto lib_paths = global_config.getPathArray("library_directories", true);
                for(const auto& lib_path : lib_paths) {
                    auto full_lib_path = lib_path;
                    full_lib_path /= lib_name;

                    LOG(TRACE) << "Searching in path " << full_lib_path;

                    // Check if the absolute file exists and try to load if it exists
                    std::ifstream check_file(full_lib_path);
                    if(check_file.good()) {
                        lib = dlopen(full_lib_path.c_str(), RTLD_NOW);
                        if(lib != nullptr) {
                            LOG(DEBUG) << "Found library in configuration specified directory at " << full_lib_path;
                        } else {
                            load_error = true;
                        }
                        // Only the first existing file is tried, independent of whether loading succeeded
                        break;
                    }
                }
            }

            // Otherwise try to load from the standard paths if not found already
            if(!load_error && lib == nullptr) {
                lib = dlopen(lib_name.c_str(), RTLD_NOW);

                if(lib != nullptr) {
                    Dl_info dl_info;
                    dl_info.dli_fname = "";

                    // workaround to get the location of the library
                    int ret = dladdr(dlsym(lib, ALLPIX_UNIQUE_FUNCTION), &dl_info);
                    if(ret != 0) {
                        LOG(DEBUG) << "Found library during global search in runtime paths at " << dl_info.dli_fname;
                    } else {
                        LOG(WARNING)
                            << "Found library during global search but could not deduce location, likely broken library";
                    }
                } else {
                    load_error = true;
                }
            }
        } else {
            // Otherwise just fetch it from the cache
            lib = loaded_libraries_[lib_name];
        }

        // If library did not load then throw exception
        if(load_error) {
            const char* lib_error = dlerror();

            // dlerror() may return a null pointer, from which a std::string must never be constructed
            const std::string lib_error_str = (lib_error != nullptr ? lib_error : "");

            // Find the name of the loaded library if it exists: the text in front of the first colon
            const auto end_pos = lib_error_str.find(':');
            std::string problem_lib;
            if(end_pos != std::string::npos) {
                problem_lib = lib_error_str.substr(0, end_pos);
            }

            // FIXME is checking the error in this way portable?
            if(lib_error != nullptr && std::strstr(lib_error, "cannot allocate memory in static TLS block") != nullptr) {
                LOG(ERROR) << "Library could not be loaded: not enough thread local storage available" << std::endl
                           << "Try one of below workarounds:" << std::endl
                           << "- Rerun library with the environmental variable LD_PRELOAD='" << problem_lib << "'"
                           << std::endl
                           << "- Recompile the library " << problem_lib << " with tls-model=global-dynamic";
            } else if(lib_error != nullptr && std::strstr(lib_error, "cannot open shared object file") != nullptr &&
                      problem_lib.find(ALLPIX_MODULE_PREFIX) == std::string::npos) {
                LOG(ERROR) << "Library could not be loaded: one of its dependencies is missing" << std::endl
                           << "The name of the missing library is " << problem_lib << std::endl
                           << "Please make sure the library is properly initialized and try again";
            } else if(lib_error != nullptr && std::strstr(lib_error, "undefined symbol") != nullptr) {
                LOG(ERROR) << "Library could not be loaded: library version does not match framework (undefined symbols)"
                           << std::endl
                           << "The name of the problematic library is " << problem_lib << std::endl
                           << "Please make sure the library is compiled against the correct framework version";
            } else {
                LOG(ERROR) << "Library could not be loaded: it is not available" << std::endl
                           << " - Did you enable the library during building? " << std::endl
                           << " - Did you spell the library name correctly (case-sensitive)? ";
                if(lib_error != nullptr) {
                    LOG(DEBUG) << "Detailed error: " << lib_error;
                }
            }

            throw allpix::DynamicLibraryError(config.getName());
        }

        // Remember that this library was loaded
        loaded_libraries_[lib_name] = lib;

        // Check if this module is produced once, or once per detector
        bool unique = true;
        void* uniqueFunction = dlsym(loaded_libraries_[lib_name], ALLPIX_UNIQUE_FUNCTION);

        // If the unique function was not found, throw an error
        if(uniqueFunction == nullptr) {
            LOG(ERROR) << "Module library is invalid or outdated: required interface function not found!";
            throw allpix::DynamicLibraryError(config.getName());
        } else {
            unique = reinterpret_cast<bool (*)()>(uniqueFunction)(); // NOLINT
        }

        // Add the global internal parameters to the configuration
        std::string global_dir = gSystem->pwd();
        config.set<std::string>("_global_dir", global_dir);

        // Set default input and output name
        config.setDefault<std::string>("input", "");
        config.setDefault<std::string>("output", "");

        // Create the modules from the library depending on the module type
        std::vector<std::pair<ModuleIdentifier, Module*>> mod_list;
        if(unique) {
            mod_list.emplace_back(create_unique_modules(loaded_libraries_[lib_name], config, messenger, geo_manager));
        } else {
            mod_list = create_detector_modules(loaded_libraries_[lib_name], config, messenger, geo_manager);
        }

        // Loop through all created instantiations
        for(auto& id_mod : mod_list) {
            // FIXME: This convert the module to an unique pointer. Check that this always works and we can do this earlier
            std::unique_ptr<Module> mod(id_mod.second);
            ModuleIdentifier identifier = id_mod.first;

            // Check if the unique instantiation already exists
            auto iter = std::find_if(id_to_module_.begin(), id_to_module_.end(), [&identifier](const auto& mapv) {
                return identifier.getUniqueName() == mapv.first.getUniqueName();
            });
            if(iter != id_to_module_.end()) {
                // Unique name exists, check if its needs to be replaced
                if(identifier.getPriority() < iter->first.getPriority()) {
                    // Priority of new instance is higher (lower value), replace the instance
                    LOG(TRACE) << "Replacing model instance " << iter->first.getUniqueName()
                               << " with instance with higher priority.";

                    // Drop configuration from replaced module
                    conf_manager_->dropInstanceConfiguration(iter->first);

                    module_execution_time_.erase(iter->second->get());
                    iter->second = modules_.erase(iter->second);
                    iter = id_to_module_.erase(iter);
                } else {
                    // Priority is equal, raise an error
                    if(identifier.getPriority() == iter->first.getPriority()) {
                        throw AmbiguousInstantiationError(config.getName());
                    }
                    // Priority is lower, do not add this module to the run list, drop config.
                    // The module object itself is released when the unique pointer goes out of scope.
                    conf_manager_->dropInstanceConfiguration(identifier);
                    module_execution_time_.erase(id_mod.second);
                    continue;
                }
            }

            // Save the identifier in the module
            mod->set_identifier(identifier);

            // Check if module can't run in parallel
            auto module_can_parallelize = mod->multithreadingEnabled();
            if(multithreading_flag_ && !module_can_parallelize) {
                LOG(WARNING) << "Module instance " << mod->getUniqueName() << " prevents multithreading";
            }
            can_parallelize_ = module_can_parallelize && can_parallelize_;

            // Add the new module to the run list
            modules_.emplace_back(std::move(mod));
            id_to_module_[identifier] = --modules_.end();
        }
    }

    // Force MT off for all modules in case MT was not requested or some modules didn't enable multithreading
    if(!(multithreading_flag_ && can_parallelize_)) {
        for(auto& module : modules_) {
            module->set_multithreading(false);
        }
    }
    LOG_PROGRESS(STATUS, "LOAD_LOOP") << "Loaded " << configs.size() << " modules";
}
/**
 * Register an instance configuration with the configuration manager.
 *
 * @param config_manager Configuration manager receiving the instance configuration
 * @param identifier Identifier of the module instance
 * @param config Configuration the instance configuration is created from
 * @return Reference to the instance configuration returned by the configuration manager
 *
 * @throws AmbiguousInstantiationError If the manager reports that the identifier was already added
 */
static inline Configuration&
add_instance_configuration(ConfigManager* config_manager, const ModuleIdentifier& identifier, const Configuration& config) {
    Configuration* instance_config = nullptr;
    try {
        instance_config = &config_manager->addInstanceConfiguration(identifier, config);
    } catch(ModuleIdentifierAlreadyAddedError&) {
        // Translate the duplicate identifier into the error type used by the module manager
        throw AmbiguousInstantiationError(identifier.getUniqueName());
    }
    return *instance_config;
}
std::pair<ModuleIdentifier, Module*> ModuleManager::create_unique_modules(void* library,
Configuration& config,
Messenger* messenger,
GeometryManager* geo_manager) {
// Make the vector to return
const std::string& module_name = config.getName();
// Return error if user tried to specialize the unique module:
if(config.has("name")) {
throw InvalidValueError(config, "name", "unique modules cannot be specialized using the \"name\" keyword.");
}
if(config.has("type")) {
throw InvalidValueError(config, "type", "unique modules cannot be specialized using the \"type\" keyword.");
}
// Create the identifier
std::string identifier_str;
if(!config.get<std::string>("input").empty()) {
identifier_str += config.get<std::string>("input");
}
if(!config.get<std::string>("output").empty()) {
if(!identifier_str.empty()) {
identifier_str += "_";
}
identifier_str += config.get<std::string>("output");
}
ModuleIdentifier identifier(module_name, std::move(identifier_str), 0);
// Get the generator function for this module
void* generator = dlsym(library, ALLPIX_GENERATOR_FUNCTION);
// If the generator function was not found, throw an error
if(generator == nullptr) {
LOG(ERROR) << "Module library is invalid or outdated: required interface function not found!";
throw allpix::DynamicLibraryError(module_name);
}
// Create and add module instance config
Configuration& instance_config = add_instance_configuration(conf_manager_, identifier, config);
// Specialize instance configuration
std::filesystem::path output_dir = instance_config.get<std::string>("_global_dir");
auto path_mod_name = identifier.getUniqueName();
std::replace(path_mod_name.begin(), path_mod_name.end(), ':', '_');
output_dir /= path_mod_name;
// Convert to correct generator function
auto module_generator = reinterpret_cast<Module* (*)(Configuration&, Messenger*, GeometryManager*)>(generator); // NOLINT
LOG(DEBUG) << "Creating unique instantiation " << identifier.getUniqueName();
// Get current time
auto start = std::chrono::steady_clock::now();
// Set module specific log settings
auto old_settings = set_module_before(identifier.getUniqueName(), instance_config, "C:");
// Build module
Module* module = module_generator(instance_config, messenger, geo_manager);
// Reset log
set_module_after(std::move(old_settings));
// Update execution time
auto end = std::chrono::steady_clock::now();
module_execution_time_[module] += std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
// Set the module directory afterwards to catch invalid access in constructor
module->get_configuration().set<std::string>("_output_dir", output_dir);
// Store the module and return it to the Module Manager
return std::make_pair(identifier, module);
}
std::vector<std::pair<ModuleIdentifier, Module*>> ModuleManager::create_detector_modules(void* library,
Configuration& config,
Messenger* messenger,
GeometryManager* geo_manager) {
const std::string& module_name = config.getName();
LOG(DEBUG) << "Creating instantions for detector module " << module_name;
// Create the basic identifier
std::string identifier;
if(!config.get<std::string>("input").empty()) {
identifier += "_";
identifier += config.get<std::string>("input");
}
if(!config.get<std::string>("output").empty()) {
identifier += "_";
identifier += config.get<std::string>("output");
}
// Open the library and get the module generator function
void* generator = dlsym(library, ALLPIX_GENERATOR_FUNCTION);
// If the generator function was not found, throw an error
if(generator == nullptr) {
LOG(ERROR) << "Module library is invalid or outdated: required interface function not found!";
throw allpix::DynamicLibraryError(module_name);
}
// Convert to correct generator function
auto module_generator =
reinterpret_cast<Module* (*)(Configuration&, Messenger*, std::shared_ptr<Detector>)>(generator); // NOLINT
// Handle empty type and name arrays:
bool instances_created = false;
std::vector<std::pair<std::shared_ptr<Detector>, ModuleIdentifier>> instantiations;
// Create all names first with highest priority
std::set<std::string> module_names;
if(config.has("name")) {
std::vector<std::string> names = config.getArray<std::string>("name");
for(auto& name : names) {
auto det = geo_manager->getDetector(name);
instantiations.emplace_back(det, ModuleIdentifier(module_name, det->getName() + identifier, 0));
// Save the name (to not instantiate it again later)
module_names.insert(name);
}
instances_created = !names.empty();
}
// Then create all types that are not yet name instantiated
if(config.has("type")) {
std::vector<std::string> types = config.getArray<std::string>("type");
for(auto& type : types) {
auto detectors = geo_manager->getDetectorsByType(type);
for(auto& det : detectors) {
// Skip all that were already added by name
if(module_names.find(det->getName()) != module_names.end()) {
continue;
}
instantiations.emplace_back(det, ModuleIdentifier(module_name, det->getName() + identifier, 1));
}
}
instances_created = !types.empty();
}
// Create for all detectors if no name / type provided
if(!instances_created) {
auto detectors = geo_manager->getDetectors();
for(auto& det : detectors) {
instantiations.emplace_back(det, ModuleIdentifier(module_name, det->getName() + identifier, 2));
}
}
// Construct instantiations from the list of requests
std::vector<std::pair<ModuleIdentifier, Module*>> module_list;
for(auto& instance : instantiations) {
LOG(DEBUG) << "Creating detector instantiation " << instance.second.getUniqueName();
// Get current time
auto start = std::chrono::steady_clock::now();
// Create and add module instance config
Configuration& instance_config = add_instance_configuration(conf_manager_, instance.second, config);
// Add internal module config
std::filesystem::path output_dir = instance_config.get<std::string>("_global_dir");
auto path_mod_name = instance.second.getUniqueName();
std::replace(path_mod_name.begin(), path_mod_name.end(), ':', '/');
output_dir /= path_mod_name;
// Set module specific log settings
auto old_settings = set_module_before(instance.second.getUniqueName(), instance_config, "C:");
// Build module
Module* module = module_generator(instance_config, messenger, instance.first);
// Reset logging
set_module_after(std::move(old_settings));
// Update execution time
auto end = std::chrono::steady_clock::now();
module_execution_time_[module] += std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
// Set the module directory afterwards to catch invalid access in constructor
module->get_configuration().set<std::string>("_output_dir", output_dir);
// Check if the module called the correct base class constructor
if(module->getDetector().get() != instance.first.get()) {
throw InvalidModuleStateException(
"Module " + module_name +
" does not call the correct base Module constructor: the provided detector should be forwarded");
}
// Store the module
module_list.emplace_back(instance.second, module);
}
return module_list;
}
// Helper functions to set the module specific log settings if necessary
/**
 * Apply the module specific log settings and return the settings that were active before.
 *
 * @param name Name appended to the prefix to form the new log section
 * @param config Configuration optionally providing the keys "log_level" and "log_format"
 * @param prefix Prefix of the new log section
 * @param event Event number to set in the logger
 * @return Tuple of the previous reporting level, log format, section name and event number
 *
 * @throws InvalidValueError If the configured log level or log format is not recognized by the logger
 */
std::tuple<LogLevel, LogFormat, std::string, uint64_t> ModuleManager::set_module_before(const std::string& name,
                                                                                        const Configuration& config,
                                                                                        const std::string& prefix,
                                                                                        const uint64_t event) {
    // Upper-case conversion of a single character. The value is converted to unsigned char first because
    // passing a negative char value (e.g. a non-ASCII byte from the configuration) to toupper is undefined
    const auto to_upper = [](char c) { return static_cast<char>(::toupper(static_cast<unsigned char>(c))); };

    // Set new log level if necessary
    LogLevel prev_level = Log::getReportingLevel();
    if(config.has("log_level")) {
        auto log_level_string = config.get<std::string>("log_level");
        std::transform(log_level_string.begin(), log_level_string.end(), log_level_string.begin(), to_upper);
        try {
            LogLevel log_level = Log::getLevelFromString(log_level_string);
            if(log_level != prev_level) {
                LOG(TRACE) << "Local log level is set to " << log_level_string;
                Log::setReportingLevel(log_level);
            }
        } catch(const std::invalid_argument& e) {
            // Report the unknown level as a configuration error of this module
            throw InvalidValueError(config, "log_level", e.what());
        }
    }

    // Set new log format if necessary
    LogFormat prev_format = Log::getFormat();
    if(config.has("log_format")) {
        auto log_format_string = config.get<std::string>("log_format");
        std::transform(log_format_string.begin(), log_format_string.end(), log_format_string.begin(), to_upper);
        try {
            LogFormat log_format = Log::getFormatFromString(log_format_string);
            if(log_format != prev_format) {
                LOG(TRACE) << "Local log format is set to " << log_format_string;
                Log::setFormat(log_format);
            }
        } catch(const std::invalid_argument& e) {
            // Report the unknown format as a configuration error of this module
            throw InvalidValueError(config, "log_format", e.what());
        }
    }

    // Set new section name
    auto prev_section = Log::getSection();
    Log::setSection(prefix + name);

    // Set new event number:
    auto prev_event = Log::getEventNum();
    Log::setEventNum(event);

    return std::make_tuple(prev_level, prev_format, prev_section, prev_event);
}
void ModuleManager::set_module_after(std::tuple<LogLevel, LogFormat, std::string, uint64_t> prev) {
// Reset the previous log level
LogLevel cur_level = Log::getReportingLevel();
LogLevel old_level = std::get<0>(prev);
if(cur_level != old_level) {
Log::setReportingLevel(old_level);
LOG(TRACE) << "Reset log level to global level of " << Log::getStringFromLevel(old_level);
}
// Reset the previous log format
LogFormat cur_format = Log::getFormat();
LogFormat old_format = std::get<1>(prev);
if(cur_format != old_format) {
Log::setFormat(old_format);
LOG(TRACE) << "Reset log format to global level of " << Log::getStringFromFormat(old_format);
}
// Reset section name
Log::setSection(std::get<2>(prev));
// Reset event number
Log::setEventNum(std::get<3>(prev));
}
void ModuleManager::initialize() {
Configuration& global_config = conf_manager_->getGlobalConfiguration();
LOG(TRACE) << "Register number of workers for possible multithreading";
if(multithreading_flag_ && can_parallelize_) {
// Try to fetch a suitable number of workers if multithreading is enabled
auto available_hardware_concurrency = std::thread::hardware_concurrency();
if(available_hardware_concurrency > 2u) {
// Try to be graceful and leave one core out if the number of workers was not specified
available_hardware_concurrency -= 1u;
}
number_of_threads_ = global_config.get<unsigned int>("workers", std::max(available_hardware_concurrency, 1u));
if(number_of_threads_ < 1) {
throw InvalidValueError(global_config, "workers", "number of workers should be larger than zero");
} else if(number_of_threads_ == 1) {
LOG(WARNING) << "Using multithreading with only one worker, this might be slower than multithreading=false";
}
if(number_of_threads_ > std::thread::hardware_concurrency()) {
LOG(WARNING) << "Using more workers (" << number_of_threads_
<< ") than supported concurrent threads on this system (" << std::thread::hardware_concurrency()
<< ") may impact simulation performance";
}
LOG(STATUS) << "Multithreading enabled, processing events in parallel on " << number_of_threads_
<< " worker threads";
// Adjust the modules buffer size according to the number of threads used
max_buffer_size_ = global_config.get<size_t>("buffer_per_worker", 256) * number_of_threads_;
if(max_buffer_size_ < number_of_threads_) {
throw InvalidValueError(global_config, "buffer_per_worker", "buffer per worker should be larger than one");
}
LOG(STATUS) << "Allocating a total of " << max_buffer_size_ << " event slots for buffered modules";
} else {
// Issue a warning in case MT was requested but we can't actually run in MT
if(multithreading_flag_ && !can_parallelize_) {
global_config.set<bool>("multithreading", false);
LOG(ERROR) << "Multithreading disabled since the current module configuration does not support it";
} else {
LOG(STATUS) << "Multithreading disabled";
}
}
// Store final number of threads to the config for later reference
global_config.set<size_t>("workers", number_of_threads_, true);
// Initialize the thread pool with the number of threads
if(number_of_threads_ > 0) {
ThreadPool::registerThreadCount(number_of_threads_);
}
// Book global performance histograms
if(global_config.get<bool>("performance_plots")) {
buffer_fill_level_ = CreateHistogram<TH1D>("buffer_fill_level",
"Buffer fill level;# buffered events;# events",
static_cast<int>(max_buffer_size_),
0,
static_cast<double>(max_buffer_size_));
event_time_ = CreateHistogram<TH1D>("event_time", "processing time per event;time [s];# events", 1000, 0, 10);
}
auto start_time = std::chrono::steady_clock::now();
LOG_PROGRESS(STATUS, "INIT_LOOP") << "Initializing " << modules_.size() << " module instantiations";
for(auto& module : modules_) {
LOG_PROGRESS(TRACE, "INIT_LOOP") << "Initializing " << module->get_identifier().getUniqueName();
// Pass the config manager to this instance
module->set_config_manager(conf_manager_);
// Create main ROOT directory for this module class if it does not exists yet
LOG(TRACE) << "Creating and accessing ROOT directory";
std::string module_name = module->get_configuration().getName();
auto* directory = modules_file_->GetDirectory(module_name.c_str());
if(directory == nullptr) {
directory = modules_file_->mkdir(module_name.c_str());
if(directory == nullptr) {
throw RuntimeError("Cannot create or access overall ROOT directory for module " + module_name);
}
}
directory->cd();
// Create local directory for this instance
TDirectory* local_directory = nullptr;
if(module->get_identifier().getIdentifier().empty()) {
local_directory = directory;
} else {
local_directory = directory->mkdir(module->get_identifier().getIdentifier().c_str());
if(local_directory == nullptr) {
throw RuntimeError("Cannot create or access local ROOT directory for module " + module->getUniqueName());
}
}
// Change to the directory and save it in the module
local_directory->cd();
module->set_ROOT_directory(local_directory);
// Get current time
auto start = std::chrono::steady_clock::now();
// Set module specific settings
auto old_settings = set_module_before(module->get_identifier().getUniqueName(), module->get_configuration(), "I:");
// Change to our ROOT directory
module->getROOTDirectory()->cd();
// Init module
module->initialize();
// Reset logging
set_module_after(std::move(old_settings));
// Update execution time
auto end = std::chrono::steady_clock::now();
module_execution_time_[module.get()] += std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
// Book per-module performance plots
if(global_config.get<bool>("performance_plots")) {
const auto& module_identifier = module->get_identifier();
const auto& identifier = module_identifier.getIdentifier();
const auto& name = (identifier.empty() ? module->get_configuration().getName() : identifier);
auto title = module->get_configuration().getName() + " event processing time " +
(!identifier.empty() ? "for " + identifier : "") + ";time [s];# events";
module_event_time_.emplace(module.get(), CreateHistogram<TH1D>(name.c_str(), title.c_str(), 1000, 0, 1));
}
}
LOG_PROGRESS(STATUS, "INIT_LOOP") << "Initialized " << modules_.size() << " module instantiations";
auto end_time = std::chrono::steady_clock::now();
initialize_time_ =
static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time).count());
}
void ModuleManager::run(RandomNumberGenerator& seeder) {
using namespace std::chrono_literals;
Configuration& global_config = conf_manager_->getGlobalConfiguration();
auto plot = global_config.get<bool>("performance_plots");
// Creates the thread pool
LOG(TRACE) << "Initializing thread pool with " << number_of_threads_ << " threads";
auto initialize_function =
[log_level = Log::getReportingLevel(), log_format = Log::getFormat(), modules_list = modules_]() {
// Initialize the threads to the same log level and format as the master setting
Log::setReportingLevel(log_level);
Log::setFormat(log_format);
// Call per-thread initialization of each module
for(const auto& module : modules_list) {
// Set module specific log settings
auto old_settings = ModuleManager::set_module_before(
module->get_identifier().getUniqueName(), module->get_configuration(), "T:");
LOG(TRACE) << "Initializing thread " << std::this_thread::get_id();
module->initializeThread();
// Reset logging
ModuleManager::set_module_after(std::move(old_settings));
}
};
// Finalize modules for each thread
auto finalize_function = [modules_list = modules_]() {
for(const auto& module : modules_list) {
// Set module specific log settings
auto old_settings = ModuleManager::set_module_before(
module->get_identifier().getUniqueName(), module->get_configuration(), "T:");
LOG(TRACE) << "Finalizing thread " << std::this_thread::get_id();
module->finalizeThread();
// Reset logging
ModuleManager::set_module_after(std::move(old_settings));
}
};
// Push 128 events for each worker to maintain enough work
auto max_queue_size = number_of_threads_ * 128;
thread_pool_ = std::make_unique<ThreadPool>(
number_of_threads_, max_queue_size, max_buffer_size_, initialize_function, finalize_function);
// Record the run stage total time
auto start_time = std::chrono::steady_clock::now();
// Push all events to the thread pool
std::atomic<uint64_t> finished_events{0};
std::atomic<uint64_t> aborted_events{0};
global_config.setDefault<uint64_t>("number_of_events", 1u);
auto number_of_events = global_config.get<uint64_t>("number_of_events");
// Skip first N events and discard their event seed from the seeder engine:
auto skip_events = global_config.get<uint64_t>("skip_events", 0);
seeder.discard(skip_events);
// Mark the first N events as completed for the thread pool. Since events start at one, always mark zero identifier as
// completed
for(size_t n = 0; n <= skip_events; n++) {
thread_pool_->markComplete(n);
}
LOG(STATUS) << "Starting event loop";
for(uint64_t i = 1 + skip_events; i <= number_of_events + skip_events; i++) {
// Check if run was aborted and stop pushing extra events to the threadpool
if(terminate_) {
LOG(INFO) << "Interrupting event loop after " << finished_events << " events because of request to terminate";
thread_pool_->destroy();
break;
}
// Get a new seed for the new event
uint64_t seed = seeder();
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wstrict-overflow"
auto event_function_with_module =
[this, plot, number_of_events, event_num = i, event_seed = seed, &finished_events, &aborted_events](
std::shared_ptr<Event> event,
ModuleList::iterator module_iter,
int64_t event_time,
auto&& self_func) mutable -> void {
// The RNG to be used by all events running on this thread
static thread_local RandomNumberGenerator random_engine;
// Create the event data
if(event == nullptr) {
event = std::make_shared<Event>(*this->messenger_, event_num, event_seed);
event->set_and_seed_random_engine(&random_engine);
LOG(INFO) << "Starting event " << event_num << " with seed " << event_seed;
} else {
LOG(TRACE) << "Continue with earlier event, restoring random seed";
event->set_and_seed_random_engine(&random_engine);
event->restore_random_engine_state();
}
while(module_iter != modules_.end()) {
auto module = *module_iter;
LOG_PROGRESS(TRACE, "EVENT_LOOP")
<< "Running event " << event->number << " [" << module->get_identifier().getUniqueName() << "]";
// Check if the module is satisfied to run
if(!module->check_delegates(this->messenger_, event.get())) {
LOG(TRACE) << "Not all required messages are received for " << module->get_identifier().getUniqueName()
<< ", skipping module!";
++module_iter;
continue;
}
// Get current time
auto start = std::chrono::steady_clock::now();
// Set module specific logging settings
auto old_settings = ModuleManager::set_module_before(
module->get_identifier().getUniqueName(), module->get_configuration(), "R:", event->number);
// Run module
bool stop = false;
bool abort = false;
try {
if(module->require_sequence() && event_num != thread_pool_->minimumUncompleted()) {
stop = true;
} else {
module->run(event.get());
}
} catch(const MissingDependenciesException& e) {
stop = true;
} catch(const AbortEventException& e) {
LOG(WARNING) << "Event aborted:" << std::endl << e.what();
abort = true;
} catch(const EndOfRunException& e) {
// Terminate if the module threw the EndOfRun request exception:
LOG(WARNING) << "Request to terminate:" << std::endl << e.what();
this->terminate_ = true;
}
// Reset logging
ModuleManager::set_module_after(std::move(old_settings));
// Update execution time
auto end = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
// Note: we do not need to lock a mutex because the std::map is not altered and its values are atomic.
this->module_execution_time_[module.get()] += duration;
if(plot) {
std::lock_guard<std::mutex> stat_lock{event->stats_mutex_};
event_time += duration;
this->module_event_time_[module.get()]->Fill(
std::chrono::duration<double>(std::chrono::nanoseconds(duration)).count());
}
if(abort) {
// Break module execution loop:
aborted_events++;
break;
}
if(stop) {
LOG(DEBUG) << "Event " << event->number
<< " was interrupted because of missing dependencies, rescheduling...";
// Store state of PRNG engine:
event->store_random_engine_state();
// Reschedule the event:
auto event_function = std::bind(self_func, event, module_iter, event_time, self_func);
auto future = thread_pool_->submit(event->number, event_function, false);
assert(future.valid() || !thread_pool_->valid());
auto buffered_events = thread_pool_->bufferedQueueSize();
LOG_PROGRESS(STATUS, "EVENT_LOOP") << "Buffered " << buffered_events << ", finished " << finished_events
<< " of " << number_of_events << " events";
return;
}
++module_iter;
}
#pragma GCC diagnostic pop
// All modules finished, mark as complete
thread_pool_->markComplete(event->number);
LOG(INFO) << "Finished event " << event_num << " with seed " << event_seed;
auto buffered_events = thread_pool_->bufferedQueueSize();
if(plot) {
this->buffer_fill_level_->Fill(static_cast<double>(buffered_events));
event_time_->Fill(static_cast<double>(event_time) * 1e-9);
}
finished_events++;
LOG_PROGRESS(STATUS, "EVENT_LOOP") << "Buffered " << buffered_events << ", finished " << finished_events
<< " of " << number_of_events << " events";
};
auto event_function =
std::bind(event_function_with_module, nullptr, modules_.begin(), 0, event_function_with_module);
auto future = thread_pool_->submit(event_function);
assert(future.valid() || !thread_pool_->valid());
thread_pool_->checkException();
}
LOG(TRACE) << "All events have been initialized. Waiting for thread pool to finish...";
// Wait for workers to finish
thread_pool_->wait();
// Check exception for last events
thread_pool_->checkException();
LOG_PROGRESS(STATUS, "EVENT_LOOP") << "Finished run of " << finished_events << " events";
global_config.set<uint64_t>("number_of_events", finished_events);
if(aborted_events > 0) {
LOG(WARNING) << "Aborted " << aborted_events << " events in this run";
}
auto end_time = std::chrono::steady_clock::now();
run_time_ = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time).count());
LOG(TRACE) << "Destroying thread pool";
thread_pool_.reset();
}
/**
 * Formats a time span given in nanoseconds as "<h> hours <m> minutes <s> seconds".
 *
 * The sub-second part is discarded. The hours and minutes components are only printed when they are larger than zero,
 * the seconds component is always printed.
 *
 * @param nanoseconds Time span in nanoseconds
 * @return Human-readable representation of the time span
 */
static std::string nanoseconds_to_time(uint64_t nanoseconds) {
    namespace chr = std::chrono;

    // Truncate to whole seconds, then peel off the hours and minutes components one after the other
    auto remainder = chr::duration_cast<chr::seconds>(chr::nanoseconds(nanoseconds));

    const auto whole_hours = chr::duration_cast<chr::hours>(remainder);
    remainder -= whole_hours;

    const auto whole_minutes = chr::duration_cast<chr::minutes>(remainder);
    remainder -= whole_minutes;

    // Assemble the text, leaving out the leading components that are zero
    std::string result;
    if(whole_hours.count() > 0) {
        result += std::to_string(whole_hours.count());
        result += " hours ";
    }
    if(whole_minutes.count() > 0) {
        result += std::to_string(whole_minutes.count());
        result += " minutes ";
    }
    result += std::to_string(remainder.count());
    result += " seconds";

    return result;
}
/**
 * Finalizes every module instantiation in the order of \c modules_ and wraps up the run:
 *  - calls \c finalize() on each module inside its own ROOT directory and with its own logging settings, then detaches
 *    the ROOT directory and the configuration manager from the module,
 *  - writes the performance histograms if the global key "performance_plots" is enabled,
 *  - closes the module ROOT file,
 *  - warns about unused configuration keys in the global and all instance configurations,
 *  - logs timing statistics (total time, slowest instantiation, per-module time, average time per event).
 *
 * @throws RuntimeError If a ROOT directory for the performance plots cannot be created or accessed
 */
void ModuleManager::finalize() {
    auto start_time = std::chrono::steady_clock::now();

    LOG_PROGRESS(TRACE, "FINALIZE_LOOP") << "Finalizing module instantiations";
    for(auto& module : modules_) {
        LOG_PROGRESS(TRACE, "FINALIZE_LOOP") << "Finalizing " << module->get_identifier().getUniqueName();

        // Get current time
        auto start = std::chrono::steady_clock::now();
        // Set module specific log settings
        auto old_settings = set_module_before(module->get_identifier().getUniqueName(), module->get_configuration(), "F:");
        // Change to our ROOT directory
        module->getROOTDirectory()->cd();
        // Finalize module
        module->finalize();
        // Remove the pointer to the ROOT directory after finalizing
        module->set_ROOT_directory(nullptr);
        // Remove the config manager
        module->set_config_manager(nullptr);
        // Reset logging
        set_module_after(std::move(old_settings));
        // Update execution time
        auto end = std::chrono::steady_clock::now();
        module_execution_time_[module.get()] += std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
    }

    // Store performance plots
    Configuration& global_config = conf_manager_->getGlobalConfiguration();
    if(global_config.get<bool>("performance_plots")) {
        auto* perf_dir = modules_file_->mkdir("performance");
        if(perf_dir == nullptr) {
            throw RuntimeError("Cannot create or access ROOT directory for performance plots");
        }
        perf_dir->cd();

        event_time_->Write();
        buffer_fill_level_->Write();

        for(auto& module : modules_) {
            const auto& module_name = module->get_configuration().getName();
            // Reuse the sub-directory if an earlier module with the same name already created it
            auto* mod_dir = perf_dir->GetDirectory(module_name.c_str());
            if(mod_dir == nullptr) {
                mod_dir = perf_dir->mkdir(module_name.c_str());
                if(mod_dir == nullptr) {
                    throw RuntimeError("Cannot create or access ROOT directory for performance plots of module " +
                                       module_name);
                }
            }
            mod_dir->cd();

            // Write the histogram
            module_event_time_[module.get()]->Write();
        }
    }

    // Close module ROOT file
    modules_file_->Close();

    LOG_PROGRESS(STATUS, "FINALIZE_LOOP") << "Finalization completed";
    auto end_time = std::chrono::steady_clock::now();
    finalize_time_ =
        static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::nanoseconds>(end_time - start_time).count());
    auto total_time = initialize_time_ + run_time_ + finalize_time_;

    // Check for unused configuration keys:
    auto unused_keys = global_config.getUnusedKeys();
    if(!unused_keys.empty()) {
        std::stringstream st;
        st << "Unused configuration keys in global section:";
        for(const auto& key : unused_keys) {
            st << '\n' << key;
        }
        LOG(WARNING) << st.str();
    }
    for(const auto& config : conf_manager_->getInstanceConfigurations()) {
        // Build the section name shown in the warning: "<name>" or "<name>:<identifier>"
        auto unique_name = config.getName();
        auto identifier = config.get<std::string>("identifier");
        if(!identifier.empty()) {
            unique_name += ":";
            unique_name += identifier;
        }
        auto cfg_unused_keys = config.getUnusedKeys();
        if(!cfg_unused_keys.empty()) {
            std::stringstream st;
            st << "Unused configuration keys in section " << unique_name << ":";
            for(const auto& key : cfg_unused_keys) {
                st << '\n' << key;
            }
            LOG(WARNING) << st.str();
        }
    }

    // Find the slowest module, and accumulate the total run-time for all modules
    int64_t slowest_time = 0;
    int64_t total_module_time = 0;
    std::string slowest_module;
    for(auto& module_exec_time : module_execution_time_) {
        total_module_time += module_exec_time.second;
        if(module_exec_time.second > slowest_time) {
            slowest_time = module_exec_time.second;
            slowest_module = module_exec_time.first->getUniqueName();
        }
    }

    // Compute the share in floating point: an integer division would truncate the value before std::round can act on it.
    // The denominator is clamped to one to avoid a division by zero when no module time was recorded.
    const auto slowest_share = std::round(100.0 * static_cast<double>(slowest_time) /
                                          static_cast<double>(std::max(int64_t(1), total_module_time)));
    LOG(STATUS) << "Executed " << modules_.size() << " instantiations in " << nanoseconds_to_time(total_time)
                << ", spending " << slowest_share << "% of time in slowest instantiation " << slowest_module;
    for(auto& module : modules_) {
        LOG(INFO) << " Module " << module->getUniqueName() << " took "
                  << Units::display(module_execution_time_[module.get()].load(), {"s", "ms"});
    }

    // Average run time per event, again divided in floating point so that the rounding is effective.
    // The event count is clamped to one to avoid a division by zero for runs without events.
    const auto event_count = std::max(uint64_t(1), global_config.get<uint64_t>("number_of_events"));
    auto processing_time = std::round(static_cast<double>(run_time_) / static_cast<double>(event_count));
    LOG(STATUS) << "Average processing time is \x1B[1m" << Units::display(processing_time, {"ms", "us"})
                << "/event\x1B[0m, event generation at \x1B[1m"
                << std::round(global_config.get<double>("number_of_events") / Units::convert(run_time_, "s"))
                << " Hz\x1B[0m";

    if(global_config.get<unsigned int>("workers") > 0) {
        auto event_processing_time = std::round(processing_time * global_config.get<unsigned int>("workers"));
        LOG(STATUS) << "This corresponds to a processing time of \x1B[1m"
                    << Units::display(event_processing_time, {"ms", "us"}) << "/event\x1B[0m per worker";
    }
}
/**
 * Requests termination of the run.
 *
 * The termination flag is set on every call. Only the call that flips the flag from false to true goes on to destroy
 * the thread pool, and only if a thread pool currently exists.
 */
void ModuleManager::terminate() {
    // exchange() stores true and hands back the previous value (terminate_ is presumably a std::atomic<bool>)
    const bool was_terminating = terminate_.exchange(true);
    if(was_terminating) {
        return;
    }

    // Nothing to tear down when no thread pool is present
    if(!thread_pool_) {
        return;
    }

    thread_pool_->destroy();
}
Updated on 2025-02-27 at 14:14:46 +0000