Commit | Line | Data |
---|---|---|
460b9539 | 1 | /* |
2 | * This file is part of DisOrder. | |
e8c92ba7 | 3 | * Copyright (C) 2004, 2005, 2007 Richard Kettlewell |
460b9539 | 4 | * |
5 | * This program is free software; you can redistribute it and/or modify | |
6 | * it under the terms of the GNU General Public License as published by | |
7 | * the Free Software Foundation; either version 2 of the License, or | |
8 | * (at your option) any later version. | |
9 | * | |
10 | * This program is distributed in the hope that it will be useful, but | |
11 | * WITHOUT ANY WARRANTY; without even the implied warranty of | |
12 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |
13 | * General Public License for more details. | |
14 | * | |
15 | * You should have received a copy of the GNU General Public License | |
16 | * along with this program; if not, write to the Free Software | |
17 | * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 | |
18 | * USA | |
19 | */ | |
768d7355 RK |
20 | /** @file lib/event.c |
21 | * @brief DisOrder event loop | |
22 | */ | |
460b9539 | 23 | |
#include <config.h>

#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <limits.h>
#include <netinet/in.h>
#include <signal.h>
#include <stdio.h>
#include <string.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/un.h>
#include <sys/wait.h>
#include <unistd.h>

#include "event.h"
#include "mem.h"
#include "log.h"
#include "syscalls.h"
#include "printf.h"
#include "sink.h"
#include "vector.h"
460b9539 | 49 | |
/** @brief A timeout */
struct timeout {
  struct timeout *next;                 /* next timeout, sorted by @c when */
  struct timeval when;                  /* time at or after which to fire */
  ev_timeout_callback *callback;        /* called when the timeout fires */
  void *u;                              /* opaque user data for @c callback */
  int resolve;                          /* set once fired; marks for unlinking */
};
58 | ||
/** @brief A file descriptor in one mode */
struct fd {
  int fd;                               /* the file descriptor itself */
  ev_fd_callback *callback;             /* called when the fd trips */
  void *u;                              /* opaque user data for @c callback */
  const char *what;                     /* description for debug/error output */
};
66 | ||
/** @brief All the file descriptors in a given mode */
struct fdmode {
  /** @brief Mask of active file descriptors passed to @c select() */
  fd_set enabled;

  /** @brief File descriptor mask returned from @c select() */
  fd_set tripped;

  /** @brief Number of file descriptors in @p fds */
  int nfds;

  /** @brief Number of slots in @p fds */
  int fdslots;

  /** @brief Array of all active file descriptors */
  struct fd *fds;

  /** @brief Highest-numbered file descriptor or 0 */
  int maxfd;
};
87 | ||
/** @brief A signal handler */
struct signal {
  struct sigaction oldsa;               /* disposition to restore on cancel */
  ev_signal_callback *callback;         /* called from the event loop, not the handler */
  void *u;                              /* opaque user data for @c callback */
};
94 | ||
/** @brief A child process */
struct child {
  pid_t pid;                            /* process ID being waited for */
  int options;                          /* extra options ORed into wait4() */
  ev_child_callback *callback;          /* called when a status change is reaped */
  void *u;                              /* opaque user data for @c callback */
};
102 | ||
/** @brief An event loop */
struct ev_source {
  /** @brief File descriptors, per mode */
  struct fdmode mode[ev_nmodes];

  /** @brief Sorted linked list of timeouts
   *
   * We could use @ref HEAP_TYPE now, but there aren't many timeouts.
   */
  struct timeout *timeouts;

  /** @brief Array of handled signals, indexed by signal number */
  struct signal signals[NSIG];

  /** @brief Mask of handled signals */
  sigset_t sigmask;

  /** @brief Escape early from handling of @c select() results
   *
   * This is set if any of the file descriptor arrays are invalidated, since
   * it's then not safe for processing of them to continue.
   */
  int escape;

  /** @brief Signal handling pipe
   *
   * The signal handler writes signal numbers down this pipe; [0] is the read
   * end, [1] the write end, or both -1 before the first ev_signal().
   */
  int sigpipe[2];

  /** @brief Number of child processes in @p children */
  int nchildren;

  /** @brief Number of slots in @p children */
  int nchildslots;

  /** @brief Array of child processes */
  struct child *children;
};
142 | ||
/** @brief Names of file descriptor modes, indexed by @c ev_fdmode */
static const char *modenames[] = { "read", "write", "except" };
145 | ||
146 | /* utilities ******************************************************************/ | |
147 | ||
/** @brief Strictly-greater comparison for timevals
 *
 * Returns non-0 iff @p a is later than @p b.
 *
 * Ought to be in @file lib/timeval.h
 */
static inline int gt(const struct timeval *a, const struct timeval *b) {
  /* seconds dominate; microseconds only decide ties */
  if(a->tv_sec != b->tv_sec)
    return a->tv_sec > b->tv_sec;
  return a->tv_usec > b->tv_usec;
}
160 | ||
/** @brief Greater-than-or-equal comparison for timevals
 *
 * Returns non-0 iff @p a is no earlier than @p b.
 *
 * Ought to be in @file lib/timeval.h
 */
static inline int ge(const struct timeval *a, const struct timeval *b) {
  /* a >= b iff seconds differ in a's favour, or tie broken by microseconds */
  if(a->tv_sec != b->tv_sec)
    return a->tv_sec > b->tv_sec;
  return a->tv_usec >= b->tv_usec;
}
168 | ||
169 | /* creation *******************************************************************/ | |
170 | ||
768d7355 | 171 | /** @brief Create a new event loop */ |
460b9539 | 172 | ev_source *ev_new(void) { |
173 | ev_source *ev = xmalloc(sizeof *ev); | |
174 | int n; | |
175 | ||
176 | memset(ev, 0, sizeof *ev); | |
177 | for(n = 0; n < ev_nmodes; ++n) | |
178 | FD_ZERO(&ev->mode[n].enabled); | |
179 | ev->sigpipe[0] = ev->sigpipe[1] = -1; | |
180 | sigemptyset(&ev->sigmask); | |
181 | return ev; | |
182 | } | |
183 | ||
184 | /* event loop *****************************************************************/ | |
185 | ||
/** @brief Run the event loop
 * @return -1 on error, non-0 if any callback returned non-0
 *
 * Loops forever: fires due timeouts, blocks in select() (with handled
 * signals unblocked only for the duration of the call), then dispatches
 * callbacks for tripped file descriptors.
 */
