libs/corosio/src/corosio/src/detail/epoll/scheduler.cpp

80.5% Lines (391/486) 89.1% Functions (41/46) 68.7% Branches (204/297)
libs/corosio/src/corosio/src/detail/epoll/scheduler.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/scheduler.hpp"
15 #include "src/detail/epoll/op.hpp"
16 #include "src/detail/make_err.hpp"
17 #include "src/detail/posix/resolver_service.hpp"
18 #include "src/detail/posix/signals.hpp"
19
20 #include <boost/corosio/detail/except.hpp>
21 #include <boost/corosio/detail/thread_local_ptr.hpp>
22
23 #include <chrono>
24 #include <limits>
25 #include <utility>
26
27 #include <errno.h>
28 #include <fcntl.h>
29 #include <sys/epoll.h>
30 #include <sys/eventfd.h>
31 #include <sys/socket.h>
32 #include <sys/timerfd.h>
33 #include <unistd.h>
34
35 /*
36 epoll Scheduler - Single Reactor Model
37 ======================================
38
39 This scheduler uses a thread coordination strategy to provide handler
40 parallelism and avoid the thundering herd problem.
41 Instead of all threads blocking on epoll_wait(), one thread becomes the
42 "reactor" while others wait on a condition variable for handler work.
43
44 Thread Model
45 ------------
46 - ONE thread runs epoll_wait() at a time (the reactor thread)
47 - OTHER threads wait on cond_ (condition variable) for handlers
48 - When work is posted, exactly one waiting thread wakes via notify_one()
49 - This matches Windows IOCP semantics where N posted items wake N threads
50
51 Event Loop Structure (do_one)
52 -----------------------------
53 1. Lock mutex, try to pop handler from queue
54 2. If got handler: execute it (unlocked), return
55 3. If queue empty and no reactor running: become reactor
56 - Run epoll_wait (unlocked), queue I/O completions, loop back
57 4. If queue empty and reactor running: wait on condvar for work
58
59 The task_running_ flag ensures only one thread owns epoll_wait().
60 After the reactor queues I/O completions, it loops back to try getting
61 a handler, giving priority to handler execution over more I/O polling.
62
63 Signaling State (state_)
64 ------------------------
65 The state_ variable encodes two pieces of information:
66 - Bit 0: signaled flag (1 = signaled, persists until cleared)
67 - Upper bits: waiter count (each waiter adds 2 before blocking)
68
69 This allows efficient coordination:
70 - Signalers only call notify when waiters exist (state_ > 1)
71 - Waiters check if already signaled before blocking (fast-path)
72
73 Wake Coordination (wake_one_thread_and_unlock)
74 ----------------------------------------------
75 When posting work:
76 - If waiters exist (state_ > 1): signal and notify_one()
77 - Else if reactor running: interrupt via eventfd write
78 - Else: no-op (thread will find work when it checks queue)
79
80 This avoids waking threads unnecessarily. With cascading wakes,
81 each handler execution wakes at most one additional thread if
82 more work exists in the queue.
83
84 Work Counting
85 -------------
86 outstanding_work_ tracks pending operations. When it hits zero, run()
87 returns. Each operation increments on start, decrements on completion.
88
89 Timer Integration
90 -----------------
91 Timers are handled by timer_service. The reactor adjusts epoll_wait
92 timeout to wake for the nearest timer expiry. When a new timer is
93 scheduled earlier than current, timer_service calls interrupt_reactor()
94 to re-evaluate the timeout.
95 */
96
97 namespace boost::corosio::detail {
98
99 struct scheduler_context
100 {
101 epoll_scheduler const* key;
102 scheduler_context* next;
103 op_queue private_queue;
104 long private_outstanding_work;
105
106 161 scheduler_context(epoll_scheduler const* k, scheduler_context* n)
107 161 : key(k)
108 161 , next(n)
109 161 , private_outstanding_work(0)
110 {
111 161 }
112 };
113
namespace {

// Head of the calling thread's stack of active scheduler frames.
// Nested run()/poll() calls on the same thread push additional frames.
corosio::detail::thread_local_ptr<scheduler_context> context_stack;

// RAII guard: pushes a scheduler_context frame for the current thread on
// construction and pops it on destruction.
struct thread_context_guard
{
    scheduler_context frame_;

    explicit thread_context_guard(
        epoll_scheduler const* ctx) noexcept
        : frame_(ctx, context_stack.get())
    {
        context_stack.set(&frame_);
    }

    ~thread_context_guard() noexcept
    {
        // Flush any ops still sitting in the private queue back to the
        // scheduler's shared queue so they are not lost when this thread
        // leaves run()/poll().
        if (!frame_.private_queue.empty())
            frame_.key->drain_thread_queue(frame_.private_queue, frame_.private_outstanding_work);
        context_stack.set(frame_.next);
    }
};

// Returns the calling thread's frame for scheduler `self`, or nullptr if
// this thread is not currently running inside that scheduler.
scheduler_context*
find_context(epoll_scheduler const* self) noexcept
{
    for (auto* c = context_stack.get(); c != nullptr; c = c->next)
        if (c->key == self)
            return c;
    return nullptr;
}

} // namespace
147
void
descriptor_state::
operator()()
{
    // Runs as a queued scheduler op: consume the ready-event mask that the
    // reactor published (see run_task) and dispatch pending I/O operations.
    is_enqueued_.store(false, std::memory_order_relaxed);

    // Take ownership of impl ref set by close_socket() to prevent
    // the owning impl from being freed while we're executing
    auto prevent_impl_destruction = std::move(impl_ref_);

    std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
    if (ev == 0)
    {
        // Events already consumed by another invocation; report a
        // compensating work item so the scheduler's accounting balances.
        scheduler_->compensating_work_started();
        return;
    }

    op_queue local_ops;

    // On EPOLLERR, fetch the pending socket error so completions carry a
    // real errno; fall back to EIO if the kernel reports none.
    int err = 0;
    if (ev & EPOLLERR)
    {
        socklen_t len = sizeof(err);
        if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
            err = errno;
        if (err == 0)
            err = EIO;
    }

    {
        std::lock_guard lock(mutex);

        if (ev & EPOLLIN)
        {
            if (read_op)
            {
                auto* rd = read_op;
                if (err)
                    rd->complete(err, 0);
                else
                    rd->perform_io();

                // EAGAIN/EWOULDBLOCK: readiness was spurious — keep the
                // op armed for the next EPOLLIN edge.
                if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
                {
                    rd->errn = 0;
                }
                else
                {
                    read_op = nullptr;
                    local_ops.push(rd);
                }
            }
            else
            {
                // No pending read: latch readiness for a future read.
                read_ready = true;
            }
        }

        if (ev & EPOLLOUT)
        {
            bool had_write_op = (connect_op || write_op);

            // A pending connect completes when the socket turns writable.
            if (connect_op)
            {
                auto* cn = connect_op;
                if (err)
                    cn->complete(err, 0);
                else
                    cn->perform_io();
                connect_op = nullptr;
                local_ops.push(cn);
            }

            if (write_op)
            {
                auto* wr = write_op;
                if (err)
                    wr->complete(err, 0);
                else
                    wr->perform_io();

                if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
                {
                    wr->errn = 0;
                }
                else
                {
                    write_op = nullptr;
                    local_ops.push(wr);
                }
            }

            // Only latch writability when no op consumed the edge.
            if (!had_write_op)
                write_ready = true;
        }

        // Error path: fail whichever ops were not completed above.
        if (err)
        {
            if (read_op)
            {
                read_op->complete(err, 0);
                local_ops.push(std::exchange(read_op, nullptr));
            }
            if (write_op)
            {
                write_op->complete(err, 0);
                local_ops.push(std::exchange(write_op, nullptr));
            }
            if (connect_op)
            {
                connect_op->complete(err, 0);
                local_ops.push(std::exchange(connect_op, nullptr));
            }
        }
    }

    // Execute first handler inline — the scheduler's work_cleanup
    // accounts for this as the "consumed" work item
    scheduler_op* first = local_ops.pop();
    if (first)
    {
        scheduler_->post_deferred_completions(local_ops);
        (*first)();
    }
    else
    {
        scheduler_->compensating_work_started();
    }
}
271
epoll_scheduler::
epoll_scheduler(
    capy::execution_context& ctx,
    int)
    : epoll_fd_(-1)
    , event_fd_(-1)
    , timer_fd_(-1)
    , outstanding_work_(0)
    , stopped_(false)
    , shutdown_(false)
    , task_running_{false}
    , task_interrupted_(false)
    , state_(0)
{
    // Create the reactor: one epoll set plus an eventfd (cross-thread
    // wakeup) and a timerfd (timer expiry). Earlier fds are closed by
    // hand on each failure path because the destructor does not run if
    // the constructor throws.
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
    if (epoll_fd_ < 0)
        detail::throw_system_error(make_err(errno), "epoll_create1");

    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (event_fd_ < 0)
    {
        int errn = errno;
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "eventfd");
    }

    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
    if (timer_fd_ < 0)
    {
        int errn = errno;
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "timerfd_create");
    }

    // Register the eventfd edge-triggered with a null user pointer; the
    // reactor loop recognizes nullptr as "interrupt" (see run_task).
    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLET;
    ev.data.ptr = nullptr;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl");
    }

    // The timerfd is identified in the event loop by the address of
    // timer_fd_ stored in data.ptr (see run_task).
    epoll_event timer_ev{};
    timer_ev.events = EPOLLIN | EPOLLERR;
    timer_ev.data.ptr = &timer_fd_;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
    }

    // When a timer earlier than the current deadline is scheduled, mark
    // the timerfd stale and kick the reactor so it reprograms its timeout.
    timer_svc_ = &get_timer_service(ctx, *this);
    timer_svc_->set_on_earliest_changed(
        timer_service::callback(
            this,
            [](void* p) {
                auto* self = static_cast<epoll_scheduler*>(p);
                self->timerfd_stale_.store(true, std::memory_order_release);
                if (self->task_running_.load(std::memory_order_acquire))
                    self->interrupt_reactor();
            }));

    // Initialize resolver service
    get_resolver_service(ctx, *this);

    // Initialize signal service
    get_signal_service(ctx, *this);

    // Push task sentinel to interleave reactor runs with handler execution
    completed_ops_.push(&task_op_);
}
351
352 378 epoll_scheduler::
353 189 ~epoll_scheduler()
354 {
355
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (timer_fd_ >= 0)
356 189 ::close(timer_fd_);
357
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (event_fd_ >= 0)
358 189 ::close(event_fd_);
359
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (epoll_fd_ >= 0)
360 189 ::close(epoll_fd_);
361 378 }
362
void
epoll_scheduler::
shutdown()
{
    {
        std::unique_lock lock(mutex_);
        shutdown_ = true;

        // Destroy (do not run) every queued op. The task sentinel is not
        // heap-allocated, so skip it. The lock is dropped around destroy()
        // because op destructors may call back into the scheduler.
        while (auto* h = completed_ops_.pop())
        {
            if (h == &task_op_)
                continue;
            lock.unlock();
            h->destroy();
            lock.lock();
        }

        signal_all(lock);
    }

    // Force the work count to zero so any thread still in run() exits.
    outstanding_work_.store(0, std::memory_order_release);

    // Wake the reactor if it is blocked in epoll_wait().
    if (event_fd_ >= 0)
        interrupt_reactor();
}
388
void
epoll_scheduler::
post(capy::coro h) const
{
    // Heap-allocated op that resumes the coroutine when executed.
    struct post_handler final
        : scheduler_op
    {
        capy::coro h_;

        explicit
        post_handler(capy::coro h)
            : h_(h)
        {
        }

        ~post_handler() = default;

        void operator()() override
        {
            // Copy the handle before delete: resuming may destroy
            // whatever owns this op, so `this` must be gone first.
            auto h = h_;
            delete this;
            h.resume();
        }

        // Called during shutdown: free without resuming.
        void destroy() override
        {
            delete this;
        }
    };

    auto ph = std::make_unique<post_handler>(h);

    // Fast path: same thread posts to private queue
    // Only count locally; work_cleanup batches to global counter
    if (auto* ctx = find_context(this))
    {
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(ph.release());
        return;
    }

    // Slow path: cross-thread post requires mutex
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(ph.release());
    wake_one_thread_and_unlock(lock);
}
437
438 void
439 167199 epoll_scheduler::
440 post(scheduler_op* h) const
441 {
442 // Fast path: same thread posts to private queue
443 // Only count locally; work_cleanup batches to global counter
444
2/2
✓ Branch 1 taken 167173 times.
✓ Branch 2 taken 26 times.
167199 if (auto* ctx = find_context(this))
445 {
446 167173 ++ctx->private_outstanding_work;
447 167173 ctx->private_queue.push(h);
448 167173 return;
449 }
450
451 // Slow path: cross-thread post requires mutex
452 26 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
453
454
1/1
✓ Branch 1 taken 26 times.
26 std::unique_lock lock(mutex_);
455 26 completed_ops_.push(h);
456
1/1
✓ Branch 1 taken 26 times.
26 wake_one_thread_and_unlock(lock);
457 26 }
458
void
epoll_scheduler::
on_work_started() noexcept
{
    // A work guard was acquired; keeps run() from returning while held.
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}
465
466 void
467 3260 epoll_scheduler::
468 on_work_finished() noexcept
469 {
470
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3260 times.
6520 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
471 stop();
472 3260 }
473
474 bool
475 3080 epoll_scheduler::
476 running_in_this_thread() const noexcept
477 {
478
2/2
✓ Branch 1 taken 2870 times.
✓ Branch 2 taken 210 times.
3080 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
479
1/2
✓ Branch 0 taken 2870 times.
✗ Branch 1 not taken.
2870 if (c->key == this)
480 2870 return true;
481 210 return false;
482 }
483
484 void
485 37 epoll_scheduler::
486 stop()
487 {
488
1/1
✓ Branch 1 taken 37 times.
37 std::unique_lock lock(mutex_);
489
2/2
✓ Branch 0 taken 19 times.
✓ Branch 1 taken 18 times.
37 if (!stopped_)
490 {
491 19 stopped_ = true;
492 19 signal_all(lock);
493
1/1
✓ Branch 1 taken 19 times.
19 interrupt_reactor();
494 }
495 37 }
496
497 bool
498 16 epoll_scheduler::
499 stopped() const noexcept
500 {
501 16 std::unique_lock lock(mutex_);
502 32 return stopped_;
503 16 }
504
505 void
506 49 epoll_scheduler::
507 restart()
508 {
509
1/1
✓ Branch 1 taken 49 times.
49 std::unique_lock lock(mutex_);
510 49 stopped_ = false;
511 49 }
512
513 std::size_t
514 175 epoll_scheduler::
515 run()
516 {
517
2/2
✓ Branch 1 taken 28 times.
✓ Branch 2 taken 147 times.
350 if (outstanding_work_.load(std::memory_order_acquire) == 0)
518 {
519
1/1
✓ Branch 1 taken 28 times.
28 stop();
520 28 return 0;
521 }
522
523 147 thread_context_guard ctx(this);
524
1/1
✓ Branch 1 taken 147 times.
147 std::unique_lock lock(mutex_);
525
526 147 std::size_t n = 0;
527 for (;;)
528 {
529
3/3
✓ Branch 1 taken 259225 times.
✓ Branch 3 taken 147 times.
✓ Branch 4 taken 259078 times.
259225 if (!do_one(lock, -1, &ctx.frame_))
530 147 break;
531
1/2
✓ Branch 1 taken 259078 times.
✗ Branch 2 not taken.
259078 if (n != (std::numeric_limits<std::size_t>::max)())
532 259078 ++n;
533
2/2
✓ Branch 1 taken 91901 times.
✓ Branch 2 taken 167177 times.
259078 if (!lock.owns_lock())
534
1/1
✓ Branch 1 taken 91901 times.
91901 lock.lock();
535 }
536 147 return n;
537 147 }
538
539 std::size_t
540 2 epoll_scheduler::
541 run_one()
542 {
543
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
544 {
545 stop();
546 return 0;
547 }
548
549 2 thread_context_guard ctx(this);
550
1/1
✓ Branch 1 taken 2 times.
2 std::unique_lock lock(mutex_);
551
1/1
✓ Branch 1 taken 2 times.
2 return do_one(lock, -1, &ctx.frame_);
552 2 }
553
554 std::size_t
555 14 epoll_scheduler::
556 wait_one(long usec)
557 {
558
2/2
✓ Branch 1 taken 5 times.
✓ Branch 2 taken 9 times.
28 if (outstanding_work_.load(std::memory_order_acquire) == 0)
559 {
560
1/1
✓ Branch 1 taken 5 times.
5 stop();
561 5 return 0;
562 }
563
564 9 thread_context_guard ctx(this);
565
1/1
✓ Branch 1 taken 9 times.
9 std::unique_lock lock(mutex_);
566
1/1
✓ Branch 1 taken 9 times.
9 return do_one(lock, usec, &ctx.frame_);
567 9 }
568
569 std::size_t
570 2 epoll_scheduler::
571 poll()
572 {
573
2/2
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
574 {
575
1/1
✓ Branch 1 taken 1 time.
1 stop();
576 1 return 0;
577 }
578
579 1 thread_context_guard ctx(this);
580
1/1
✓ Branch 1 taken 1 time.
1 std::unique_lock lock(mutex_);
581
582 1 std::size_t n = 0;
583 for (;;)
584 {
585
3/3
✓ Branch 1 taken 3 times.
✓ Branch 3 taken 1 time.
✓ Branch 4 taken 2 times.
3 if (!do_one(lock, 0, &ctx.frame_))
586 1 break;
587
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 if (n != (std::numeric_limits<std::size_t>::max)())
588 2 ++n;
589
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 if (!lock.owns_lock())
590
1/1
✓ Branch 1 taken 2 times.
2 lock.lock();
591 }
592 1 return n;
593 1 }
594
595 std::size_t
596 4 epoll_scheduler::
597 poll_one()
598 {
599
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 2 times.
8 if (outstanding_work_.load(std::memory_order_acquire) == 0)
600 {
601
1/1
✓ Branch 1 taken 2 times.
2 stop();
602 2 return 0;
603 }
604
605 2 thread_context_guard ctx(this);
606
1/1
✓ Branch 1 taken 2 times.
2 std::unique_lock lock(mutex_);
607
1/1
✓ Branch 1 taken 2 times.
2 return do_one(lock, 0, &ctx.frame_);
608 2 }
609
void
epoll_scheduler::
register_descriptor(int fd, descriptor_state* desc) const
{
    // Register once for all event types, edge-triggered. Readiness is
    // then tracked in userspace (read_ready/write_ready) rather than
    // re-arming the epoll registration per operation.
    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
    ev.data.ptr = desc;

    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");

    desc->registered_events = ev.events;
    desc->fd = fd;
    desc->scheduler_ = this;

    // Reset cached readiness under the descriptor's own mutex.
    std::lock_guard lock(desc->mutex);
    desc->read_ready = false;
    desc->write_ready = false;
}
629
void
epoll_scheduler::
deregister_descriptor(int fd) const
{
    // Best-effort removal; errors (e.g. fd already closed) are ignored.
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
}
636
void
epoll_scheduler::
work_started() const noexcept
{
    // One more pending operation; balanced by work_finished().
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}
643
void
epoll_scheduler::
work_finished() const noexcept
{
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
    {
        // Last work item completed - wake all threads so they can exit.
        // signal_all() wakes threads waiting on the condvar.
        // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
        // Both are needed because they target different blocking mechanisms.
        std::unique_lock lock(mutex_);
        signal_all(lock);
        if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
        {
            // Mark the interrupt under the lock, then write the eventfd
            // without it (the write does not need mutex protection).
            task_interrupted_ = true;
            lock.unlock();
            interrupt_reactor();
        }
    }
}
664
665 void
666 82169 epoll_scheduler::
667 compensating_work_started() const noexcept
668 {
669 82169 auto* ctx = find_context(this);
670
1/2
✓ Branch 0 taken 82169 times.
✗ Branch 1 not taken.
82169 if (ctx)
671 82169 ++ctx->private_outstanding_work;
672 82169 }
673
void
epoll_scheduler::
drain_thread_queue(op_queue& queue, long count) const
{
    // Called when a thread leaves run()/poll() with ops still in its
    // private queue: publish them so other threads can execute them.
    // Note: outstanding_work_ was already incremented when posting
    std::unique_lock lock(mutex_);
    completed_ops_.splice(queue);
    // Wake a waiter only if this thread actually contributed new work.
    if (count > 0)
        maybe_unlock_and_signal_one(lock);
}
684
685 void
686 5252 epoll_scheduler::
687 post_deferred_completions(op_queue& ops) const
688 {
689
1/2
✓ Branch 1 taken 5252 times.
✗ Branch 2 not taken.
5252 if (ops.empty())
690 5252 return;
691
692 // Fast path: if on scheduler thread, use private queue
693 if (auto* ctx = find_context(this))
694 {
695 ctx->private_queue.splice(ops);
696 return;
697 }
698
699 // Slow path: add to global queue and wake a thread
700 std::unique_lock lock(mutex_);
701 completed_ops_.splice(ops);
702 wake_one_thread_and_unlock(lock);
703 }
704
void
epoll_scheduler::
interrupt_reactor() const
{
    // Only write if not already armed to avoid redundant writes
    bool expected = false;
    if (eventfd_armed_.compare_exchange_strong(expected, true,
        std::memory_order_release, std::memory_order_relaxed))
    {
        // Any nonzero counter value makes the eventfd readable, which
        // wakes epoll_wait(). The reactor clears the armed flag after
        // draining the eventfd (see run_task).
        std::uint64_t val = 1;
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
    }
}
718
void
epoll_scheduler::
signal_all(std::unique_lock<std::mutex>&) const
{
    // Caller holds the mutex (witnessed by the lock parameter).
    // Set the sticky signaled bit (bit 0 of state_) and release every
    // condvar waiter.
    state_ |= 1;
    cond_.notify_all();
}
726
bool
epoll_scheduler::
maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
{
    // Set the signaled bit. state_ > 1 means at least one waiter is
    // blocked (each waiter adds 2 while waiting), so a notify is useful.
    state_ |= 1;
    if (state_ > 1)
    {
        // Unlock before notifying so the woken thread can take the mutex
        // immediately.
        lock.unlock();
        cond_.notify_one();
        return true;
    }
    // No waiters: remain locked and report that nothing was woken.
    return false;
}
740
void
epoll_scheduler::
unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
{
    state_ |= 1;
    // Capture whether waiters exist before releasing the mutex; the
    // notify happens outside the lock so the woken thread does not
    // immediately block on it.
    bool have_waiters = state_ > 1;
    lock.unlock();
    if (have_waiters)
        cond_.notify_one();
}
751
void
epoll_scheduler::
clear_signal() const
{
    // Clear the sticky signaled bit (bit 0), preserving the waiter count
    // stored in the upper bits.
    state_ &= ~std::size_t(1);
}
758
void
epoll_scheduler::
wait_for_signal(std::unique_lock<std::mutex>& lock) const
{
    // Block until the signaled bit is set. The +2/-2 brackets advertise
    // this thread as a waiter so signalers know a notify is required.
    while ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait(lock);
        state_ -= 2;
    }
}
770
void
epoll_scheduler::
wait_for_signal_for(
    std::unique_lock<std::mutex>& lock,
    long timeout_us) const
{
    // Single timed wait: returns on signal, timeout, or spurious wakeup.
    // The caller re-checks its own predicate in all cases.
    if ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= 2;
    }
}
784
void
epoll_scheduler::
wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
{
    // Prefer waking a condvar waiter, if any exists.
    if (maybe_unlock_and_signal_one(lock))
        return;

    // No waiters: if the reactor is blocked in epoll_wait() and has not
    // already been interrupted, kick it via the eventfd.
    if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
    {
        task_interrupted_ = true;
        lock.unlock();
        interrupt_reactor();
    }
    else
    {
        // Nobody is blocked; a running thread will find the work when it
        // next checks the queue.
        lock.unlock();
    }
}
803
/** RAII guard for handler execution work accounting.

Handler consumes 1 work item, may produce N new items via fast-path posts.
Net change = N - 1:
- If N > 1: add (N-1) to global (more work produced than consumed)
- If N == 1: net zero, do nothing
- If N < 1: call work_finished() (work consumed, may trigger stop)

Also drains private queue to global for other threads to process.
*/
struct work_cleanup
{
    epoll_scheduler const* scheduler;
    std::unique_lock<std::mutex>* lock;
    scheduler_context* ctx;

