1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
// Copyright (c) 2026 Michael Vandeberg
3  
// Copyright (c) 2026 Michael Vandeberg
4  
//
4  
//
5  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
6  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7  
//
7  
//
8  
// Official repository: https://github.com/boostorg/capy
8  
// Official repository: https://github.com/boostorg/capy
9  
//
9  
//
10  

10  

11  
#include <boost/capy/ex/thread_pool.hpp>
11  
#include <boost/capy/ex/thread_pool.hpp>
12  
#include <boost/capy/detail/intrusive.hpp>
12  
#include <boost/capy/detail/intrusive.hpp>
13  
#include <boost/capy/test/thread_name.hpp>
13  
#include <boost/capy/test/thread_name.hpp>
14  
#include <atomic>
14  
#include <atomic>
15  
#include <condition_variable>
15  
#include <condition_variable>
16  
#include <cstdio>
16  
#include <cstdio>
17  
#include <mutex>
17  
#include <mutex>
18  
#include <thread>
18  
#include <thread>
19  
#include <vector>
19  
#include <vector>
20  

20  

21  
/*
21  
/*
22  
    Thread pool implementation using a shared work queue.
22  
    Thread pool implementation using a shared work queue.
23  

23  

24  
    Work items are coroutine handles wrapped in intrusive list nodes, stored
24  
    Work items are coroutine handles wrapped in intrusive list nodes, stored
25  
    in a single queue protected by a mutex. Worker threads wait on a
25  
    in a single queue protected by a mutex. Worker threads wait on a
26  
    condition_variable until work is available or stop is requested.
26  
    condition_variable until work is available or stop is requested.
27  

27  

28  
    Threads are started lazily on first post() via std::call_once to avoid
28  
    Threads are started lazily on first post() via std::call_once to avoid
29  
    spawning threads for pools that are constructed but never used. Each
29  
    spawning threads for pools that are constructed but never used. Each
30  
    thread is named with a configurable prefix plus index for debugger
30  
    thread is named with a configurable prefix plus index for debugger
31  
    visibility.
31  
    visibility.
32  

32  

    Shutdown sequence: stop() sets the stop flag and wakes all workers.
    A worker exits only once the stop flag is set AND the queue is empty,
    so work that is already queued when stop() is called is still
    executed. The destructor then joins the threads and destroys, without
    executing, any work that is still queued (for example, work posted
    after every worker has already exited).
36  
*/
36  
*/
37  

37  