int ev_run(ev_source *ev) {
  for(;;) {
    struct timeval now;
    struct timeval delta;
    int n, mode;
    int ret;
    int maxfd;
    struct timeout *t, **tt;
    struct stat sb;

    xgettimeofday(&now, 0);
    /* Handle timeouts.  We don't want to handle any timeouts that are added
     * while we're handling them (otherwise we'd have to break out of infinite
     * loops, preferrably without starving better-behaved subsystems).  Hence
     * the slightly complicated two-phase approach here. */
    /* Phase 1: fire every due timeout, marking each with @c resolve so phase
     * 2 can unlink it.  Timeouts inserted by the callbacks sort later and so
     * are not fired this time round. */
    for(t = ev->timeouts;
        t && ge(&now, &t->when);
        t = t->next) {
      t->resolve = 1;
      D(("calling timeout for %ld.%ld callback %p %p",
         (long)t->when.tv_sec, (long)t->when.tv_usec,
         (void *)t->callback, t->u));
      ret = t->callback(ev, &now, t->u);
      if(ret)
        return ret;
    }
    /* Phase 2: unlink everything marked above */
    tt = &ev->timeouts;
    while((t = *tt)) {
      if(t->resolve)
        *tt = t->next;
      else
        tt = &t->next;
    }
    /* Rebuild the tripped sets from the enabled sets and find the highest fd
     * for select() */
    maxfd = 0;
    for(mode = 0; mode < ev_nmodes; ++mode) {
      ev->mode[mode].tripped = ev->mode[mode].enabled;
      if(ev->mode[mode].maxfd > maxfd)
        maxfd = ev->mode[mode].maxfd;
    }
    /* Handled signals are only deliverable while we're inside select(), so
     * signal handlers never interrupt callback code */
    xsigprocmask(SIG_UNBLOCK, &ev->sigmask, 0);
    do {
      if(ev->timeouts) {
        /* Wait no longer than the interval to the earliest timeout */
        xgettimeofday(&now, 0);
        delta.tv_sec = ev->timeouts->when.tv_sec - now.tv_sec;
        delta.tv_usec = ev->timeouts->when.tv_usec - now.tv_usec;
        if(delta.tv_usec < 0) {
          delta.tv_usec += 1000000;
          --delta.tv_sec;
        }
        if(delta.tv_sec < 0)
          delta.tv_sec = delta.tv_usec = 0;   /* already due: poll only */
        n = select(maxfd + 1,
                   &ev->mode[ev_read].tripped,
                   &ev->mode[ev_write].tripped,
                   &ev->mode[ev_except].tripped,
                   &delta);
      } else {
        /* No timeouts, so block indefinitely */
        n = select(maxfd + 1,
                   &ev->mode[ev_read].tripped,
                   &ev->mode[ev_write].tripped,
                   &ev->mode[ev_except].tripped,
                   0);
      }
    } while(n < 0 && errno == EINTR);
    xsigprocmask(SIG_BLOCK, &ev->sigmask, 0);
    if(n < 0) {
      error(errno, "error calling select");
      if(errno == EBADF) {
        /* If there's a bad FD in the mix then check them all and log what we
         * find, to ease debugging */
        for(mode = 0; mode < ev_nmodes; ++mode) {
          for(n = 0; n < ev->mode[mode].nfds; ++n) {
            const int fd = ev->mode[mode].fds[n].fd;

            if(FD_ISSET(fd, &ev->mode[mode].enabled)
               && fstat(fd, &sb) < 0)
              error(errno, "fstat %d (%s)", fd, ev->mode[mode].fds[n].what);
          }
        }
      }
      return -1;
    }
    if(n > 0) {
      /* if anything deranges the meaning of an fd, or re-orders the
       * fds[] tables, we'd better give up; such operations will
       * therefore set @escape@. */
      ev->escape = 0;
      for(mode = 0; mode < ev_nmodes && !ev->escape; ++mode)
        for(n = 0; n < ev->mode[mode].nfds && !ev->escape; ++n) {
          int fd = ev->mode[mode].fds[n].fd;
          if(FD_ISSET(fd, &ev->mode[mode].tripped)) {
            D(("calling %s fd %d callback %p %p", modenames[mode], fd,
               (void *)ev->mode[mode].fds[n].callback,
               ev->mode[mode].fds[n].u));
            ret = ev->mode[mode].fds[n].callback(ev, fd,
                                                 ev->mode[mode].fds[n].u);
            if(ret)
              return ret;
          }
        }
    }
    /* we'll pick up timeouts back round the loop */
  }
}
293 | ||
294 | /* file descriptors ***********************************************************/ | |
295 | ||
768d7355 RK |
296 | /** @brief Register a file descriptor |
297 | * @param ev Event loop | |
298 | * @param mode @c ev_read or @c ev_write | |
299 | * @param fd File descriptor | |
300 | * @param callback Called when @p is readable/writable | |
301 | * @param u Passed to @p callback | |
302 | * @param what Text description | |
303 | * @return 0 on success, non-0 on error | |
304 | * | |
305 | * Sets @ref ev_source::escape, so no further processing of file descriptors | |
306 | * will occur this time round the event loop. | |
307 | */ | |
460b9539 | 308 | int ev_fd(ev_source *ev, |
309 | ev_fdmode mode, | |
310 | int fd, | |
311 | ev_fd_callback *callback, | |
e8c92ba7 RK |
312 | void *u, |
313 | const char *what) { | |
460b9539 | 314 | int n; |
315 | ||
316 | D(("registering %s fd %d callback %p %p", modenames[mode], fd, | |
317 | (void *)callback, u)); | |
318 | assert(mode < ev_nmodes); | |
319 | if(ev->mode[mode].nfds >= ev->mode[mode].fdslots) { | |
320 | ev->mode[mode].fdslots = (ev->mode[mode].fdslots | |
321 | ? 2 * ev->mode[mode].fdslots : 16); | |
322 | D(("expanding %s fd table to %d entries", modenames[mode], | |
323 | ev->mode[mode].fdslots)); | |
324 | ev->mode[mode].fds = xrealloc(ev->mode[mode].fds, | |
325 | ev->mode[mode].fdslots * sizeof (struct fd)); | |
326 | } | |
327 | n = ev->mode[mode].nfds++; | |
328 | FD_SET(fd, &ev->mode[mode].enabled); | |
329 | ev->mode[mode].fds[n].fd = fd; | |
330 | ev->mode[mode].fds[n].callback = callback; | |
331 | ev->mode[mode].fds[n].u = u; | |
e8c92ba7 | 332 | ev->mode[mode].fds[n].what = what; |
460b9539 | 333 | if(fd > ev->mode[mode].maxfd) |
334 | ev->mode[mode].maxfd = fd; | |
335 | ev->escape = 1; | |
336 | return 0; | |
337 | } | |
338 | ||
768d7355 RK |
339 | /** @brief Cancel a file descriptor |
340 | * @param ev Event loop | |
341 | * @param mode @c ev_read or @c ev_write | |
342 | * @param fd File descriptor | |
343 | * @return 0 on success, non-0 on error | |
344 | * | |
345 | * Sets @ref ev_source::escape, so no further processing of file descriptors | |
346 | * will occur this time round the event loop. | |
347 | */ | |
460b9539 | 348 | int ev_fd_cancel(ev_source *ev, ev_fdmode mode, int fd) { |
349 | int n; | |
350 | int maxfd; | |
351 | ||
352 | D(("cancelling mode %s fd %d", modenames[mode], fd)); | |
353 | /* find the right struct fd */ | |
354 | for(n = 0; n < ev->mode[mode].nfds && fd != ev->mode[mode].fds[n].fd; ++n) | |
355 | ; | |
356 | assert(n < ev->mode[mode].nfds); | |
357 | /* swap in the last fd and reduce the count */ | |
358 | if(n != ev->mode[mode].nfds - 1) | |
359 | ev->mode[mode].fds[n] = ev->mode[mode].fds[ev->mode[mode].nfds - 1]; | |
360 | --ev->mode[mode].nfds; | |
361 | /* if that was the biggest fd, find the new biggest one */ | |
362 | if(fd == ev->mode[mode].maxfd) { | |
363 | maxfd = 0; | |
364 | for(n = 0; n < ev->mode[mode].nfds; ++n) | |
365 | if(ev->mode[mode].fds[n].fd > maxfd) | |
366 | maxfd = ev->mode[mode].fds[n].fd; | |
367 | ev->mode[mode].maxfd = maxfd; | |
368 | } | |
369 | /* don't tell select about this fd any more */ | |
370 | FD_CLR(fd, &ev->mode[mode].enabled); | |
371 | ev->escape = 1; | |
372 | return 0; | |
373 | } | |
374 | ||
768d7355 RK |
375 | /** @brief Re-enable a file descriptor |
376 | * @param ev Event loop | |
377 | * @param mode @c ev_read or @c ev_write | |
378 | * @param fd File descriptor | |
379 | * @return 0 on success, non-0 on error | |
380 | * | |
381 | * It is harmless if @p fd is currently disabled, but it must not have been | |
382 | * cancelled. | |
383 | */ | |
460b9539 | 384 | int ev_fd_enable(ev_source *ev, ev_fdmode mode, int fd) { |
385 | D(("enabling mode %s fd %d", modenames[mode], fd)); | |
386 | FD_SET(fd, &ev->mode[mode].enabled); | |
387 | return 0; | |
388 | } | |
389 | ||
768d7355 RK |
390 | /** @brief Temporarily disable a file descriptor |
391 | * @param ev Event loop | |
392 | * @param mode @c ev_read or @c ev_write | |
393 | * @param fd File descriptor | |
394 | * @return 0 on success, non-0 on error | |
395 | * | |
396 | * Re-enable with ev_fd_enable(). It is harmless if @p fd is already disabled, | |
397 | * but it must not have been cancelled. | |
398 | */ | |
460b9539 | 399 | int ev_fd_disable(ev_source *ev, ev_fdmode mode, int fd) { |
400 | D(("disabling mode %s fd %d", modenames[mode], fd)); | |
401 | FD_CLR(fd, &ev->mode[mode].enabled); | |
402 | FD_CLR(fd, &ev->mode[mode].tripped); | |
403 | return 0; | |
404 | } | |
405 | ||
768d7355 RK |
406 | /** @brief Log a report of file descriptor state */ |
407 | void ev_report(ev_source *ev) { | |
408 | int n, fd; | |
409 | ev_fdmode mode; | |
410 | struct dynstr d[1]; | |
411 | char b[4096]; | |
412 | ||
413 | dynstr_init(d); | |
414 | for(mode = 0; mode < ev_nmodes; ++mode) { | |
415 | info("mode %s maxfd %d", modenames[mode], ev->mode[mode].maxfd); | |
416 | for(n = 0; n < ev->mode[mode].nfds; ++n) { | |
417 | fd = ev->mode[mode].fds[n].fd; | |
418 | info("fd %s %d%s%s (%s)", modenames[mode], fd, | |
419 | FD_ISSET(fd, &ev->mode[mode].enabled) ? " enabled" : "", | |
420 | FD_ISSET(fd, &ev->mode[mode].tripped) ? " tripped" : "", | |
421 | ev->mode[mode].fds[n].what); | |
422 | } | |
423 | d->nvec = 0; | |
424 | for(fd = 0; fd <= ev->mode[mode].maxfd; ++fd) { | |
425 | if(!FD_ISSET(fd, &ev->mode[mode].enabled)) | |
426 | continue; | |
427 | for(n = 0; n < ev->mode[mode].nfds; ++n) { | |
428 | if(ev->mode[mode].fds[n].fd == fd) | |
429 | break; | |
430 | } | |
431 | if(n < ev->mode[mode].nfds) | |
432 | snprintf(b, sizeof b, "%d(%s)", fd, ev->mode[mode].fds[n].what); | |
433 | else | |
434 | snprintf(b, sizeof b, "%d", fd); | |
435 | dynstr_append(d, ' '); | |
436 | dynstr_append_string(d, b); | |
437 | } | |
438 | dynstr_terminate(d); | |
439 | info("%s enabled:%s", modenames[mode], d->vec); | |
440 | } | |
441 | } | |
442 | ||
460b9539 | 443 | /* timeouts *******************************************************************/ |
444 | ||
768d7355 RK |
445 | /** @brief Register a timeout |
446 | * @param ev Event source | |
447 | * @param handle Where to store timeout handle, or @c NULL | |
448 | * @param when Earliest time to call @p callback, or @c NULL | |
449 | * @param callback Function to call at or after @p when | |
450 | * @param u Passed to @p callback | |
451 | * @return 0 on success, non-0 on error | |
452 | * | |
453 | * If @p when is a null pointer then a time of 0 is assumed. The effect is to | |
454 | * call the timeout handler from ev_run() next time around the event loop. | |
455 | * This is used internally to schedule various operations if it is not | |
456 | * convenient to call them from the current place in the call stack, or | |
457 | * externally to ensure that other clients of the event loop get a look in when | |
458 | * performing some lengthy operation. | |
459 | */ | |
460b9539 | 460 | int ev_timeout(ev_source *ev, |
461 | ev_timeout_handle *handlep, | |
462 | const struct timeval *when, | |
463 | ev_timeout_callback *callback, | |
464 | void *u) { | |
465 | struct timeout *t, *p, **pp; | |
466 | ||
467 | D(("registering timeout at %ld.%ld callback %p %p", | |
468 | when ? (long)when->tv_sec : 0, when ? (long)when->tv_usec : 0, | |
469 | (void *)callback, u)); | |
470 | t = xmalloc(sizeof *t); | |
471 | if(when) | |
472 | t->when = *when; | |
473 | t->callback = callback; | |
474 | t->u = u; | |
475 | pp = &ev->timeouts; | |
476 | while((p = *pp) && gt(&t->when, &p->when)) | |
477 | pp = &p->next; | |
478 | t->next = p; | |
479 | *pp = t; | |
480 | if(handlep) | |
481 | *handlep = t; | |
482 | return 0; | |
483 | } | |
484 | ||
768d7355 RK |
485 | /** @brief Cancel a timeout |
486 | * @param ev Event loop | |
487 | * @param handle Handle returned from ev_timeout() | |
488 | * @return 0 on success, non-0 on error | |
489 | */ | |
460b9539 | 490 | int ev_timeout_cancel(ev_source *ev, |
491 | ev_timeout_handle handle) { | |
492 | struct timeout *t = handle, *p, **pp; | |
493 | ||
494 | for(pp = &ev->timeouts; (p = *pp) && p != t; pp = &p->next) | |
495 | ; | |
496 | if(p) { | |
497 | *pp = p->next; | |
498 | return 0; | |
499 | } else | |
500 | return -1; | |
501 | } | |
502 | ||
503 | /* signals ********************************************************************/ | |
504 | ||
/** @brief Mapping of signals to pipe write ends
 *
 * The pipes are per-event loop, it's possible in theory for there to be
 * multiple event loops (e.g. in different threads), although in fact DisOrder
 * does not do this.
 */