    ~work_cleanup()
    {
        if (ctx)
        {
            long produced = ctx->private_outstanding_work;
            if (produced > 1)
                scheduler->outstanding_work_.fetch_add(produced - 1, std::memory_order_relaxed);
            else if (produced < 1)
                scheduler->work_finished();
            // produced == 1: net zero, handler consumed what it produced
            ctx->private_outstanding_work = 0;

            // Publish privately queued ops under the mutex; the lock is
            // re-acquired here so other threads can see them.
            if (!ctx->private_queue.empty())
            {
                lock->lock();
                scheduler->completed_ops_.splice(ctx->private_queue);
            }
        }
        else
        {
            // No thread context - slow-path op was already counted globally
            scheduler->work_finished();
        }
    }
};
845
/** RAII guard for reactor work accounting.

Reactor only produces work via timer/signal callbacks posting handlers.
Unlike handler execution which consumes 1, the reactor consumes nothing.
All produced work must be flushed to global counter.
*/
struct task_cleanup
{
    epoll_scheduler const* scheduler;
    std::unique_lock<std::mutex>* lock;
    scheduler_context* ctx;

    ~task_cleanup()
    {
        if (!ctx)
            return;

        // Flush the reactor's privately counted work to the global counter.
        if (ctx->private_outstanding_work > 0)
        {
            scheduler->outstanding_work_.fetch_add(
                ctx->private_outstanding_work, std::memory_order_relaxed);
            ctx->private_outstanding_work = 0;
        }

        // Publish privately queued ops under the mutex; run_task may or
        // may not have re-acquired the lock before this runs.
        if (!ctx->private_queue.empty())
        {
            if (!lock->owns_lock())
                lock->lock();
            scheduler->completed_ops_.splice(ctx->private_queue);
        }
    }
};
878
void
epoll_scheduler::
update_timerfd() const
{
    // Program the timerfd (relative mode, flags == 0) to fire at the
    // earliest deadline known to the timer service.
    auto nearest = timer_svc_->nearest_expiry();

    itimerspec ts{};
    int flags = 0;

    if (nearest == timer_service::time_point::max())
    {
        // No timers - disarm by setting to 0 (relative)
    }
    else
    {
        auto now = std::chrono::steady_clock::now();
        if (nearest <= now)
        {
            // Use 1ns instead of 0 - zero disarms the timerfd
            ts.it_value.tv_nsec = 1;
        }
        else
        {
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
                nearest - now).count();
            ts.it_value.tv_sec = nsec / 1000000000;
            ts.it_value.tv_nsec = nsec % 1000000000;
            // Ensure non-zero to avoid disarming if duration rounds to 0
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
                ts.it_value.tv_nsec = 1;
        }
    }

    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
        detail::throw_system_error(make_err(errno), "timerfd_settime");
}
915
void
epoll_scheduler::
run_task(std::unique_lock<std::mutex>& lock, scheduler_context* ctx)
{
    // If an interrupt was requested, poll (0) instead of blocking so we
    // return to the mutex quickly; otherwise block indefinitely — timer
    // wakeups arrive through the timerfd.
    int timeout_ms = task_interrupted_ ? 0 : -1;

    if (lock.owns_lock())
        lock.unlock();

    // Flushes private work counts and ops back to the shared queue on
    // scope exit, even if epoll_wait throws.
    task_cleanup on_exit{this, &lock, ctx};

    // Flush deferred timerfd programming before blocking
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
        update_timerfd();

    // Event loop runs without mutex held
    epoll_event events[128];
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);

    if (nfds < 0 && errno != EINTR)
        detail::throw_system_error(make_err(errno), "epoll_wait");

    bool check_timers = false;
    op_queue local_ops;

    // Process events without holding the mutex
    for (int i = 0; i < nfds; ++i)
    {
        // nullptr marks the eventfd: drain it and disarm the interrupt
        // so a future interrupt_reactor() writes again.
        if (events[i].data.ptr == nullptr)
        {
            std::uint64_t val;
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
            eventfd_armed_.store(false, std::memory_order_relaxed);
            continue;
        }

        // &timer_fd_ marks the timerfd: consume the expiration count and
        // run expired timers after the event sweep.
        if (events[i].data.ptr == &timer_fd_)
        {
            std::uint64_t expirations;
            [[maybe_unused]] auto r = ::read(timer_fd_, &expirations, sizeof(expirations));
            check_timers = true;
            continue;
        }

        // Deferred I/O: just set ready events and enqueue descriptor
        // No per-descriptor mutex locking in reactor hot path!
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
        desc->add_ready_events(events[i].events);

        // Only enqueue if not already enqueued
        bool expected = false;
        if (desc->is_enqueued_.compare_exchange_strong(expected, true,
            std::memory_order_release, std::memory_order_relaxed))
        {
            local_ops.push(desc);
        }
    }

    // Process timers only when timerfd fires
    if (check_timers)
    {
        timer_svc_->process_expired();
        update_timerfd();
    }

    // Re-acquire the mutex and publish the descriptors that became ready.
    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}
