libs/corosio/src/corosio/src/detail/epoll/acceptors.cpp

81.9% Lines (194/237) 100.0% Functions (18/18) 52.3% Branches (68/130)
libs/corosio/src/corosio/src/detail/epoll/acceptors.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/acceptors.hpp"
15 #include "src/detail/epoll/sockets.hpp"
16 #include "src/detail/endpoint_convert.hpp"
17 #include "src/detail/make_err.hpp"
18
19 #include <utility>
20
21 #include <errno.h>
22 #include <netinet/in.h>
23 #include <sys/epoll.h>
24 #include <sys/socket.h>
25 #include <unistd.h>
26
27 namespace boost::corosio::detail {
28
29 void
30 6 epoll_accept_op::
31 cancel() noexcept
32 {
33
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 if (acceptor_impl_)
34 6 acceptor_impl_->cancel_single_op(*this);
35 else
36 request_cancel();
37 6 }
38
39 void
40 2610 epoll_accept_op::
41 operator()()
42 {
43 2610 stop_cb.reset();
44
45
3/4
✓ Branch 0 taken 2610 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 2601 times.
✓ Branch 4 taken 9 times.
2610 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
46
47
1/2
✓ Branch 0 taken 2610 times.
✗ Branch 1 not taken.
2610 if (ec_out)
48 {
49
2/2
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 2601 times.
2610 if (cancelled.load(std::memory_order_acquire))
50 9 *ec_out = capy::error::canceled;
51
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2601 times.
2601 else if (errn != 0)
52 *ec_out = make_err(errn);
53 else
54 2601 *ec_out = {};
55 }
56
57
3/4
✓ Branch 0 taken 2601 times.
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 2601 times.
✗ Branch 3 not taken.
2610 if (success && accepted_fd >= 0)
58 {
59
1/2
✓ Branch 0 taken 2601 times.
✗ Branch 1 not taken.
2601 if (acceptor_impl_)
60 {
61 2601 auto* socket_svc = static_cast<epoll_acceptor_impl*>(acceptor_impl_)
62 2601 ->service().socket_service();
63
1/2
✓ Branch 0 taken 2601 times.
✗ Branch 1 not taken.
2601 if (socket_svc)
64 {
65
1/1
✓ Branch 1 taken 2601 times.
2601 auto& impl = static_cast<epoll_socket_impl&>(socket_svc->create_impl());
66 2601 impl.set_socket(accepted_fd);
67
68 // Register accepted socket with epoll (edge-triggered mode)
69 2601 impl.desc_state_.fd = accepted_fd;
70 {
71
1/1
✓ Branch 1 taken 2601 times.
2601 std::lock_guard lock(impl.desc_state_.mutex);
72 2601 impl.desc_state_.read_op = nullptr;
73 2601 impl.desc_state_.write_op = nullptr;
74 2601 impl.desc_state_.connect_op = nullptr;
75 2601 }
76
1/1
✓ Branch 2 taken 2601 times.
2601 socket_svc->scheduler().register_descriptor(accepted_fd, &impl.desc_state_);
77
78 2601 sockaddr_in local_addr{};
79 2601 socklen_t local_len = sizeof(local_addr);
80 2601 sockaddr_in remote_addr{};
81 2601 socklen_t remote_len = sizeof(remote_addr);
82
83 2601 endpoint local_ep, remote_ep;
84
1/2
✓ Branch 1 taken 2601 times.
✗ Branch 2 not taken.
2601 if (::getsockname(accepted_fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
85 2601 local_ep = from_sockaddr_in(local_addr);
86
1/2
✓ Branch 1 taken 2601 times.
✗ Branch 2 not taken.
2601 if (::getpeername(accepted_fd, reinterpret_cast<sockaddr*>(&remote_addr), &remote_len) == 0)
87 2601 remote_ep = from_sockaddr_in(remote_addr);
88
89 2601 impl.set_endpoints(local_ep, remote_ep);
90
91
1/2
✓ Branch 0 taken 2601 times.
✗ Branch 1 not taken.
2601 if (impl_out)
92 2601 *impl_out = &impl;
93
94 2601 accepted_fd = -1;
95 }
96 else
97 {
98 if (ec_out && !*ec_out)
99 *ec_out = make_err(ENOENT);
100 ::close(accepted_fd);
101 accepted_fd = -1;
102 if (impl_out)
103 *impl_out = nullptr;
104 }
105 }
106 else
107 {
108 ::close(accepted_fd);
109 accepted_fd = -1;
110 if (impl_out)
111 *impl_out = nullptr;
112 }
113 2601 }
114 else
115 {
116
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9 times.
9 if (accepted_fd >= 0)
117 {
118 ::close(accepted_fd);
119 accepted_fd = -1;
120 }
121
122
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9 times.
9 if (peer_impl)
123 {
124 peer_impl->release();
125 peer_impl = nullptr;
126 }
127
128
1/2
✓ Branch 0 taken 9 times.
✗ Branch 1 not taken.
9 if (impl_out)
129 9 *impl_out = nullptr;
130 }
131
132 // Move to stack before resuming. See epoll_op::operator()() for rationale.
133 2610 capy::executor_ref saved_ex( std::move( ex ) );
134 2610 capy::coro saved_h( std::move( h ) );
135 2610 auto prevent_premature_destruction = std::move(impl_ptr);
136
1/1
✓ Branch 1 taken 2610 times.
2610 saved_ex.dispatch( saved_h );
137 2610 }
138
139 64 epoll_acceptor_impl::
140 64 epoll_acceptor_impl(epoll_acceptor_service& svc) noexcept
141 64 : svc_(svc)
142 {
143 64 }
144
145 void
146 64 epoll_acceptor_impl::
147 release()
148 {
149 64 close_socket();
150 64 svc_.destroy_acceptor_impl(*this);
151 64 }
152
153 std::coroutine_handle<>
154 2610 epoll_acceptor_impl::
155 accept(
156 std::coroutine_handle<> h,
157 capy::executor_ref ex,
158 std::stop_token token,
159 std::error_code* ec,
160 io_object::io_object_impl** impl_out)
161 {
162 2610 auto& op = acc_;
163 2610 op.reset();
164 2610 op.h = h;
165 2610 op.ex = ex;
166 2610 op.ec_out = ec;
167 2610 op.impl_out = impl_out;
168 2610 op.fd = fd_;
169 2610 op.start(token, this);
170
171 2610 sockaddr_in addr{};
172 2610 socklen_t addrlen = sizeof(addr);
173 int accepted;
174 do {
175
1/1
✓ Branch 1 taken 2610 times.
2610 accepted = ::accept4(fd_, reinterpret_cast<sockaddr*>(&addr),
176 &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
177
3/4
✓ Branch 0 taken 2608 times.
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 2608 times.
2610 } while (accepted < 0 && errno == EINTR);
178
179
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 2608 times.
2610 if (accepted >= 0)
180 {
181 {
182
1/1
✓ Branch 1 taken 2 times.
2 std::lock_guard lock(desc_state_.mutex);
183 2 desc_state_.read_ready = false;
184 2 }
185 2 op.accepted_fd = accepted;
186 2 op.complete(0, 0);
187
1/1
✓ Branch 1 taken 2 times.
2 op.impl_ptr = shared_from_this();
188
1/1
✓ Branch 1 taken 2 times.
2 svc_.post(&op);
189 // completion is always posted to scheduler queue, never inline.
190 2 return std::noop_coroutine();
191 }
192
193
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 2608 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
2608 if (errno == EAGAIN || errno == EWOULDBLOCK)
194 {
195 2608 svc_.work_started();
196
1/1
✓ Branch 1 taken 2608 times.
2608 op.impl_ptr = shared_from_this();
197
198
1/1
✓ Branch 1 taken 2608 times.
2608 std::lock_guard lock(desc_state_.mutex);
199
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2608 times.
2608 if (desc_state_.read_ready)
200 {
201 desc_state_.read_ready = false;
202 op.perform_io();
203 if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
204 {
205 op.errn = 0;
206 if (op.cancelled.load(std::memory_order_acquire))
207 {
208 svc_.post(&op);
209 svc_.work_finished();
210 }
211 else
212 {
213 desc_state_.read_op = &op;
214 }
215 }
216 else
217 {
218 svc_.post(&op);
219 svc_.work_finished();
220 }
221 }
222 else
223 {
224
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2608 times.
2608 if (op.cancelled.load(std::memory_order_acquire))
225 {
226 svc_.post(&op);
227 svc_.work_finished();
228 }
229 else
230 {
231 2608 desc_state_.read_op = &op;
232 }
233 }
234 2608 return std::noop_coroutine();
235 2608 }
236
237 op.complete(errno, 0);
238 op.impl_ptr = shared_from_this();
239 svc_.post(&op);
240 // completion is always posted to scheduler queue, never inline.
241 return std::noop_coroutine();
242 }
243
244 void
245 129 epoll_acceptor_impl::
246 cancel() noexcept
247 {
248 129 std::shared_ptr<epoll_acceptor_impl> self;
249 try {
250
1/1
✓ Branch 1 taken 129 times.
129 self = shared_from_this();
251 } catch (const std::bad_weak_ptr&) {
252 return;
253 }
254
255 129 acc_.request_cancel();
256
257 129 epoll_op* claimed = nullptr;
258 {
259 129 std::lock_guard lock(desc_state_.mutex);
260
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 126 times.
129 if (desc_state_.read_op == &acc_)
261 3 claimed = std::exchange(desc_state_.read_op, nullptr);
262 129 }
263
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 126 times.
129 if (claimed)
264 {
265 3 acc_.impl_ptr = self;
266 3 svc_.post(&acc_);
267 3 svc_.work_finished();
268 }
269 129 }
270
271 void
272 6 epoll_acceptor_impl::
273 cancel_single_op(epoll_op& op) noexcept
274 {
275 6 op.request_cancel();
276
277 6 epoll_op* claimed = nullptr;
278 {
279 6 std::lock_guard lock(desc_state_.mutex);
280
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 if (desc_state_.read_op == &op)
281 6 claimed = std::exchange(desc_state_.read_op, nullptr);
282 6 }
283
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 if (claimed)
284 {
285 try {
286
1/1
✓ Branch 1 taken 6 times.
6 op.impl_ptr = shared_from_this();
287 } catch (const std::bad_weak_ptr&) {}
288 6 svc_.post(&op);
289 6 svc_.work_finished();
290 }
291 6 }
292
293 void
294 128 epoll_acceptor_impl::
295 close_socket() noexcept
296 {
297 128 cancel();
298
299
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 128 times.
128 if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
300 {
301 try {
302 desc_state_.impl_ref_ = shared_from_this();
303 } catch (std::bad_weak_ptr const&) {}
304 }
305
306
2/2
✓ Branch 0 taken 62 times.
✓ Branch 1 taken 66 times.
128 if (fd_ >= 0)
307 {
308
1/2
✓ Branch 0 taken 62 times.
✗ Branch 1 not taken.
62 if (desc_state_.registered_events != 0)
309 62 svc_.scheduler().deregister_descriptor(fd_);
310 62 ::close(fd_);
311 62 fd_ = -1;
312 }
313
314 128 desc_state_.fd = -1;
315 {
316 128 std::lock_guard lock(desc_state_.mutex);
317 128 desc_state_.read_op = nullptr;
318 128 desc_state_.read_ready = false;
319 128 desc_state_.write_ready = false;
320 128 }
321 128 desc_state_.registered_events = 0;
322
323 // Clear cached endpoint
324 128 local_endpoint_ = endpoint{};
325 128 }
326
327 189 epoll_acceptor_service::
328 189 epoll_acceptor_service(capy::execution_context& ctx)
329 189 : ctx_(ctx)
330
2/2
✓ Branch 2 taken 189 times.
✓ Branch 5 taken 189 times.
189 , state_(std::make_unique<epoll_acceptor_state>(ctx.use_service<epoll_scheduler>()))
331 {
332 189 }
333
334 378 epoll_acceptor_service::
335 189 ~epoll_acceptor_service()
336 {
337 378 }
338
339 void
340 189 epoll_acceptor_service::
341 shutdown()
342 {
343
1/1
✓ Branch 2 taken 189 times.
189 std::lock_guard lock(state_->mutex_);
344
345
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 189 times.
189 while (auto* impl = state_->acceptor_list_.pop_front())
346 impl->close_socket();
347
348 189 state_->acceptor_ptrs_.clear();
349 189 }
350
351 tcp_acceptor::acceptor_impl&
352 64 epoll_acceptor_service::
353 create_acceptor_impl()
354 {
355
1/1
✓ Branch 1 taken 64 times.
64 auto impl = std::make_shared<epoll_acceptor_impl>(*this);
356 64 auto* raw = impl.get();
357
358
1/1
✓ Branch 2 taken 64 times.
64 std::lock_guard lock(state_->mutex_);
359 64 state_->acceptor_list_.push_back(raw);
360
1/1
✓ Branch 3 taken 64 times.
64 state_->acceptor_ptrs_.emplace(raw, std::move(impl));
361
362 64 return *raw;
363 64 }
364
365 void
366 64 epoll_acceptor_service::
367 destroy_acceptor_impl(tcp_acceptor::acceptor_impl& impl)
368 {
369 64 auto* epoll_impl = static_cast<epoll_acceptor_impl*>(&impl);
370
1/1
✓ Branch 2 taken 64 times.
64 std::lock_guard lock(state_->mutex_);
371 64 state_->acceptor_list_.remove(epoll_impl);
372
1/1
✓ Branch 2 taken 64 times.
64 state_->acceptor_ptrs_.erase(epoll_impl);
373 64 }
374
375 std::error_code
376 64 epoll_acceptor_service::
377 open_acceptor(
378 tcp_acceptor::acceptor_impl& impl,
379 endpoint ep,
380 int backlog)
381 {
382 64 auto* epoll_impl = static_cast<epoll_acceptor_impl*>(&impl);
383 64 epoll_impl->close_socket();
384
385 64 int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
386
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 64 times.
64 if (fd < 0)
387 return make_err(errno);
388
389 64 int reuse = 1;
390 64 ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
391
392 64 sockaddr_in addr = detail::to_sockaddr_in(ep);
393
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 62 times.
64 if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
394 {
395 2 int errn = errno;
396
1/1
✓ Branch 1 taken 2 times.
2 ::close(fd);
397 2 return make_err(errn);
398 }
399
400
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 62 times.
62 if (::listen(fd, backlog) < 0)
401 {
402 int errn = errno;
403 ::close(fd);
404 return make_err(errn);
405 }
406
407 62 epoll_impl->fd_ = fd;
408
409 // Register fd with epoll (edge-triggered mode)
410 62 epoll_impl->desc_state_.fd = fd;
411 {
412
1/1
✓ Branch 1 taken 62 times.
62 std::lock_guard lock(epoll_impl->desc_state_.mutex);
413 62 epoll_impl->desc_state_.read_op = nullptr;
414 62 }
415
1/1
✓ Branch 2 taken 62 times.
62 scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
416
417 // Cache the local endpoint (queries OS for ephemeral port if port was 0)
418 62 sockaddr_in local_addr{};
419 62 socklen_t local_len = sizeof(local_addr);
420
1/2
✓ Branch 1 taken 62 times.
✗ Branch 2 not taken.
62 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
421 62 epoll_impl->set_local_endpoint(detail::from_sockaddr_in(local_addr));
422
423 62 return {};
424 }
425
426 void
427 11 epoll_acceptor_service::
428 post(epoll_op* op)
429 {
430 11 state_->sched_.post(op);
431 11 }
432
433 void
434 2608 epoll_acceptor_service::
435 work_started() noexcept
436 {
437 2608 state_->sched_.work_started();
438 2608 }
439
440 void
441 9 epoll_acceptor_service::
442 work_finished() noexcept
443 {
444 9 state_->sched_.work_finished();
445 9 }
446
447 epoll_socket_service*
448 2601 epoll_acceptor_service::
449 socket_service() const noexcept
450 {
451 2601 auto* svc = ctx_.find_service<detail::socket_service>();
452
2/4
✓ Branch 0 taken 2601 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 2601 times.
✗ Branch 3 not taken.
2601 return svc ? dynamic_cast<epoll_socket_service*>(svc) : nullptr;
453 }
454
455 } // namespace boost::corosio::detail
456
457 #endif // BOOST_COROSIO_HAS_EPOLL
458