static int sigfd[NSIG];
512 | ||
768d7355 RK |
513 | /** @brief The signal handler |
514 | * @param s Signal number | |
515 | * | |
516 | * Writes to @c sigfd[s]. | |
517 | */ | |
460b9539 | 518 | static void sighandler(int s) { |
519 | unsigned char sc = s; | |
520 | static const char errmsg[] = "error writing to signal pipe"; | |
521 | ||
522 | /* probably the reader has stopped listening for some reason */ | |
523 | if(write(sigfd[s], &sc, 1) < 0) { | |
524 | write(2, errmsg, sizeof errmsg - 1); | |
525 | abort(); | |
526 | } | |
527 | } | |
528 | ||
768d7355 | 529 | /** @brief Read callback for signals */ |
460b9539 | 530 | static int signal_read(ev_source *ev, |
531 | int attribute((unused)) fd, | |
532 | void attribute((unused)) *u) { | |
533 | unsigned char s; | |
534 | int n; | |
535 | int ret; | |
536 | ||
537 | if((n = read(ev->sigpipe[0], &s, 1)) == 1) | |
538 | if((ret = ev->signals[s].callback(ev, s, ev->signals[s].u))) | |
539 | return ret; | |
540 | assert(n != 0); | |
541 | if(n < 0 && (errno != EINTR && errno != EAGAIN)) { | |
542 | error(errno, "error reading from signal pipe %d", ev->sigpipe[0]); | |
543 | return -1; | |
544 | } | |
545 | return 0; | |
546 | } | |
547 | ||
768d7355 | 548 | /** @brief Close the signal pipe */ |
460b9539 | 549 | static void close_sigpipe(ev_source *ev) { |
550 | int save_errno = errno; | |
551 | ||
552 | xclose(ev->sigpipe[0]); | |
553 | xclose(ev->sigpipe[1]); | |
554 | ev->sigpipe[0] = ev->sigpipe[1] = -1; | |
555 | errno = save_errno; | |
556 | } | |
557 | ||
/** @brief Register a signal handler
 * @param ev Event loop
 * @param sig Signal to handle
 * @param callback Called when signal is delivered
 * @param u Passed to @p callback
 * @return 0 on success, non-0 on error
 *
 * Note that @p callback is called from inside ev_run(), not from inside the
 * signal handler, so the usual restrictions on signal handlers do not apply.
 */
