libs/corosio/src/corosio/src/detail/epoll/sockets.cpp

69.7% Lines (310/445) 91.7% Functions (33/36) 51.8% Branches (131/253)
libs/corosio/src/corosio/src/detail/epoll/sockets.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/sockets.hpp"
15 #include "src/detail/endpoint_convert.hpp"
16 #include "src/detail/make_err.hpp"
17 #include "src/detail/resume_coro.hpp"
18
19 #include <boost/corosio/detail/except.hpp>
20 #include <boost/capy/buffers.hpp>
21
22 #include <utility>
23
24 #include <errno.h>
25 #include <netinet/in.h>
26 #include <netinet/tcp.h>
27 #include <sys/epoll.h>
28 #include <sys/socket.h>
29 #include <unistd.h>
30
31 namespace boost::corosio::detail {
32
33 void
34 104 epoll_op::canceller::
35 operator()() const noexcept
36 {
37 104 op->cancel();
38 104 }
39
40 void
41 epoll_connect_op::
42 cancel() noexcept
43 {
44 if (socket_impl_)
45 socket_impl_->cancel_single_op(*this);
46 else
47 request_cancel();
48 }
49
50 void
51 98 epoll_read_op::
52 cancel() noexcept
53 {
54
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 if (socket_impl_)
55 98 socket_impl_->cancel_single_op(*this);
56 else
57 request_cancel();
58 98 }
59
60 void
61 epoll_write_op::
62 cancel() noexcept
63 {
64 if (socket_impl_)
65 socket_impl_->cancel_single_op(*this);
66 else
67 request_cancel();
68 }
69
70 void
71 2602 epoll_connect_op::
72 operator()()
73 {
74 2602 stop_cb.reset();
75
76
3/4
✓ Branch 0 taken 2601 times.
✓ Branch 1 taken 1 time.
✓ Branch 3 taken 2601 times.
✗ Branch 4 not taken.
2602 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
77
78 // Cache endpoints on successful connect
79
3/4
✓ Branch 0 taken 2601 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 2601 times.
✗ Branch 3 not taken.
2602 if (success && socket_impl_)
80 {
81 // Query local endpoint via getsockname (may fail, but remote is always known)
82 2601 endpoint local_ep;
83 2601 sockaddr_in local_addr{};
84 2601 socklen_t local_len = sizeof(local_addr);
85
1/2
✓ Branch 1 taken 2601 times.
✗ Branch 2 not taken.
2601 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
86 2601 local_ep = from_sockaddr_in(local_addr);
87 // Always cache remote endpoint; local may be default if getsockname failed
88 2601 static_cast<epoll_socket_impl*>(socket_impl_)->set_endpoints(local_ep, target_endpoint);
89 }
90
91
1/2
✓ Branch 0 taken 2602 times.
✗ Branch 1 not taken.
2602 if (ec_out)
92 {
93
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2602 times.
2602 if (cancelled.load(std::memory_order_acquire))
94 *ec_out = capy::error::canceled;
95
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 2601 times.
2602 else if (errn != 0)
96 1 *ec_out = make_err(errn);
97 else
98 2601 *ec_out = {};
99 }
100
101
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2602 times.
2602 if (bytes_out)
102 *bytes_out = bytes_transferred;
103
104 // Move to stack before resuming. See epoll_op::operator()() for rationale.
105 2602 capy::executor_ref saved_ex( std::move( ex ) );
106 2602 capy::coro saved_h( std::move( h ) );
107 2602 auto prevent_premature_destruction = std::move(impl_ptr);
108
1/1
✓ Branch 1 taken 2602 times.
2602 resume_coro(saved_ex, saved_h);
109 2602 }
110
111 5214 epoll_socket_impl::
112 5214 epoll_socket_impl(epoll_socket_service& svc) noexcept
113 5214 : svc_(svc)
114 {
115 5214 }
116
117 5214 epoll_socket_impl::
118 ~epoll_socket_impl() = default;
119
120 void
121 5214 epoll_socket_impl::
122 release()
123 {
124 5214 close_socket();
125 5214 svc_.destroy_impl(*this);
126 5214 }
127
128 std::coroutine_handle<>
129 2602 epoll_socket_impl::
130 connect(
131 std::coroutine_handle<> h,
132 capy::executor_ref ex,
133 endpoint ep,
134 std::stop_token token,
135 std::error_code* ec)
136 {
137 2602 auto& op = conn_;
138 2602 op.reset();
139 2602 op.h = h;
140 2602 op.ex = ex;
141 2602 op.ec_out = ec;
142 2602 op.fd = fd_;
143 2602 op.target_endpoint = ep; // Store target for endpoint caching
144 2602 op.start(token, this);
145
146 2602 sockaddr_in addr = detail::to_sockaddr_in(ep);
147
1/1
✓ Branch 1 taken 2602 times.
2602 int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
148
149
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2602 times.
2602 if (result == 0)
150 {
151 // Sync success - cache endpoints immediately
152 // Remote is always known; local may fail but we still cache remote
153 sockaddr_in local_addr{};
154 socklen_t local_len = sizeof(local_addr);
155 if (::getsockname(fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
156 local_endpoint_ = detail::from_sockaddr_in(local_addr);
157 remote_endpoint_ = ep;
158
159 op.complete(0, 0);
160 op.impl_ptr = shared_from_this();
161 svc_.post(&op);
162 // completion is always posted to scheduler queue, never inline.
163 return std::noop_coroutine();
164 }
165
166
1/2
✓ Branch 0 taken 2602 times.
✗ Branch 1 not taken.
2602 if (errno == EINPROGRESS)
167 {
168 2602 svc_.work_started();
169
1/1
✓ Branch 1 taken 2602 times.
2602 op.impl_ptr = shared_from_this();
170
171
1/1
✓ Branch 1 taken 2602 times.
2602 std::lock_guard lock(desc_state_.mutex);
172
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2602 times.
2602 if (desc_state_.write_ready)
173 {
174 desc_state_.write_ready = false;
175 op.perform_io();
176 if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
177 {
178 op.errn = 0;
179 if (op.cancelled.load(std::memory_order_acquire))
180 {
181 svc_.post(&op);
182 svc_.work_finished();
183 }
184 else
185 {
186 desc_state_.connect_op = &op;
187 }
188 }
189 else
190 {
191 svc_.post(&op);
192 svc_.work_finished();
193 }
194 }
195 else
196 {
197
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2602 times.
2602 if (op.cancelled.load(std::memory_order_acquire))
198 {
199 svc_.post(&op);
200 svc_.work_finished();
201 }
202 else
203 {
204 2602 desc_state_.connect_op = &op;
205 }
206 }
207 2602 return std::noop_coroutine();
208 2602 }
209
210 op.complete(errno, 0);
211 op.impl_ptr = shared_from_this();
212 svc_.post(&op);
213 // completion is always posted to scheduler queue, never inline.
214 return std::noop_coroutine();
215 }
216
217 void
218 168 epoll_socket_impl::
219 do_read_io()
220 {
221 168 auto& op = rd_;
222
223 ssize_t n;
224 do {
225 168 n = ::readv(fd_, op.iovecs, op.iovec_count);
226
2/4
✓ Branch 0 taken 168 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 168 times.
168 } while (n < 0 && errno == EINTR);
227
228
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 168 times.
168 if (n > 0)
229 {
230 {
231 std::lock_guard lock(desc_state_.mutex);
232 desc_state_.read_ready = false;
233 }
234 op.complete(0, static_cast<std::size_t>(n));
235 svc_.post(&op);
236 return;
237 }
238
239
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 168 times.
168 if (n == 0)
240 {
241 {
242 std::lock_guard lock(desc_state_.mutex);
243 desc_state_.read_ready = false;
244 }
245 op.complete(0, 0);
246 svc_.post(&op);
247 return;
248 }
249
250
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
168 if (errno == EAGAIN || errno == EWOULDBLOCK)
251 {
252 168 svc_.work_started();
253
254
1/1
✓ Branch 1 taken 168 times.
168 std::lock_guard lock(desc_state_.mutex);
255
2/2
✓ Branch 0 taken 76 times.
✓ Branch 1 taken 92 times.
168 if (desc_state_.read_ready)
256 {
257 76 desc_state_.read_ready = false;
258 76 op.perform_io();
259
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 76 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
76 if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
260 {
261 76 op.errn = 0;
262
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 76 times.
152 if (op.cancelled.load(std::memory_order_acquire))
263 {
264 svc_.post(&op);
265 svc_.work_finished();
266 }
267 else
268 {
269 76 desc_state_.read_op = &op;
270 }
271 }
272 else
273 {
274 svc_.post(&op);
275 svc_.work_finished();
276 }
277 }
278 else
279 {
280
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 92 times.
92 if (op.cancelled.load(std::memory_order_acquire))
281 {
282 svc_.post(&op);
283 svc_.work_finished();
284 }
285 else
286 {
287 92 desc_state_.read_op = &op;
288 }
289 }
290 168 return;
291 168 }
292
293 op.complete(errno, 0);
294 svc_.post(&op);
295 }
296
297 void
298 epoll_socket_impl::
299 do_write_io()
300 {
301 auto& op = wr_;
302
303 msghdr msg{};
304 msg.msg_iov = op.iovecs;
305 msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
306
307 ssize_t n;
308 do {
309 n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
310 } while (n < 0 && errno == EINTR);
311
312 if (n > 0)
313 {
314 {
315 std::lock_guard lock(desc_state_.mutex);
316 desc_state_.write_ready = false;
317 }
318 op.complete(0, static_cast<std::size_t>(n));
319 svc_.post(&op);
320 return;
321 }
322
323 if (errno == EAGAIN || errno == EWOULDBLOCK)
324 {
325 svc_.work_started();
326
327 std::lock_guard lock(desc_state_.mutex);
328 if (desc_state_.write_ready)
329 {
330 desc_state_.write_ready = false;
331 op.perform_io();
332 if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
333 {
334 op.errn = 0;
335 if (op.cancelled.load(std::memory_order_acquire))
336 {
337 svc_.post(&op);
338 svc_.work_finished();
339 }
340 else
341 {
342 desc_state_.write_op = &op;
343 }
344 }
345 else
346 {
347 svc_.post(&op);
348 svc_.work_finished();
349 }
350 }
351 else
352 {
353 if (op.cancelled.load(std::memory_order_acquire))
354 {
355 svc_.post(&op);
356 svc_.work_finished();
357 }
358 else
359 {
360 desc_state_.write_op = &op;
361 }
362 }
363 return;
364 }
365
366 op.complete(errno ? errno : EIO, 0);
367 svc_.post(&op);
368 }
369
370 std::coroutine_handle<>
371 82262 epoll_socket_impl::
372 read_some(
373 std::coroutine_handle<> h,
374 capy::executor_ref ex,
375 io_buffer_param param,
376 std::stop_token token,
377 std::error_code* ec,
378 std::size_t* bytes_out)
379 {
380 82262 auto& op = rd_;
381 82262 op.reset();
382 82262 op.h = h;
383 82262 op.ex = ex;
384 82262 op.ec_out = ec;
385 82262 op.bytes_out = bytes_out;
386 82262 op.fd = fd_;
387 82262 op.start(token, this);
388
1/1
✓ Branch 1 taken 82262 times.
82262 op.impl_ptr = shared_from_this();
389
390 // Must prepare buffers before initiator runs
391 82262 capy::mutable_buffer bufs[epoll_read_op::max_buffers];
392 82262 op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
393
394
6/8
✓ Branch 0 taken 82261 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 82261 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 82261 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 82261 times.
82262 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
395 {
396 1 op.empty_buffer_read = true;
397 1 op.complete(0, 0);
398
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
399 1 return std::noop_coroutine();
400 }
401
402
2/2
✓ Branch 0 taken 82261 times.
✓ Branch 1 taken 82261 times.
164522 for (int i = 0; i < op.iovec_count; ++i)
403 {
404 82261 op.iovecs[i].iov_base = bufs[i].data();
405 82261 op.iovecs[i].iov_len = bufs[i].size();
406 }
407
408 // Speculative read: bypass initiator when data is ready
409 ssize_t n;
410 do {
411
1/1
✓ Branch 1 taken 82261 times.
82261 n = ::readv(fd_, op.iovecs, op.iovec_count);
412
3/4
✓ Branch 0 taken 168 times.
✓ Branch 1 taken 82093 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 168 times.
82261 } while (n < 0 && errno == EINTR);
413
414
2/2
✓ Branch 0 taken 82088 times.
✓ Branch 1 taken 173 times.
82261 if (n > 0)
415 {
416 82088 op.complete(0, static_cast<std::size_t>(n));
417
1/1
✓ Branch 1 taken 82088 times.
82088 svc_.post(&op);
418 82088 return std::noop_coroutine();
419 }
420
421
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 168 times.
173 if (n == 0)
422 {
423 5 op.complete(0, 0);
424
1/1
✓ Branch 1 taken 5 times.
5 svc_.post(&op);
425 5 return std::noop_coroutine();
426 }
427
428
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
168 if (errno != EAGAIN && errno != EWOULDBLOCK)
429 {
430 op.complete(errno, 0);
431 svc_.post(&op);
432 return std::noop_coroutine();
433 }
434
435 // EAGAIN — full async path
436
1/1
✓ Branch 1 taken 168 times.
168 return read_initiator_.start<&epoll_socket_impl::do_read_io>(this);
437 }
438
439 std::coroutine_handle<>
440 82141 epoll_socket_impl::
441 write_some(
442 std::coroutine_handle<> h,
443 capy::executor_ref ex,
444 io_buffer_param param,
445 std::stop_token token,
446 std::error_code* ec,
447 std::size_t* bytes_out)
448 {
449 82141 auto& op = wr_;
450 82141 op.reset();
451 82141 op.h = h;
452 82141 op.ex = ex;
453 82141 op.ec_out = ec;
454 82141 op.bytes_out = bytes_out;
455 82141 op.fd = fd_;
456 82141 op.start(token, this);
457
1/1
✓ Branch 1 taken 82141 times.
82141 op.impl_ptr = shared_from_this();
458
459 82141 capy::mutable_buffer bufs[epoll_write_op::max_buffers];
460 82141 op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
461
462
6/8
✓ Branch 0 taken 82140 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 82140 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 82140 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 82140 times.
82141 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
463 {
464 1 op.complete(0, 0);
465
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
466 1 return std::noop_coroutine();
467 }
468
469
2/2
✓ Branch 0 taken 82140 times.
✓ Branch 1 taken 82140 times.
164280 for (int i = 0; i < op.iovec_count; ++i)
470 {
471 82140 op.iovecs[i].iov_base = bufs[i].data();
472 82140 op.iovecs[i].iov_len = bufs[i].size();
473 }
474
475 // Speculative write: bypass initiator when buffer space is ready
476 82140 msghdr msg{};
477 82140 msg.msg_iov = op.iovecs;
478 82140 msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
479
480 ssize_t n;
481 do {
482
1/1
✓ Branch 1 taken 82140 times.
82140 n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
483
3/4
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 82139 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 time.
82140 } while (n < 0 && errno == EINTR);
484
485
2/2
✓ Branch 0 taken 82139 times.
✓ Branch 1 taken 1 time.
82140 if (n > 0)
486 {
487 82139 op.complete(0, static_cast<std::size_t>(n));
488
1/1
✓ Branch 1 taken 82139 times.
82139 svc_.post(&op);
489 82139 return std::noop_coroutine();
490 }
491
492
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if (n == 0)
493 {
494 op.complete(0, 0);
495 svc_.post(&op);
496 return std::noop_coroutine();
497 }
498
499
2/4
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
✗ Branch 3 not taken.
1 if (errno != EAGAIN && errno != EWOULDBLOCK)
500 {
501 1 op.complete(errno, 0);
502
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
503 1 return std::noop_coroutine();
504 }
505
506 // EAGAIN — full async path
507 return write_initiator_.start<&epoll_socket_impl::do_write_io>(this);
508 }
509
510 std::error_code
511 3 epoll_socket_impl::
512 shutdown(tcp_socket::shutdown_type what) noexcept
513 {
514 int how;
515
3/4
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
✗ Branch 3 not taken.
3 switch (what)
516 {
517 1 case tcp_socket::shutdown_receive: how = SHUT_RD; break;
518 1 case tcp_socket::shutdown_send: how = SHUT_WR; break;
519 1 case tcp_socket::shutdown_both: how = SHUT_RDWR; break;
520 default:
521 return make_err(EINVAL);
522 }
523
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::shutdown(fd_, how) != 0)
524 return make_err(errno);
525 3 return {};
526 }
527
528 std::error_code
529 5 epoll_socket_impl::
530 set_no_delay(bool value) noexcept
531 {
532
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 1 time.
5 int flag = value ? 1 : 0;
533
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
534 return make_err(errno);
535 5 return {};
536 }
537
538 bool
539 5 epoll_socket_impl::
540 no_delay(std::error_code& ec) const noexcept
541 {
542 5 int flag = 0;
543 5 socklen_t len = sizeof(flag);
544
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
545 {
546 ec = make_err(errno);
547 return false;
548 }
549 5 ec = {};
550 5 return flag != 0;
551 }
552
553 std::error_code
554 4 epoll_socket_impl::
555 set_keep_alive(bool value) noexcept
556 {
557
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 time.
4 int flag = value ? 1 : 0;
558
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
559 return make_err(errno);
560 4 return {};
561 }
562
563 bool
564 4 epoll_socket_impl::
565 keep_alive(std::error_code& ec) const noexcept
566 {
567 4 int flag = 0;
568 4 socklen_t len = sizeof(flag);
569
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
570 {
571 ec = make_err(errno);
572 return false;
573 }
574 4 ec = {};
575 4 return flag != 0;
576 }
577
578 std::error_code
579 1 epoll_socket_impl::
580 set_receive_buffer_size(int size) noexcept
581 {
582
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
583 return make_err(errno);
584 1 return {};
585 }
586
587 int
588 3 epoll_socket_impl::
589 receive_buffer_size(std::error_code& ec) const noexcept
590 {
591 3 int size = 0;
592 3 socklen_t len = sizeof(size);
593
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
594 {
595 ec = make_err(errno);
596 return 0;
597 }
598 3 ec = {};
599 3 return size;
600 }
601
602 std::error_code
603 1 epoll_socket_impl::
604 set_send_buffer_size(int size) noexcept
605 {
606
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
607 return make_err(errno);
608 1 return {};
609 }
610
611 int
612 3 epoll_socket_impl::
613 send_buffer_size(std::error_code& ec) const noexcept
614 {
615 3 int size = 0;
616 3 socklen_t len = sizeof(size);
617
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
618 {
619 ec = make_err(errno);
620 return 0;
621 }
622 3 ec = {};
623 3 return size;
624 }
625
626 std::error_code
627 4 epoll_socket_impl::
628 set_linger(bool enabled, int timeout) noexcept
629 {
630
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 3 times.
4 if (timeout < 0)
631 1 return make_err(EINVAL);
632 struct ::linger lg;
633
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1 time.
3 lg.l_onoff = enabled ? 1 : 0;
634 3 lg.l_linger = timeout;
635
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
636 return make_err(errno);
637 3 return {};
638 }
639
640 tcp_socket::linger_options
641 3 epoll_socket_impl::
642 linger(std::error_code& ec) const noexcept
643 {
644 3 struct ::linger lg{};
645 3 socklen_t len = sizeof(lg);
646
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
647 {
648 ec = make_err(errno);
649 return {};
650 }
651 3 ec = {};
652 3 return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
653 }
654
655 void
656 7923 epoll_socket_impl::
657 cancel() noexcept
658 {
659 7923 std::shared_ptr<epoll_socket_impl> self;
660 try {
661
1/1
✓ Branch 1 taken 7923 times.
7923 self = shared_from_this();
662 } catch (const std::bad_weak_ptr&) {
663 return;
664 }
665
666 7923 conn_.request_cancel();
667 7923 rd_.request_cancel();
668 7923 wr_.request_cancel();
669
670 7923 epoll_op* conn_claimed = nullptr;
671 7923 epoll_op* rd_claimed = nullptr;
672 7923 epoll_op* wr_claimed = nullptr;
673 {
674 7923 std::lock_guard lock(desc_state_.mutex);
675
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7923 times.
7923 if (desc_state_.connect_op == &conn_)
676 conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
677
2/2
✓ Branch 0 taken 51 times.
✓ Branch 1 taken 7872 times.
7923 if (desc_state_.read_op == &rd_)
678 51 rd_claimed = std::exchange(desc_state_.read_op, nullptr);
679
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7923 times.
7923 if (desc_state_.write_op == &wr_)
680 wr_claimed = std::exchange(desc_state_.write_op, nullptr);
681 7923 }
682
683
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7923 times.
7923 if (conn_claimed)
684 {
685 conn_.impl_ptr = self;
686 svc_.post(&conn_);
687 svc_.work_finished();
688 }
689
2/2
✓ Branch 0 taken 51 times.
✓ Branch 1 taken 7872 times.
7923 if (rd_claimed)
690 {
691 51 rd_.impl_ptr = self;
692 51 svc_.post(&rd_);
693 51 svc_.work_finished();
694 }
695
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7923 times.
7923 if (wr_claimed)
696 {
697 wr_.impl_ptr = self;
698 svc_.post(&wr_);
699 svc_.work_finished();
700 }
701 7923 }
702
703 void
704 98 epoll_socket_impl::
705 cancel_single_op(epoll_op& op) noexcept
706 {
707 98 op.request_cancel();
708
709 98 epoll_op** desc_op_ptr = nullptr;
710
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 98 times.
98 if (&op == &conn_) desc_op_ptr = &desc_state_.connect_op;
711
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 else if (&op == &rd_) desc_op_ptr = &desc_state_.read_op;
712 else if (&op == &wr_) desc_op_ptr = &desc_state_.write_op;
713
714
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 if (desc_op_ptr)
715 {
716 98 epoll_op* claimed = nullptr;
717 {
718 98 std::lock_guard lock(desc_state_.mutex);
719
2/2
✓ Branch 0 taken 66 times.
✓ Branch 1 taken 32 times.
98 if (*desc_op_ptr == &op)
720 66 claimed = std::exchange(*desc_op_ptr, nullptr);
721 98 }
722
2/2
✓ Branch 0 taken 66 times.
✓ Branch 1 taken 32 times.
98 if (claimed)
723 {
724 try {
725
1/1
✓ Branch 1 taken 66 times.
66 op.impl_ptr = shared_from_this();
726 } catch (const std::bad_weak_ptr&) {}
727 66 svc_.post(&op);
728 66 svc_.work_finished();
729 }
730 }
731 98 }
732
733 void
734 7827 epoll_socket_impl::
735 close_socket() noexcept
736 {
737 7827 cancel();
738
739 // Keep impl alive if descriptor_state is queued in the scheduler.
740 // Without this, destroy_impl() drops the last shared_ptr while
741 // the queued descriptor_state node would become dangling.
742
2/2
✓ Branch 1 taken 4 times.
✓ Branch 2 taken 7823 times.
7827 if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
743 {
744 try {
745
1/1
✓ Branch 1 taken 4 times.
4 desc_state_.impl_ref_ = shared_from_this();
746 } catch (std::bad_weak_ptr const&) {}
747 }
748
749
2/2
✓ Branch 0 taken 5214 times.
✓ Branch 1 taken 2613 times.
7827 if (fd_ >= 0)
750 {
751
1/2
✓ Branch 0 taken 5214 times.
✗ Branch 1 not taken.
5214 if (desc_state_.registered_events != 0)
752 5214 svc_.scheduler().deregister_descriptor(fd_);
753 5214 ::close(fd_);
754 5214 fd_ = -1;
755 }
756
757 7827 desc_state_.fd = -1;
758 {
759 7827 std::lock_guard lock(desc_state_.mutex);
760 7827 desc_state_.read_op = nullptr;
761 7827 desc_state_.write_op = nullptr;
762 7827 desc_state_.connect_op = nullptr;
763 7827 desc_state_.read_ready = false;
764 7827 desc_state_.write_ready = false;
765 7827 }
766 7827 desc_state_.registered_events = 0;
767
768 7827 local_endpoint_ = endpoint{};
769 7827 remote_endpoint_ = endpoint{};
770 7827 }
771
772 189 epoll_socket_service::
773 189 epoll_socket_service(capy::execution_context& ctx)
774
2/2
✓ Branch 2 taken 189 times.
✓ Branch 5 taken 189 times.
189 : state_(std::make_unique<epoll_socket_state>(ctx.use_service<epoll_scheduler>()))
775 {
776 189 }
777
778 378 epoll_socket_service::
779 189 ~epoll_socket_service()
780 {
781 378 }
782
783 void
784 189 epoll_socket_service::
785 shutdown()
786 {
787
1/1
✓ Branch 2 taken 189 times.
189 std::lock_guard lock(state_->mutex_);
788
789
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 189 times.
189 while (auto* impl = state_->socket_list_.pop_front())
790 impl->close_socket();
791
792 189 state_->socket_ptrs_.clear();
793 189 }
794
795 tcp_socket::socket_impl&
796 5214 epoll_socket_service::
797 create_impl()
798 {
799
1/1
✓ Branch 1 taken 5214 times.
5214 auto impl = std::make_shared<epoll_socket_impl>(*this);
800 5214 auto* raw = impl.get();
801
802 {
803
1/1
✓ Branch 2 taken 5214 times.
5214 std::lock_guard lock(state_->mutex_);
804 5214 state_->socket_list_.push_back(raw);
805
1/1
✓ Branch 3 taken 5214 times.
5214 state_->socket_ptrs_.emplace(raw, std::move(impl));
806 5214 }
807
808 5214 return *raw;
809 5214 }
810
811 void
812 5214 epoll_socket_service::
813 destroy_impl(tcp_socket::socket_impl& impl)
814 {
815 5214 auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
816
1/1
✓ Branch 2 taken 5214 times.
5214 std::lock_guard lock(state_->mutex_);
817 5214 state_->socket_list_.remove(epoll_impl);
818
1/1
✓ Branch 2 taken 5214 times.
5214 state_->socket_ptrs_.erase(epoll_impl);
819 5214 }
820
821 std::error_code
822 2613 epoll_socket_service::
823 open_socket(tcp_socket::socket_impl& impl)
824 {
825 2613 auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
826 2613 epoll_impl->close_socket();
827
828 2613 int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
829
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2613 times.
2613 if (fd < 0)
830 return make_err(errno);
831
832 2613 epoll_impl->fd_ = fd;
833
834 // Register fd with epoll (edge-triggered mode)
835 2613 epoll_impl->desc_state_.fd = fd;
836 {
837
1/1
✓ Branch 1 taken 2613 times.
2613 std::lock_guard lock(epoll_impl->desc_state_.mutex);
838 2613 epoll_impl->desc_state_.read_op = nullptr;
839 2613 epoll_impl->desc_state_.write_op = nullptr;
840 2613 epoll_impl->desc_state_.connect_op = nullptr;
841 2613 }
842 2613 scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
843
844 2613 return {};
845 }
846
847 void
848 164352 epoll_socket_service::
849 post(epoll_op* op)
850 {
851 164352 state_->sched_.post(op);
852 164352 }
853
854 void
855 2770 epoll_socket_service::
856 work_started() noexcept
857 {
858 2770 state_->sched_.work_started();
859 2770 }
860
861 void
862 117 epoll_socket_service::
863 work_finished() noexcept
864 {
865 117 state_->sched_.work_finished();
866 117 }
867
868 } // namespace boost::corosio::detail
869
870 #endif // BOOST_COROSIO_HAS_EPOLL
871