38  
namespace boost {
38  
namespace boost {
39  
namespace capy {
39  
namespace capy {
40  

40  

41  
//------------------------------------------------------------------------------
41  
//------------------------------------------------------------------------------
42  

42  

43  
class thread_pool::impl
43  
class thread_pool::impl
44  
{
44  
{
45  
    struct work : detail::intrusive_queue<work>::node
45  
    struct work : detail::intrusive_queue<work>::node
46  
    {
46  
    {
47  
        coro h_;
47  
        coro h_;
48  

48  

49  
        explicit work(coro h) noexcept
49  
        explicit work(coro h) noexcept
50  
            : h_(h)
50  
            : h_(h)
51  
        {
51  
        {
52  
        }
52  
        }
53  

53  

54  
        void run()
54  
        void run()
55  
        {
55  
        {
56  
            auto h = h_;
56  
            auto h = h_;
57  
            delete this;
57  
            delete this;
58  
            h.resume();
58  
            h.resume();
59  
        }
59  
        }
60  

60  

61  
        void destroy()
61  
        void destroy()
62  
        {
62  
        {
63  
            delete this;
63  
            delete this;
64  
        }
64  
        }
65  
    };
65  
    };
66  

66  

67  
    std::mutex mutex_;
67  
    std::mutex mutex_;
68  
    std::condition_variable cv_;
68  
    std::condition_variable cv_;
69  
    detail::intrusive_queue<work> q_;
69  
    detail::intrusive_queue<work> q_;
70  
    std::vector<std::thread> threads_;
70  
    std::vector<std::thread> threads_;
71  
    std::atomic<bool> stop_{false};
71  
    std::atomic<bool> stop_{false};
72  
    std::size_t num_threads_;
72  
    std::size_t num_threads_;
73  
    char thread_name_prefix_[13]{};  // 12 chars max + null terminator
73  
    char thread_name_prefix_[13]{};  // 12 chars max + null terminator
74  
    std::once_flag start_flag_;
74  
    std::once_flag start_flag_;
75  

75  

76  
public:
76  
public:
77  
    ~impl()
77  
    ~impl()
78  
    {
78  
    {
79  
        stop();
79  
        stop();
80  
        for(auto& t : threads_)
80  
        for(auto& t : threads_)
81  
            if(t.joinable())
81  
            if(t.joinable())
82  
                t.join();
82  
                t.join();
83  

83  

84  
        while(auto* w = q_.pop())
84  
        while(auto* w = q_.pop())
85  
            w->destroy();
85  
            w->destroy();
86  
    }
86  
    }
87  

87  

88  
    impl(std::size_t num_threads, std::string_view thread_name_prefix)
88  
    impl(std::size_t num_threads, std::string_view thread_name_prefix)
89  
        : num_threads_(num_threads)
89  
        : num_threads_(num_threads)
90  
    {
90  
    {
91  
        if(num_threads_ == 0)
91  
        if(num_threads_ == 0)
92  
            num_threads_ = std::thread::hardware_concurrency();
92  
            num_threads_ = std::thread::hardware_concurrency();
93  
        if(num_threads_ == 0)
93  
        if(num_threads_ == 0)
94  
            num_threads_ = 1;
94  
            num_threads_ = 1;
95  

95  

96  
        // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
96  
        // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
97  
        auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
97  
        auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
98  
        thread_name_prefix_[n] = '\0';
98  
        thread_name_prefix_[n] = '\0';
99  
    }
99  
    }
100  

100  

101  
    void
101  
    void
102  
    post(coro h)
102  
    post(coro h)
103  
    {
103  
    {
104  
        ensure_started();
104  
        ensure_started();
105  
        auto* w = new work(h);
105  
        auto* w = new work(h);
106  
        {
106  
        {
107  
            std::lock_guard<std::mutex> lock(mutex_);
107  
            std::lock_guard<std::mutex> lock(mutex_);
108  
            q_.push(w);
108  
            q_.push(w);
109  
        }
109  
        }
110  
        cv_.notify_one();
110  
        cv_.notify_one();
111  
    }
111  
    }
112  

112  

113  
    void
113  
    void
114  
    stop() noexcept
114  
    stop() noexcept
115  
    {
115  
    {
116  
        stop_.store(true, std::memory_order_release);
116  
        stop_.store(true, std::memory_order_release);
117  
        cv_.notify_all();
117  
        cv_.notify_all();
118  
    }
118  
    }
119  

119  

120  
private:
120  
private:
121  
    void
121  
    void
122  
    ensure_started()
122  
    ensure_started()
123  
    {
123  
    {
124  
        std::call_once(start_flag_, [this]{
124  
        std::call_once(start_flag_, [this]{
125  
            threads_.reserve(num_threads_);
125  
            threads_.reserve(num_threads_);
126  
            for(std::size_t i = 0; i < num_threads_; ++i)
126  
            for(std::size_t i = 0; i < num_threads_; ++i)
127  
                threads_.emplace_back([this, i]{ run(i); });
127  
                threads_.emplace_back([this, i]{ run(i); });
128  
        });
128  
        });
129  
    }
129  
    }
130  

130  

131  
    void
131  
    void
132  
    run(std::size_t index)
132  
    run(std::size_t index)
133  
    {
133  
    {
134  
        // Build name; set_current_thread_name truncates to platform limits.
134  
        // Build name; set_current_thread_name truncates to platform limits.
135  
        char name[16];
135  
        char name[16];
136  
        std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
136  
        std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
137  
        set_current_thread_name(name);
137  
        set_current_thread_name(name);
138  

138  

139  
        for(;;)
139  
        for(;;)
140  
        {
140  
        {
141  
            work* w = nullptr;
141  
            work* w = nullptr;
142  
            {
142  
            {
143  
                std::unique_lock<std::mutex> lock(mutex_);
143  
                std::unique_lock<std::mutex> lock(mutex_);
144  
                cv_.wait(lock, [this]{
144  
                cv_.wait(lock, [this]{
145  
                    return !q_.empty() ||
145  
                    return !q_.empty() ||
146  
                        stop_.load(std::memory_order_acquire);
146  
                        stop_.load(std::memory_order_acquire);
147  
                });
147  
                });
148  
                if(stop_.load(std::memory_order_acquire) && q_.empty())
148  
                if(stop_.load(std::memory_order_acquire) && q_.empty())
149  
                    return;
149  
                    return;
150  
                w = q_.pop();
150  
                w = q_.pop();
151  
            }
151  
            }
152  
            if(w)
152  
            if(w)
153  
                w->run();
153  
                w->run();
154  
        }
154  
        }
155  
    }
155  
    }
156  
};
156  
};
157  

157  

158  
//------------------------------------------------------------------------------
158  
//------------------------------------------------------------------------------
159  

159  

160  
thread_pool::
160  
thread_pool::
161  
~thread_pool()
161  
~thread_pool()
162  
{
162  
{
163  
    shutdown();
163  
    shutdown();
164  
    destroy();
164  
    destroy();
165  
    delete impl_;
165  
    delete impl_;
166  
}
166  
}
167  

167  

168  
thread_pool::
168  
thread_pool::
169  
thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
169  
thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
170  
    : impl_(new impl(num_threads, thread_name_prefix))
170  
    : impl_(new impl(num_threads, thread_name_prefix))
171  
{
171  
{
172  
    this->set_frame_allocator(std::allocator<void>{});
172  
    this->set_frame_allocator(std::allocator<void>{});
173  
}
173  
}
174  

174  

175  
void
175  
void
176  
thread_pool::
176  
thread_pool::
177  
stop() noexcept
177  
stop() noexcept
178  
{
178  
{
179  
    impl_->stop();
179  
    impl_->stop();
180  
}
180  
}
181  

181  

182  
//------------------------------------------------------------------------------
182  
//------------------------------------------------------------------------------
183  

183  

184  
void
184  
void
185  
thread_pool::executor_type::
185  
thread_pool::executor_type::
186  
post(coro h) const
186  
post(coro h) const
187  
{
187  
{
188  
    pool_->impl_->post(h);
188  
    pool_->impl_->post(h);
189  
}
189  
}
190  

190  

191  
} // capy
191  
} // capy
192  
} // boost
192  
} // boost