int ev_signal(ev_source *ev,
	      int sig,
	      ev_signal_callback *callback,
	      void *u) {
  int n;
  struct sigaction sa;

  D(("registering signal %d handler callback %p %p", sig, (void *)callback, u));
  assert(sig > 0);
  assert(sig < NSIG);
  assert(sig <= UCHAR_MAX);             /* signal number must fit in a pipe byte */
  if(ev->sigpipe[0] == -1) {
    /* first handled signal: create the self-pipe lazily */
    D(("creating signal pipe"));
    xpipe(ev->sigpipe);
    D(("signal pipe is %d, %d", ev->sigpipe[0], ev->sigpipe[1]));
    for(n = 0; n < 2; ++n) {
      /* nonblocking so the handler can never wedge; close-on-exec so
       * children don't inherit the pipe */
      nonblock(ev->sigpipe[n]);
      cloexec(ev->sigpipe[n]);
    }
    if(ev_fd(ev, ev_read, ev->sigpipe[0], signal_read, 0, "sigpipe read")) {
      close_sigpipe(ev);
      return -1;
    }
  }
  /* block the signal except while ev_run() is inside select() */
  sigaddset(&ev->sigmask, sig);
  xsigprocmask(SIG_BLOCK, &ev->sigmask, 0);
  sigfd[sig] = ev->sigpipe[1];
  ev->signals[sig].callback = callback;
  ev->signals[sig].u = u;
  sa.sa_handler = sighandler;
  sigfillset(&sa.sa_mask);
  sa.sa_flags = SA_RESTART;
  /* save the old disposition so ev_signal_cancel() can restore it */
  xsigaction(sig, &sa, &ev->signals[sig].oldsa);
  ev->escape = 1;
  return 0;
}
604 | ||
768d7355 RK |
605 | /** @brief Cancel a signal handler |
606 | * @param ev Event loop | |
607 | * @param sig Signal to cancel | |
608 | * @return 0 on success, non-0 on error | |
609 | */ | |
460b9539 | 610 | int ev_signal_cancel(ev_source *ev, |
611 | int sig) { | |
612 | sigset_t ss; | |
613 | ||
614 | xsigaction(sig, &ev->signals[sig].oldsa, 0); | |
615 | ev->signals[sig].callback = 0; | |
616 | ev->escape = 1; | |
617 | sigdelset(&ev->sigmask, sig); | |
618 | sigemptyset(&ss); | |
619 | sigaddset(&ss, sig); | |
620 | xsigprocmask(SIG_UNBLOCK, &ss, 0); | |
621 | return 0; | |
622 | } | |
623 | ||
768d7355 RK |
624 | /** @brief Clean up signal handling |
625 | * @param ev Event loop | |
626 | * | |
627 | * This function can be called from inside a fork. It restores signal | |
628 | * handlers, unblocks the signals, and closes the signal pipe for @p ev. | |
629 | */ | |
460b9539 | 630 | void ev_signal_atfork(ev_source *ev) { |
631 | int sig; | |
632 | ||
633 | if(ev->sigpipe[0] != -1) { | |
634 | /* revert any handled signals to their original state */ | |
635 | for(sig = 1; sig < NSIG; ++sig) { | |
636 | if(ev->signals[sig].callback != 0) | |
637 | xsigaction(sig, &ev->signals[sig].oldsa, 0); | |
638 | } | |
639 | /* and then unblock them */ | |
640 | xsigprocmask(SIG_UNBLOCK, &ev->sigmask, 0); | |
641 | /* don't want a copy of the signal pipe open inside the fork */ | |
642 | xclose(ev->sigpipe[0]); | |
643 | xclose(ev->sigpipe[1]); | |
644 | } | |
645 | } | |
646 | ||
647 | /* child processes ************************************************************/ | |
648 | ||
/** @brief Called on SIGCHLD
 *
 * Polls every registered child with wait4(..., WNOHANG, ...) and dispatches
 * callbacks for any that have changed state, repeating until a full pass
 * reaps nothing.
 */
