src/core/module/ThreadPool.cpp

Implementation of thread pool for concurrent events. More…

Detailed Description

Implementation of thread pool for concurrent events.

Copyright: Copyright (c) 2017-2024 CERN and the Allpix Squared authors. This software is distributed under the terms of the MIT License, copied verbatim in the file “LICENSE.md”. In applying this license, CERN does not waive the privileges and immunities granted to it by virtue of its status as an Intergovernmental Organization or submit itself to any jurisdiction. SPDX-License-Identifier: MIT

Source code


#include "ThreadPool.hpp"

#include <cassert>

#include "Module.hpp"

using namespace allpix;

std::map<std::thread::id, unsigned int> ThreadPool::thread_nums_;
std::atomic_uint ThreadPool::thread_cnt_{1u};
std::atomic_uint ThreadPool::thread_total_{1u};

ThreadPool::ThreadPool(unsigned int num_threads,
                       unsigned int max_queue_size,
                       const std::function<void()>& worker_init_function,
                       const std::function<void()>& worker_finalize_function)
    : ThreadPool(num_threads, max_queue_size, 0, worker_init_function, worker_finalize_function) {
    with_buffered_ = false; // NOLINT
}

ThreadPool::ThreadPool(unsigned int num_threads,
                       unsigned int max_queue_size,
                       unsigned int max_buffered_size,
                       const std::function<void()>& worker_init_function,
                       const std::function<void()>& worker_finalize_function)
    : queue_(max_queue_size, max_buffered_size) {
    assert(max_buffered_size == 0 || max_buffered_size >= num_threads);
    // Create threads
    try {
        for(unsigned int i = 0u; i < num_threads; ++i) {
            threads_.emplace_back(&ThreadPool::worker,
                                  this,
                                  std::min(num_threads, max_buffered_size),
                                  worker_init_function,
                                  worker_finalize_function);
        }

        // When running single-threadedly, execute initialize function directly and store finalize function for later
        if(threads_.empty()) {
            if(worker_init_function) {
                worker_init_function();
            }
            finalize_function_ = worker_finalize_function;
        }
    } catch(...) {
        destroy();
        throw;
    }

    // No threads are currently working
    run_cnt_ = 0;
}

ThreadPool::~ThreadPool() { destroy(); }

void ThreadPool::markComplete(uint64_t n) { queue_.complete(n); }

void ThreadPool::checkException() {
    // If exception has been thrown, destroy pool and propagate it
    if(exception_ptr_) {
        destroy();
        Log::setSection("");
        std::rethrow_exception(exception_ptr_);
    }
}

void ThreadPool::wait() {
    std::unique_lock<std::mutex> lock{run_mutex_};
    run_condition_.wait(lock, [this]() { return exception_ptr_ != nullptr || (run_cnt_ == 0 || done_ == true); });
}

void ThreadPool::worker(size_t min_thread_buffer,
                        const std::function<void()>& initialize_function,
                        const std::function<void()>& finalize_function) {
    try {
        // Register the thread
        unsigned int thread_num = thread_cnt_++;
        assert(thread_num < thread_total_);
        thread_nums_[std::this_thread::get_id()] = thread_num;

        // Initialize the worker
        if(initialize_function) {
            initialize_function();
        }

        while(!done_) {
            Task task{nullptr};

            if(queue_.pop(task, min_thread_buffer)) {
                // Execute task
                (*task)();
                // Fetch the future to propagate exceptions
                task->get_future().get();
                // Update the run count and propagate update
                std::unique_lock<std::mutex> lock{run_mutex_};
                if(--run_cnt_ == 0) {
                    run_condition_.notify_all();
                }
            }
        }

        // Execute the cleanup function at the end of run
        if(finalize_function) {
            finalize_function();
        }
    } catch(...) {
        // Check if the first exception thrown
        std::unique_lock<std::mutex> lock{run_mutex_};
        if(!has_exception_.test_and_set()) {
            // Save the first exception
            exception_ptr_ = std::current_exception();
            // Invalidate the queue to terminate other threads
            queue_.invalidate();
        }
        // Propagate that the worker terminated
        run_condition_.notify_all();
    }
}

void ThreadPool::destroy() {
    // Lock run mutex to synchronize with queue
    std::unique_lock<std::mutex> lock{run_mutex_};
    done_ = true;
    queue_.invalidate();
    run_condition_.notify_all();
    lock.unlock();

    for(auto& thread : threads_) {
        if(thread.joinable()) {
            thread.join();
        }
    }

    // Execute the cleanup function at the end of run if running single-threaded
    if(threads_.empty() && finalize_function_) {
        finalize_function_();
    }
}

bool ThreadPool::valid() { return queue_.valid() && !done_; }

unsigned int ThreadPool::threadNum() {
    auto iter = thread_nums_.find(std::this_thread::get_id());
    if(iter != thread_nums_.end()) {
        return iter->second;
    }
    return 0;
}

unsigned int ThreadPool::threadCount() { return thread_total_; }

void ThreadPool::registerThreadCount(unsigned int cnt) { thread_total_ += cnt; }

Updated on 2024-12-13 at 08:31:37 +0000