986
std::size_t
epoll_scheduler::
do_one(std::unique_lock<std::mutex>& lock, long timeout_us, scheduler_context* ctx)
{
    // Precondition: `lock` is held. Returns 1 after executing one handler
    // (mutex released by work_cleanup semantics), or 0 when stopped, out
    // of work, or the non-blocking/timed wait produced nothing.
    for (;;)
    {
        if (stopped_)
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel - time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers = !completed_ops_.empty();

            // Nothing to run the reactor for: no pending work to wait on,
            // or caller requested a non-blocking poll
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                    timeout_us == 0))
            {
                completed_ops_.push(&task_op_);
                return 0;
            }

            // If handlers are queued, tell the reactor to poll rather
            // than block so it comes straight back.
            task_interrupted_ = more_handlers || timeout_us == 0;
            task_running_.store(true, std::memory_order_release);

            // Let another thread run the queued handlers while this one
            // becomes the reactor.
            if (more_handlers)
                unlock_and_signal_one(lock);

            run_task(lock, ctx);

            // Reactor pass finished: requeue the sentinel and loop back
            // to prefer handler execution over more polling.
            task_running_.store(false, std::memory_order_relaxed);
            completed_ops_.push(&task_op_);
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            // Cascade: if more handlers remain, wake one more thread.
            if (!completed_ops_.empty())
                unlock_and_signal_one(lock);
            else
                lock.unlock();

            // Settles the work accounting and drains the private queue
            // when the handler returns (or throws).
            work_cleanup on_exit{this, &lock, ctx};

            (*op)();
            return 1;
        }

        // No pending work to wait on, or caller requested non-blocking poll
        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        // NOTE(review): each iteration waits up to timeout_us afresh, so
        // wait_one() with a finite timeout may wait longer than requested
        // across spurious wakeups/retries — confirm this is intended.
        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}
1052
1053 } // namespace boost::corosio::detail
1054
1055 #endif
1056