static int sigchld_callback(ev_source *ev,
			    int attribute((unused)) sig,
			    void attribute((unused)) *u) {
  struct rusage ru;
  pid_t r;
  int status, n, ret, revisit;

  do {
    revisit = 0;
    for(n = 0; n < ev->nchildren; ++n) {
      /* WNOHANG so we never block on a child that hasn't changed state */
      r = wait4(ev->children[n].pid,
		&status,
		ev->children[n].options | WNOHANG,
		&ru);
      if(r > 0) {
	ev_child_callback *c = ev->children[n].callback;
	void *cu = ev->children[n].u;

	/* fully terminated children are deregistered; ev_child_cancel()
	 * reshuffles children[], hence the revisit flag to rescan */
	if(WIFEXITED(status) || WIFSIGNALED(status))
	  ev_child_cancel(ev, r);
	revisit = 1;
	if((ret = c(ev, r, status, &ru, cu)))
	  return ret;
      } else if(r < 0) {
	/* We should "never" get an ECHILD but it can in fact happen.  For
	 * instance on Linux 2.4.31, and probably other versions, if someone
	 * straces a child process and then a different child process
	 * terminates, when we wait4() the trace process we will get ECHILD
	 * because it has been reparented to strace.  Obviously this is a
	 * hopeless design flaw in the tracing infrastructure, but we don't
	 * want the disorder server to bomb out because of it.  So we just log
	 * the problem and ignore it.
	 */
	error(errno, "error calling wait4 for PID %lu (broken ptrace?)",
	      (unsigned long)ev->children[n].pid);
	if(errno != ECHILD)
	  return -1;
      }
    }
  } while(revisit);
  return 0;
}
692 | ||
/** @brief Configure event loop for child process handling
 * @return 0 on success, non-0 on error
 *
 * Currently at most one event loop can handle child processes and it must be
 * distinguished from others by calling this function on it.  This could be
 * fixed but since no process ever makes use of more than one event loop there
 * is no need.
 */
int ev_child_setup(ev_source *ev) {
  D(("installing SIGCHLD handler"));
  /* sigchld_callback() will reap children registered via ev_child() */
  return ev_signal(ev, SIGCHLD, sigchld_callback, 0);
}
705 | ||
768d7355 RK |
706 | /** @brief Wait for a child process to terminate |
707 | * @param ev Event loop | |
708 | * @param pid Process ID of child | |
709 | * @param options Options to pass to @c wait4() | |
710 | * @param callback Called when child terminates (or possibly when it stops) | |
711 | * @param u Passed to @p callback | |
712 | * @return 0 on success, non-0 on error | |
713 | * | |
714 | * You must have called ev_child_setup() on @p ev once first. | |
715 | */ | |
460b9539 | 716 | int ev_child(ev_source *ev, |
717 | pid_t pid, | |
718 | int options, | |
719 | ev_child_callback *callback, | |
720 | void *u) { | |
721 | int n; | |
722 | ||
723 | D(("registering child handling %ld options %d callback %p %p", | |
724 | (long)pid, options, (void *)callback, u)); | |
725 | assert(ev->signals[SIGCHLD].callback == sigchld_callback); | |
726 | if(ev->nchildren >= ev->nchildslots) { | |
727 | ev->nchildslots = ev->nchildslots ? 2 * ev->nchildslots : 16; | |
728 | ev->children = xrealloc(ev->children, | |
729 | ev->nchildslots * sizeof (struct child)); | |
730 | } | |
731 | n = ev->nchildren++; | |
732 | ev->children[n].pid = pid; | |
733 | ev->children[n].options = options; | |
734 | ev->children[n].callback = callback; | |
735 | ev->children[n].u = u; | |
736 | return 0; | |
737 | } | |
738 | ||
768d7355 RK |
739 | /** @brief Stop waiting for a child process |
740 | * @param ev Event loop | |
741 | * @param pid Child process ID | |
742 | * @return 0 on success, non-0 on error | |
743 | */ | |
460b9539 | 744 | int ev_child_cancel(ev_source *ev, |
745 | pid_t pid) { | |
746 | int n; | |
747 | ||
748 | for(n = 0; n < ev->nchildren && ev->children[n].pid != pid; ++n) | |
749 | ; | |
750 | assert(n < ev->nchildren); | |
751 | if(n != ev->nchildren - 1) | |
752 | ev->children[n] = ev->children[ev->nchildren - 1]; | |
753 | --ev->nchildren; | |
754 | return 0; | |
755 | } | |
756 | ||
757 | /* socket listeners ***********************************************************/ | |
758 | ||
/** @brief State for a socket listener */
struct listen_state {
  ev_listen_callback *callback;         /* called for each accepted connection */
  void *u;                              /* opaque user data for @c callback */
};
764 | ||
768d7355 | 765 | /** @brief Called when a listenign socket is readable */ |
460b9539 | 766 | static int listen_callback(ev_source *ev, int fd, void *u) { |
767 | const struct listen_state *l = u; | |
768 | int newfd; | |
769 | union { | |
770 | struct sockaddr_in in; | |
771 | #if HAVE_STRUCT_SOCKADDR_IN6 | |
772 | struct sockaddr_in6 in6; | |
773 | #endif | |
774 | struct sockaddr_un un; | |
775 | struct sockaddr sa; | |
776 | } addr; | |
777 | socklen_t addrlen; | |
778 | int ret; | |
779 | ||
780 | D(("callback for listener fd %d", fd)); | |
781 | while((addrlen = sizeof addr), | |
782 | (newfd = accept(fd, &addr.sa, &addrlen)) >= 0) { | |
783 | if((ret = l->callback(ev, newfd, &addr.sa, addrlen, l->u))) | |
784 | return ret; | |
785 | } | |
786 | switch(errno) { | |
787 | case EINTR: | |
788 | case EAGAIN: | |
789 | break; | |
790 | #ifdef ECONNABORTED | |
791 | case ECONNABORTED: | |
792 | error(errno, "error calling accept"); | |
793 | break; | |
794 | #endif | |
795 | #ifdef EPROTO | |
796 | case EPROTO: | |
797 | /* XXX on some systems EPROTO should be fatal, but we don't know if | |
798 | * we're running on one of them */ | |
799 | error(errno, "error calling accept"); | |
800 | break; | |
801 | #endif | |
802 | default: | |
803 | fatal(errno, "error calling accept"); | |
804 | break; | |
805 | } | |
806 | if(errno != EINTR && errno != EAGAIN) | |
807 | error(errno, "error calling accept"); | |
808 | return 0; | |
809 | } | |
810 | ||
/** @brief Listen on a socket for inbound stream connections
 * @param ev Event source
 * @param fd File descriptor of socket
 * @param callback Called when a new connection arrives
 * @param u Passed to @p callback
 * @param what Text description of socket
 * @return 0 on success, non-0 on error
 *
 * A listen_state is allocated per listener and passed to listen_callback()
 * as its user data.
 */
