src/core/module/ThreadPool.hpp

Definition of thread pool for concurrent events. More…

Namespaces

Name
allpix
Helper class to hold support layers for a detector model.

Classes

Name
class allpix::ThreadPool
Pool of threads where event tasks can be submitted to.
class allpix::ThreadPool::SafeQueue
Internal thread-safe queuing system.

Detailed Description

Definition of thread pool for concurrent events.

Copyright: Copyright (c) 2017-2024 CERN and the Allpix Squared authors. This software is distributed under the terms of the MIT License, copied verbatim in the file “LICENSE.md”. In applying this license, CERN does not waive the privileges and immunities granted to it by virtue of its status as an Intergovernmental Organization or submit itself to any jurisdiction. SPDX-License-Identifier: MIT

Source code


#ifndef ALLPIX_THREADPOOL_H
#define ALLPIX_THREADPOOL_H

#include <algorithm>
#include <atomic>
#include <exception>
#include <functional>
#include <future>
#include <map>
#include <memory>
#include <queue>
#include <set>
#include <thread>
#include <utility>

namespace allpix {
    class ThreadPool {
    public:
        template <typename T> class SafeQueue {
        public:
            SafeQueue(unsigned int max_standard_size, unsigned int max_priority_size);

            ~SafeQueue() { invalidate(); };

            bool pop(T& out, size_t buffer_left = 0);

            bool push(T value, bool wait = true);
            bool push(uint64_t n, T value, bool wait = true);

            void complete(uint64_t n);

            uint64_t currentId() const;

            bool valid() const;

            bool empty() const;

            size_t size() const;

            size_t prioritySize() const;

            void invalidate();

        private:
            std::atomic_bool valid_{true};
            mutable std::mutex mutex_{};
            std::queue<T> queue_;
            std::set<uint64_t> completed_ids_;
            uint64_t current_id_{0};
            using PQValue = std::pair<uint64_t, T>;
            std::priority_queue<PQValue, std::vector<PQValue>, std::greater<>> priority_queue_;
            std::atomic_size_t priority_queue_size_{0};
            std::condition_variable push_condition_;
            std::condition_variable pop_condition_;
            const size_t max_standard_size_;
            const size_t max_priority_size_;
        };

        ThreadPool(unsigned int num_threads,
                   unsigned int max_queue_size,
                   const std::function<void()>& worker_init_function = nullptr,
                   const std::function<void()>& worker_finalize_function = nullptr);

        ThreadPool(unsigned int num_threads,
                   unsigned int max_queue_size,
                   unsigned int max_buffered_size,
                   const std::function<void()>& worker_init_function = nullptr,
                   const std::function<void()>& worker_finalize_function = nullptr);


        ThreadPool(const ThreadPool& rhs) = delete;
        ThreadPool& operator=(const ThreadPool& rhs) = delete;

        template <typename Func, typename... Args> auto submit(Func&& func, Args&&... args);
        template <typename Func, typename... Args> auto submit(uint64_t n, Func&& func, Args&&... args);

        void markComplete(uint64_t n);

        uint64_t minimumUncompleted() const { return queue_.currentId(); }

        size_t queueSize() const { return queue_.size(); }

        size_t bufferedQueueSize() const { return queue_.prioritySize(); }

        void checkException();

        void wait();

        void destroy();

        bool valid();

        ~ThreadPool();

        static unsigned int threadNum();

        static unsigned int threadCount();

        static void registerThreadCount(unsigned int cnt);

    private:
        void worker(size_t min_thread_buffer,
                    const std::function<void()>& initialize_function,
                    const std::function<void()>& finalize_function);

        // The queue holds the task functions to be executed by the workers
        using Task = std::unique_ptr<std::packaged_task<void()>>;
        SafeQueue<Task> queue_;
        bool with_buffered_{true};
        std::function<void()> finalize_function_{};

        std::atomic_bool done_{false};

        std::atomic<unsigned int> run_cnt_{0};
        mutable std::mutex run_mutex_{};
        std::condition_variable run_condition_;
        std::vector<std::thread> threads_;

        std::atomic_flag has_exception_{false};
        std::exception_ptr exception_ptr_{nullptr};

        static std::map<std::thread::id, unsigned int> thread_nums_;
        static std::atomic_uint thread_cnt_;
        static std::atomic_uint thread_total_;
    };
} // namespace allpix

// Include template members
#include "ThreadPool.tpp"

#endif /* ALLPIX_THREADPOOL_H */

Updated on 2025-02-27 at 14:14:46 +0000