int ev_listen(ev_source *ev,
	      int fd,
	      ev_listen_callback *callback,
	      void *u,
	      const char *what) {
  struct listen_state *l = xmalloc(sizeof *l);

  D(("registering listener fd %d callback %p %p", fd, (void *)callback, u));
  l->callback = callback;
  l->u = u;
  /* listen_callback() accepts connections and forwards them to callback */
  return ev_fd(ev, ev_read, fd, listen_callback, l, what);
}
831 | ||
/** @brief Stop listening on a socket
 * @param ev Event loop
 * @param fd File descriptor of socket
 * @return 0 on success, non-0 on error
 */
int ev_listen_cancel(ev_source *ev, int fd) {
  D(("cancelling listener fd %d", fd));
  /* the listener is just a read-mode fd registration */
  return ev_fd_cancel(ev, ev_read, fd);
}
841 | ||
842 | /* buffer *********************************************************************/ | |
843 | ||
/** @brief Buffer structure
 *
 * Invariant: base <= start <= end <= top; pending bytes live in
 * [start, end), free space in [end, top).
 */
struct buffer {
  char *base, *start, *end, *top;
};
848 | ||
/** @brief Make sure there is @p bytes of space available at @c b->end
 *
 * Either compacts pending bytes down to the base of the existing allocation
 * or, if that still would not make room, reallocates to the next power of
 * two that fits.
 */
static void buffer_space(struct buffer *b, size_t bytes) {
  D(("buffer_space %p %p %p %p want %lu",
     (void *)b->base, (void *)b->start, (void *)b->end, (void *)b->top,
     (unsigned long)bytes));
  if(b->start == b->end)
    b->start = b->end = b->base;        /* buffer empty: reset to the base */
  if((size_t)(b->top - b->end) < bytes) {
    if((size_t)((b->top - b->end) + (b->start - b->base)) < bytes) {
      /* even after compaction there would not be room: reallocate */
      size_t newspace = b->end - b->start + bytes, n;
      char *newbase;

      /* round the new size up to a power of two, at least 16 */
      for(n = 16; n < newspace; n *= 2)
	;
      newbase = xmalloc_noptr(n);
      memcpy(newbase, b->start, b->end - b->start);
      b->base = newbase;
      b->end = newbase + (b->end - b->start);
      b->top = newbase + n;
      b->start = newbase;		/* must be last */
    } else {
      /* slide the pending bytes down to the base (regions may overlap) */
      memmove(b->base, b->start, b->end - b->start);
      b->end = b->base + (b->end - b->start);
      b->start = b->base;
    }
  }
  D(("result %p %p %p %p",
     (void *)b->base, (void *)b->start, (void *)b->end, (void *)b->top));
}
878 | ||
879 | /* buffered writer ************************************************************/ | |
880 | ||
/** @brief State structure for a buffered writer */
struct ev_writer {
  struct sink s;                        /* sink interface; must be first so
                                         * (ev_writer *) <-> (struct sink *)
                                         * casts work */
  struct buffer b;                      /* bytes not yet written to fd */
  int fd;                               /* destination file descriptor */
  int eof;                              /* set by ev_writer_close() */
  ev_error_callback *callback;          /* called on error or completion */
  void *u;                              /* user data for callback */
  ev_source *ev;                        /* owning event loop */
};
891 | ||
/** @brief Called when a writer's file descriptor is writable
 *
 * Attempts one write() of the whole pending buffer.  On a short write
 * the remainder stays buffered for the next wakeup.  When the buffer
 * drains: if ev_writer_close() was called the fd is cancelled and the
 * owner's callback is invoked with error 0 (completion); otherwise the
 * fd is just disabled until more data is buffered.  EINTR/EAGAIN are
 * transient and ignored; any other errno cancels the fd and is reported
 * via the error callback.
 */
static int writer_callback(ev_source *ev, int fd, void *u) {
  ev_writer *w = u;
  int n;

  n = write(fd, w->b.start, w->b.end - w->b.start);
  D(("callback for writer fd %d, %ld bytes, n=%d, errno=%d",
     fd, (long)(w->b.end - w->b.start), n, errno));
  if(n >= 0) {
    /* wrote n bytes; consume them from the buffer */
    w->b.start += n;
    if(w->b.start == w->b.end) {
      if(w->eof) {
        /* everything written after close: finish up */
        ev_fd_cancel(ev, ev_write, fd);
        return w->callback(ev, fd, 0, w->u);
      } else
        /* nothing left for now; stop polling for writability */
        ev_fd_disable(ev, ev_write, fd);
    }
  } else {
    switch(errno) {
    case EINTR:
    case EAGAIN:
      /* transient: retry on the next writable wakeup */
      break;
    default:
      /* hard error: stop writing and report errno to the owner */
      ev_fd_cancel(ev, ev_write, fd);
      return w->callback(ev, fd, errno, w->u);
    }
  }
  return 0;
}
921 | ||
768d7355 RK |
922 | /** @brief Write bytes to a writer's buffer |
923 | * | |
924 | * This is the sink write callback. | |
925 | * | |
926 | * Calls ev_fd_enable() if necessary (i.e. if the buffer was empty but | |
927 | * now is not). | |
928 | */ | |
460b9539 | 929 | static int ev_writer_write(struct sink *sk, const void *s, int n) { |
930 | ev_writer *w = (ev_writer *)sk; | |
931 | ||
932 | buffer_space(&w->b, n); | |
933 | if(w->b.start == w->b.end) | |
934 | ev_fd_enable(w->ev, ev_write, w->fd); | |
935 | memcpy(w->b.end, s, n); | |
936 | w->b.end += n; | |
937 | return 0; | |
938 | } | |
939 | ||
768d7355 RK |
940 | /** @brief Create a new buffered writer |
941 | * @param ev Event loop | |
942 | * @param fd File descriptor to write to | |
943 | * @param callback Called if an error occurs and when finished | |
944 | * @param u Passed to @p callback | |
945 | * @param what Text description | |
946 | * @return New writer or @c NULL | |
947 | */ | |
460b9539 | 948 | ev_writer *ev_writer_new(ev_source *ev, |
949 | int fd, | |
950 | ev_error_callback *callback, | |
e8c92ba7 RK |
951 | void *u, |
952 | const char *what) { | |
460b9539 | 953 | ev_writer *w = xmalloc(sizeof *w); |
954 | ||
955 | D(("registering writer fd %d callback %p %p", fd, (void *)callback, u)); | |
956 | w->s.write = ev_writer_write; | |
957 | w->fd = fd; | |
958 | w->callback = callback; | |
959 | w->u = u; | |
960 | w->ev = ev; | |
e8c92ba7 | 961 | if(ev_fd(ev, ev_write, fd, writer_callback, w, what)) |
460b9539 | 962 | return 0; |
963 | ev_fd_disable(ev, ev_write, fd); | |
964 | return w; | |
965 | } | |
966 | ||
768d7355 RK |
967 | /** @brief Return the sink associated with a writer |
968 | * @param w Writer | |
969 | * @return Pointer to sink | |
970 | * | |
971 | * Writing to the sink will arrange for those bytes to be written to the file | |
972 | * descriptor as and when it is writable. | |
973 | */ | |
460b9539 | 974 | struct sink *ev_writer_sink(ev_writer *w) { |
975 | return &w->s; | |
976 | } | |
977 | ||
768d7355 RK |
978 | /** @brief Shutdown callback |
979 | * | |
980 | * See ev_writer_close(). | |
981 | */ | |
460b9539 | 982 | static int writer_shutdown(ev_source *ev, |
983 | const attribute((unused)) struct timeval *now, | |
984 | void *u) { | |
985 | ev_writer *w = u; | |
986 | ||
987 | return w->callback(ev, w->fd, 0, w->u); | |
988 | } | |
989 | ||
768d7355 RK |
990 | /** @brief Close a writer |
991 | * @param w Writer to close | |
992 | * @return 0 on success, non-0 on error | |
993 | * | |
994 | * Close a writer. No more bytes should be written to its sink. | |
995 | * | |
996 | * When the last byte has been written the callback will be called with an | |
997 | * error code of 0. It is guaranteed that this will NOT happen before | |
998 | * ev_writer_close() returns (although the file descriptor for the writer might | |
999 | * be cancelled by the time it returns). | |
1000 | */ | |
460b9539 | 1001 | int ev_writer_close(ev_writer *w) { |
1002 | D(("close writer fd %d", w->fd)); | |
1003 | w->eof = 1; | |
1004 | if(w->b.start == w->b.end) { | |
1005 | /* we're already finished */ | |
1006 | ev_fd_cancel(w->ev, ev_write, w->fd); | |
1007 | return ev_timeout(w->ev, 0, 0, writer_shutdown, w); | |
1008 | } | |
1009 | return 0; | |
1010 | } | |
1011 | ||
768d7355 RK |
1012 | /** @brief Cancel a writer discarding any buffered data |
1013 | * @param w Writer to close | |
1014 | * @return 0 on success, non-0 on error | |
1015 | * | |
1016 | * This cancels a writer immediately. Any unwritten buffered data is discarded | |
1017 | * and the error callback is never called. This is appropriate to call if (for | |
1018 | * instance) the read half of a TCP connection is known to have failed and the | |
1019 | * writer is therefore obsolete. | |
1020 | */ | |
460b9539 | 1021 | int ev_writer_cancel(ev_writer *w) { |
1022 | D(("cancel writer fd %d", w->fd)); | |
1023 | return ev_fd_cancel(w->ev, ev_write, w->fd); | |
1024 | } | |
1025 | ||
768d7355 RK |
1026 | /** @brief Attempt to flush a writer |
1027 | * @param w Writer to flush | |
1028 | * @return 0 on success, non-0 on error | |
1029 | * | |
1030 | * Does a speculative write of any buffered data. Does not block if it cannot | |
1031 | * be written. | |
1032 | */ | |
460b9539 | 1033 | int ev_writer_flush(ev_writer *w) { |
1034 | return writer_callback(w->ev, w->fd, w); | |
1035 | } | |
1036 | ||
1037 | /* buffered reader ************************************************************/ | |
1038 | ||
/** @brief State structure for a buffered reader */
struct ev_reader {
  struct buffer b;                      /* bytes read but not yet consumed */
  int fd;                               /* source file descriptor */
  ev_reader_callback *callback;         /* called when data is available */
  ev_error_callback *error_callback;    /* called on read error */
  void *u;                              /* user data for callbacks */
  ev_source *ev;                        /* owning event loop */
  int eof;                              /* set once read() returns 0 */
};
1049 | ||
/** @brief Called when a reader's @p fd is readable
 *
 * Reads as much as fits in the buffer's free space (at least 1 byte is
 * made available first).  On data, the read callback gets the whole
 * unconsumed region with eof=0.  On end of file the fd is cancelled and
 * the callback is invoked once more with eof=1.  EINTR/EAGAIN are
 * ignored; other errors cancel the fd and go to the error callback.
 */
static int reader_callback(ev_source *ev, int fd, void *u) {
  ev_reader *r = u;
  int n;

  buffer_space(&r->b, 1);
  n = read(fd, r->b.end, r->b.top - r->b.end);
  D(("read fd %d buffer %d returned %d errno %d",
     fd, (int)(r->b.top - r->b.end), n, errno));
  if(n > 0) {
    r->b.end += n;
    /* hand the entire unconsumed region to the callback */
    return r->callback(ev, r, fd, r->b.start, r->b.end - r->b.start, 0, r->u);
  } else if(n == 0) {
    /* end of file: no further read events for this fd */
    r->eof = 1;
    ev_fd_cancel(ev, ev_read, fd);
    return r->callback(ev, r, fd, r->b.start, r->b.end - r->b.start, 1, r->u);
  } else {
    switch(errno) {
    case EINTR:
    case EAGAIN:
      /* transient: retry on the next readable wakeup */
      break;
    default:
      /* hard error: stop reading and report errno to the owner */
      ev_fd_cancel(ev, ev_read, fd);
      return r->error_callback(ev, fd, errno, r->u);
    }
  }
  return 0;
}
1078 | ||
768d7355 RK |
1079 | /** @brief Create a new buffered reader |
1080 | * @param ev Event loop | |
1081 | * @param fd File descriptor to read from | |
1082 | * @param callback Called when new data is available | |
1083 | * @param error_callback Called if an error occurs | |
1084 | * @param u Passed to callbacks | |
1085 | * @param what Text description | |
1086 | * @return New reader or @c NULL | |
1087 | */ | |
460b9539 | 1088 | ev_reader *ev_reader_new(ev_source *ev, |
1089 | int fd, | |
1090 | ev_reader_callback *callback, | |
1091 | ev_error_callback *error_callback, | |
e8c92ba7 RK |
1092 | void *u, |
1093 | const char *what) { | |
460b9539 | 1094 | ev_reader *r = xmalloc(sizeof *r); |
1095 | ||
1096 | D(("registering reader fd %d callback %p %p %p", | |
1097 | fd, (void *)callback, (void *)error_callback, u)); | |
1098 | r->fd = fd; | |
1099 | r->callback = callback; | |
1100 | r->error_callback = error_callback; | |
1101 | r->u = u; | |
1102 | r->ev = ev; | |
e8c92ba7 | 1103 | if(ev_fd(ev, ev_read, fd, reader_callback, r, what)) |
460b9539 | 1104 | return 0; |
1105 | return r; | |
1106 | } | |
1107 | ||
1108 | void ev_reader_buffer(ev_reader *r, size_t nbytes) { | |
1109 | buffer_space(&r->b, nbytes - (r->b.end - r->b.start)); | |
1110 | } | |
1111 | ||
/** @brief Consume @p n bytes from the reader's buffer
 * @param r Reader
 * @param n Number of bytes to consume
 *
 * Tells the reader than the next @p n bytes have been dealt with and can now
 * be discarded.
 *
 * NOTE(review): no bounds check; the caller must never consume more
 * bytes than were passed to its read callback, or @c start will run
 * past @c end.
 */
void ev_reader_consume(ev_reader *r, size_t n) {
  r->b.start += n;
}
1122 | ||
768d7355 RK |
1123 | /** @brief Cancel a reader |
1124 | * @param r Reader | |
1125 | * @return 0 on success, non-0 on error | |
1126 | */ | |
460b9539 | 1127 | int ev_reader_cancel(ev_reader *r) { |
1128 | D(("cancel reader fd %d", r->fd)); | |
1129 | return ev_fd_cancel(r->ev, ev_read, r->fd); | |
1130 | } | |
1131 | ||
768d7355 RK |
1132 | /** @brief Temporarily disable a reader |
1133 | * @param r Reader | |
1134 | * @return 0 on success, non-0 on error | |
1135 | * | |
1136 | * No further callbacks for this reader will be made. Re-enable with | |
1137 | * ev_reader_enable(). | |
1138 | */ | |
460b9539 | 1139 | int ev_reader_disable(ev_reader *r) { |
1140 | D(("disable reader fd %d", r->fd)); | |
1141 | return r->eof ? 0 : ev_fd_disable(r->ev, ev_read, r->fd); | |
1142 | } | |
1143 | ||
768d7355 | 1144 | /** @brief Called from ev_run() for ev_reader_incomplete() */ |
460b9539 | 1145 | static int reader_continuation(ev_source attribute((unused)) *ev, |
1146 | const attribute((unused)) struct timeval *now, | |
1147 | void *u) { | |
1148 | ev_reader *r = u; | |
1149 | ||
1150 | D(("reader continuation callback fd %d", r->fd)); | |
1151 | if(ev_fd_enable(r->ev, ev_read, r->fd)) return -1; | |
1152 | return r->callback(ev, r, r->fd, r->b.start, r->b.end - r->b.start, r->eof, r->u); | |
1153 | } | |
1154 | ||
768d7355 RK |
1155 | /** @brief Arrange another callback |
1156 | * @param r reader | |
1157 | * @return 0 on success, non-0 on error | |
1158 | * | |
1159 | * Indicates that the reader can process more input but would like to yield to | |
1160 | * other clients of the event loop. Input will be disabled but it will be | |
1161 | * re-enabled on the next iteration of the event loop and the read callback | |
1162 | * will be called again (even if no further bytes are available). | |
1163 | */ | |
460b9539 | 1164 | int ev_reader_incomplete(ev_reader *r) { |
1165 | if(ev_fd_disable(r->ev, ev_read, r->fd)) return -1; | |
1166 | return ev_timeout(r->ev, 0, 0, reader_continuation, r); | |
1167 | } | |
1168 | ||
/** @brief Called from ev_run() for ev_reader_enable()
 *
 * Re-issues the read callback with whatever is still unconsumed in the
 * buffer, so the owner can process data that had already arrived before
 * the reader was re-enabled.
 */
static int reader_enabled(ev_source *ev,
                          const attribute((unused)) struct timeval *now,
                          void *u) {
  ev_reader *r = u;

  D(("reader enabled callback fd %d", r->fd));
  return r->callback(ev, r, r->fd, r->b.start, r->b.end - r->b.start, r->eof, r->u);
}
1177 | ||
768d7355 RK |
1178 | /** @brief Re-enable reading |
1179 | * @param r reader | |
1180 | * @return 0 on success, non-0 on error | |
1181 | * | |
1182 | * If there is unconsumed data then you get a callback next time round the | |
1183 | * event loop even if nothing new has been read. | |
1184 | * | |
1185 | * The idea is in your read callback you come across a line (or whatever) that | |
1186 | * can't be processed immediately. So you set up processing and disable | |
1187 | * reading with ev_reader_disable(). Later when you finish processing you | |
1188 | * re-enable. You'll automatically get another callback directly from the | |
1189 | * event loop (i.e. not from inside ev_reader_enable()) so you can handle the | |
1190 | * next line (or whatever) if the whole thing has in fact already arrived. | |
1191 | */ | |
460b9539 | 1192 | int ev_reader_enable(ev_reader *r) { |
1193 | D(("enable reader fd %d", r->fd)); | |
1194 | return ((r->eof ? 0 : ev_fd_enable(r->ev, ev_read, r->fd)) | |
1195 | || ev_timeout(r->ev, 0, 0, reader_enabled, r)) ? -1 : 0; | |
1196 | } | |
1197 | ||
1198 | /* | |
1199 | Local Variables: | |
1200 | c-basic-offset:2 | |
1201 | comment-column:40 | |
1202 | fill-column:79 | |
1203 | End: | |
1204 | */ |