b49b2e1df9c9d77033deac81c1bbb499ba1db340
2 * This file is part of DisOrder.
3 * Copyright (C) 2004, 2005 Richard Kettlewell
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
26 #include <sys/types.h>
27 #include <sys/resource.h>
35 #include <sys/socket.h>
36 #include <netinet/in.h>
49 ev_timeout_callback
*callback
;
56 ev_fd_callback
*callback
;
69 struct sigaction oldsa
;
70 ev_signal_callback
*callback
;
77 ev_child_callback
*callback
;
82 struct fdmode mode
[ev_nmodes
];
83 struct timeout
*timeouts
;
84 struct signal signals
[NSIG
];
88 int nchildren
, nchildslots
;
89 struct child
*children
;
/* Human-readable names for the fd modes, indexed by ev_fdmode
 * (ev_read, ev_write, ev_except); used only in debug messages. */
static const char *modenames[] = { "read", "write", "except" };
94 /* utilities ******************************************************************/
96 static inline int gt(const struct timeval
*a
, const struct timeval
*b
) {
97 if(a
->tv_sec
> b
->tv_sec
)
99 if(a
->tv_sec
== b
->tv_sec
100 && a
->tv_usec
> b
->tv_usec
)
105 static inline int ge(const struct timeval
*a
, const struct timeval
*b
) {
109 /* creation *******************************************************************/
111 ev_source
*ev_new(void) {
112 ev_source
*ev
= xmalloc(sizeof *ev
);
115 memset(ev
, 0, sizeof *ev
);
116 for(n
= 0; n
< ev_nmodes
; ++n
)
117 FD_ZERO(&ev
->mode
[n
].enabled
);
118 ev
->sigpipe
[0] = ev
->sigpipe
[1] = -1;
119 sigemptyset(&ev
->sigmask
);
123 /* event loop *****************************************************************/
125 int ev_run(ev_source
*ev
) {
128 struct timeval delta
;
132 struct timeout
*t
, **tt
;
134 xgettimeofday(&now
, 0);
135 /* Handle timeouts. We don't want to handle any timeouts that are added
136 * while we're handling them (otherwise we'd have to break out of infinite
137 * loops, preferrably without starving better-behaved subsystems). Hence
138 * the slightly complicated two-phase approach here. */
139 for(t
= ev
->timeouts
;
140 t
&& ge(&now
, &t
->when
);
143 D(("calling timeout for %ld.%ld callback %p %p",
144 (long)t
->when
.tv_sec
, (long)t
->when
.tv_usec
,
145 (void *)t
->callback
, t
->u
));
146 ret
= t
->callback(ev
, &now
, t
->u
);
158 for(mode
= 0; mode
< ev_nmodes
; ++mode
) {
159 ev
->mode
[mode
].tripped
= ev
->mode
[mode
].enabled
;
160 if(ev
->mode
[mode
].maxfd
> maxfd
)
161 maxfd
= ev
->mode
[mode
].maxfd
;
163 xsigprocmask(SIG_UNBLOCK
, &ev
->sigmask
, 0);
166 xgettimeofday(&now
, 0);
167 delta
.tv_sec
= ev
->timeouts
->when
.tv_sec
- now
.tv_sec
;
168 delta
.tv_usec
= ev
->timeouts
->when
.tv_usec
- now
.tv_usec
;
169 if(delta
.tv_usec
< 0) {
170 delta
.tv_usec
+= 1000000;
174 delta
.tv_sec
= delta
.tv_usec
= 0;
175 n
= select(maxfd
+ 1,
176 &ev
->mode
[ev_read
].tripped
,
177 &ev
->mode
[ev_write
].tripped
,
178 &ev
->mode
[ev_except
].tripped
,
181 n
= select(maxfd
+ 1,
182 &ev
->mode
[ev_read
].tripped
,
183 &ev
->mode
[ev_write
].tripped
,
184 &ev
->mode
[ev_except
].tripped
,
187 } while(n
< 0 && errno
== EINTR
);
188 xsigprocmask(SIG_BLOCK
, &ev
->sigmask
, 0);
190 error(errno
, "error calling select");
194 /* if anything deranges the meaning of an fd, or re-orders the
195 * fds[] tables, we'd better give up; such operations will
196 * therefore set @escape@. */
198 for(mode
= 0; mode
< ev_nmodes
&& !ev
->escape
; ++mode
)
199 for(n
= 0; n
< ev
->mode
[mode
].nfds
&& !ev
->escape
; ++n
) {
200 int fd
= ev
->mode
[mode
].fds
[n
].fd
;
201 if(FD_ISSET(fd
, &ev
->mode
[mode
].tripped
)) {
202 D(("calling %s fd %d callback %p %p", modenames
[mode
], fd
,
203 (void *)ev
->mode
[mode
].fds
[n
].callback
,
204 ev
->mode
[mode
].fds
[n
].u
));
205 ret
= ev
->mode
[mode
].fds
[n
].callback(ev
, fd
,
206 ev
->mode
[mode
].fds
[n
].u
);
212 /* we'll pick up timeouts back round the loop */
216 /* file descriptors ***********************************************************/
218 int ev_fd(ev_source
*ev
,
221 ev_fd_callback
*callback
,
225 D(("registering %s fd %d callback %p %p", modenames
[mode
], fd
,
226 (void *)callback
, u
));
227 assert(mode
< ev_nmodes
);
228 if(ev
->mode
[mode
].nfds
>= ev
->mode
[mode
].fdslots
) {
229 ev
->mode
[mode
].fdslots
= (ev
->mode
[mode
].fdslots
230 ?
2 * ev
->mode
[mode
].fdslots
: 16);
231 D(("expanding %s fd table to %d entries", modenames
[mode
],
232 ev
->mode
[mode
].fdslots
));
233 ev
->mode
[mode
].fds
= xrealloc(ev
->mode
[mode
].fds
,
234 ev
->mode
[mode
].fdslots
* sizeof (struct fd
));
236 n
= ev
->mode
[mode
].nfds
++;
237 FD_SET(fd
, &ev
->mode
[mode
].enabled
);
238 ev
->mode
[mode
].fds
[n
].fd
= fd
;
239 ev
->mode
[mode
].fds
[n
].callback
= callback
;
240 ev
->mode
[mode
].fds
[n
].u
= u
;
241 if(fd
> ev
->mode
[mode
].maxfd
)
242 ev
->mode
[mode
].maxfd
= fd
;
247 int ev_fd_cancel(ev_source
*ev
, ev_fdmode mode
, int fd
) {
251 D(("cancelling mode %s fd %d", modenames
[mode
], fd
));
252 /* find the right struct fd */
253 for(n
= 0; n
< ev
->mode
[mode
].nfds
&& fd
!= ev
->mode
[mode
].fds
[n
].fd
; ++n
)
255 assert(n
< ev
->mode
[mode
].nfds
);
256 /* swap in the last fd and reduce the count */
257 if(n
!= ev
->mode
[mode
].nfds
- 1)
258 ev
->mode
[mode
].fds
[n
] = ev
->mode
[mode
].fds
[ev
->mode
[mode
].nfds
- 1];
259 --ev
->mode
[mode
].nfds
;
260 /* if that was the biggest fd, find the new biggest one */
261 if(fd
== ev
->mode
[mode
].maxfd
) {
263 for(n
= 0; n
< ev
->mode
[mode
].nfds
; ++n
)
264 if(ev
->mode
[mode
].fds
[n
].fd
> maxfd
)
265 maxfd
= ev
->mode
[mode
].fds
[n
].fd
;
266 ev
->mode
[mode
].maxfd
= maxfd
;
268 /* don't tell select about this fd any more */
269 FD_CLR(fd
, &ev
->mode
[mode
].enabled
);
274 int ev_fd_enable(ev_source
*ev
, ev_fdmode mode
, int fd
) {
275 D(("enabling mode %s fd %d", modenames
[mode
], fd
));
276 FD_SET(fd
, &ev
->mode
[mode
].enabled
);
280 int ev_fd_disable(ev_source
*ev
, ev_fdmode mode
, int fd
) {
281 D(("disabling mode %s fd %d", modenames
[mode
], fd
));
282 FD_CLR(fd
, &ev
->mode
[mode
].enabled
);
283 FD_CLR(fd
, &ev
->mode
[mode
].tripped
);
287 /* timeouts *******************************************************************/
289 int ev_timeout(ev_source
*ev
,
290 ev_timeout_handle
*handlep
,
291 const struct timeval
*when
,
292 ev_timeout_callback
*callback
,
294 struct timeout
*t
, *p
, **pp
;
296 D(("registering timeout at %ld.%ld callback %p %p",
297 when ?
(long)when
->tv_sec
: 0, when ?
(long)when
->tv_usec
: 0,
298 (void *)callback
, u
));
299 t
= xmalloc(sizeof *t
);
302 t
->callback
= callback
;
305 while((p
= *pp
) && gt(&t
->when
, &p
->when
))
314 int ev_timeout_cancel(ev_source
*ev
,
315 ev_timeout_handle handle
) {
316 struct timeout
*t
= handle
, *p
, **pp
;
318 for(pp
= &ev
->timeouts
; (p
= *pp
) && p
!= t
; pp
= &p
->next
)
327 /* signals ********************************************************************/
329 static int sigfd
[NSIG
];
331 static void sighandler(int s
) {
332 unsigned char sc
= s
;
333 static const char errmsg
[] = "error writing to signal pipe";
335 /* probably the reader has stopped listening for some reason */
336 if(write(sigfd
[s
], &sc
, 1) < 0) {
337 write(2, errmsg
, sizeof errmsg
- 1);
342 static int signal_read(ev_source
*ev
,
343 int attribute((unused
)) fd
,
344 void attribute((unused
)) *u
) {
349 if((n
= read(ev
->sigpipe
[0], &s
, 1)) == 1)
350 if((ret
= ev
->signals
[s
].callback(ev
, s
, ev
->signals
[s
].u
)))
353 if(n
< 0 && (errno
!= EINTR
&& errno
!= EAGAIN
)) {
354 error(errno
, "error reading from signal pipe %d", ev
->sigpipe
[0]);
360 static void close_sigpipe(ev_source
*ev
) {
361 int save_errno
= errno
;
363 xclose(ev
->sigpipe
[0]);
364 xclose(ev
->sigpipe
[1]);
365 ev
->sigpipe
[0] = ev
->sigpipe
[1] = -1;
369 int ev_signal(ev_source
*ev
,
371 ev_signal_callback
*callback
,
376 D(("registering signal %d handler callback %p %p", sig
, (void *)callback
, u
));
379 assert(sig
<= UCHAR_MAX
);
380 if(ev
->sigpipe
[0] == -1) {
381 D(("creating signal pipe"));
383 D(("signal pipe is %d, %d", ev
->sigpipe
[0], ev
->sigpipe
[1]));
384 for(n
= 0; n
< 2; ++n
) {
385 nonblock(ev
->sigpipe
[n
]);
386 cloexec(ev
->sigpipe
[n
]);
388 if(ev_fd(ev
, ev_read
, ev
->sigpipe
[0], signal_read
, 0)) {
393 sigaddset(&ev
->sigmask
, sig
);
394 xsigprocmask(SIG_BLOCK
, &ev
->sigmask
, 0);
395 sigfd
[sig
] = ev
->sigpipe
[1];
396 ev
->signals
[sig
].callback
= callback
;
397 ev
->signals
[sig
].u
= u
;
398 sa
.sa_handler
= sighandler
;
399 sigfillset(&sa
.sa_mask
);
400 sa
.sa_flags
= SA_RESTART
;
401 xsigaction(sig
, &sa
, &ev
->signals
[sig
].oldsa
);
406 int ev_signal_cancel(ev_source
*ev
,
410 xsigaction(sig
, &ev
->signals
[sig
].oldsa
, 0);
411 ev
->signals
[sig
].callback
= 0;
413 sigdelset(&ev
->sigmask
, sig
);
416 xsigprocmask(SIG_UNBLOCK
, &ss
, 0);
420 void ev_signal_atfork(ev_source
*ev
) {
423 if(ev
->sigpipe
[0] != -1) {
424 /* revert any handled signals to their original state */
425 for(sig
= 1; sig
< NSIG
; ++sig
) {
426 if(ev
->signals
[sig
].callback
!= 0)
427 xsigaction(sig
, &ev
->signals
[sig
].oldsa
, 0);
429 /* and then unblock them */
430 xsigprocmask(SIG_UNBLOCK
, &ev
->sigmask
, 0);
431 /* don't want a copy of the signal pipe open inside the fork */
432 xclose(ev
->sigpipe
[0]);
433 xclose(ev
->sigpipe
[1]);
437 /* child processes ************************************************************/
439 static int sigchld_callback(ev_source
*ev
,
440 int attribute((unused
)) sig
,
441 void attribute((unused
)) *u
) {
444 int status
, n
, ret
, revisit
;
448 for(n
= 0; n
< ev
->nchildren
; ++n
) {
449 r
= wait4(ev
->children
[n
].pid
,
451 ev
->children
[n
].options
| WNOHANG
,
454 ev_child_callback
*c
= ev
->children
[n
].callback
;
455 void *cu
= ev
->children
[n
].u
;
457 if(WIFEXITED(status
) || WIFSIGNALED(status
))
458 ev_child_cancel(ev
, r
);
460 if((ret
= c(ev
, r
, status
, &ru
, cu
)))
463 /* We should "never" get an ECHILD but it can in fact happen. For
464 * instance on Linux 2.4.31, and probably other versions, if someone
465 * straces a child process and then a different child process
466 * terminates, when we wait4() the trace process we will get ECHILD
467 * because it has been reparented to strace. Obviously this is a
468 * hopeless design flaw in the tracing infrastructure, but we don't
469 * want the disorder server to bomb out because of it. So we just log
470 * the problem and ignore it.
472 error(errno
, "error calling wait4 for PID %lu (broken ptrace?)",
473 (unsigned long)ev
->children
[n
].pid
);
482 int ev_child_setup(ev_source
*ev
) {
483 D(("installing SIGCHLD handler"));
484 return ev_signal(ev
, SIGCHLD
, sigchld_callback
, 0);
487 int ev_child(ev_source
*ev
,
490 ev_child_callback
*callback
,
494 D(("registering child handling %ld options %d callback %p %p",
495 (long)pid
, options
, (void *)callback
, u
));
496 assert(ev
->signals
[SIGCHLD
].callback
== sigchld_callback
);
497 if(ev
->nchildren
>= ev
->nchildslots
) {
498 ev
->nchildslots
= ev
->nchildslots ?
2 * ev
->nchildslots
: 16;
499 ev
->children
= xrealloc(ev
->children
,
500 ev
->nchildslots
* sizeof (struct child
));
503 ev
->children
[n
].pid
= pid
;
504 ev
->children
[n
].options
= options
;
505 ev
->children
[n
].callback
= callback
;
506 ev
->children
[n
].u
= u
;
510 int ev_child_cancel(ev_source
*ev
,
514 for(n
= 0; n
< ev
->nchildren
&& ev
->children
[n
].pid
!= pid
; ++n
)
516 assert(n
< ev
->nchildren
);
517 if(n
!= ev
->nchildren
- 1)
518 ev
->children
[n
] = ev
->children
[ev
->nchildren
- 1];
523 /* socket listeners ***********************************************************/
525 struct listen_state
{
526 ev_listen_callback
*callback
;
530 static int listen_callback(ev_source
*ev
, int fd
, void *u
) {
531 const struct listen_state
*l
= u
;
534 struct sockaddr_in in
;
535 #if HAVE_STRUCT_SOCKADDR_IN6
536 struct sockaddr_in6 in6
;
538 struct sockaddr_un un
;
544 D(("callback for listener fd %d", fd
));
545 while((addrlen
= sizeof addr
),
546 (newfd
= accept(fd
, &addr
.sa
, &addrlen
)) >= 0) {
547 if((ret
= l
->callback(ev
, newfd
, &addr
.sa
, addrlen
, l
->u
)))
556 error(errno
, "error calling accept");
561 /* XXX on some systems EPROTO should be fatal, but we don't know if
562 * we're running on one of them */
563 error(errno
, "error calling accept");
567 fatal(errno
, "error calling accept");
570 if(errno
!= EINTR
&& errno
!= EAGAIN
)
571 error(errno
, "error calling accept");
575 int ev_listen(ev_source
*ev
,
577 ev_listen_callback
*callback
,
579 struct listen_state
*l
= xmalloc(sizeof *l
);
581 D(("registering listener fd %d callback %p %p", fd
, (void *)callback
, u
));
582 l
->callback
= callback
;
584 return ev_fd(ev
, ev_read
, fd
, listen_callback
, l
);
587 int ev_listen_cancel(ev_source
*ev
, int fd
) {
588 D(("cancelling listener fd %d", fd
));
589 return ev_fd_cancel(ev
, ev_read
, fd
);
592 /* buffer *********************************************************************/
595 char *base
, *start
, *end
, *top
;
598 /* make sure there is @bytes@ available at @b->end@ */
599 static void buffer_space(struct buffer
*b
, size_t bytes
) {
600 D(("buffer_space %p %p %p %p want %lu",
601 (void *)b
->base
, (void *)b
->start
, (void *)b
->end
, (void *)b
->top
,
602 (unsigned long)bytes
));
603 if(b
->start
== b
->end
)
604 b
->start
= b
->end
= b
->base
;
605 if((size_t)(b
->top
- b
->end
) < bytes
) {
606 if((size_t)((b
->top
- b
->end
) + (b
->start
- b
->base
)) < bytes
) {
607 size_t newspace
= b
->end
- b
->start
+ bytes
, n
;
610 for(n
= 16; n
< newspace
; n
*= 2)
612 newbase
= xmalloc_noptr(n
);
613 memcpy(newbase
, b
->start
, b
->end
- b
->start
);
615 b
->end
= newbase
+ (b
->end
- b
->start
);
616 b
->top
= newbase
+ n
;
617 b
->start
= newbase
; /* must be last */
619 memmove(b
->base
, b
->start
, b
->end
- b
->start
);
620 b
->end
= b
->base
+ (b
->end
- b
->start
);
624 D(("result %p %p %p %p",
625 (void *)b
->base
, (void *)b
->start
, (void *)b
->end
, (void *)b
->top
));
628 /* buffered writer ************************************************************/
635 ev_error_callback
*callback
;
640 static int writer_callback(ev_source
*ev
, int fd
, void *u
) {
644 n
= write(fd
, w
->b
.start
, w
->b
.end
- w
->b
.start
);
645 D(("callback for writer fd %d, %ld bytes, n=%d, errno=%d",
646 fd
, (long)(w
->b
.end
- w
->b
.start
), n
, errno
));
649 if(w
->b
.start
== w
->b
.end
) {
651 ev_fd_cancel(ev
, ev_write
, fd
);
652 return w
->callback(ev
, fd
, 0, w
->u
);
654 ev_fd_disable(ev
, ev_write
, fd
);
662 ev_fd_cancel(ev
, ev_write
, fd
);
663 return w
->callback(ev
, fd
, errno
, w
->u
);
669 static int ev_writer_write(struct sink
*sk
, const void *s
, int n
) {
670 ev_writer
*w
= (ev_writer
*)sk
;
672 buffer_space(&w
->b
, n
);
673 if(w
->b
.start
== w
->b
.end
)
674 ev_fd_enable(w
->ev
, ev_write
, w
->fd
);
675 memcpy(w
->b
.end
, s
, n
);
680 ev_writer
*ev_writer_new(ev_source
*ev
,
682 ev_error_callback
*callback
,
684 ev_writer
*w
= xmalloc(sizeof *w
);
686 D(("registering writer fd %d callback %p %p", fd
, (void *)callback
, u
));
687 w
->s
.write
= ev_writer_write
;
689 w
->callback
= callback
;
692 if(ev_fd(ev
, ev_write
, fd
, writer_callback
, w
))
694 ev_fd_disable(ev
, ev_write
, fd
);
698 struct sink
*ev_writer_sink(ev_writer
*w
) {
702 static int writer_shutdown(ev_source
*ev
,
703 const attribute((unused
)) struct timeval
*now
,
707 return w
->callback(ev
, w
->fd
, 0, w
->u
);
710 int ev_writer_close(ev_writer
*w
) {
711 D(("close writer fd %d", w
->fd
));
713 if(w
->b
.start
== w
->b
.end
) {
714 /* we're already finished */
715 ev_fd_cancel(w
->ev
, ev_write
, w
->fd
);
716 return ev_timeout(w
->ev
, 0, 0, writer_shutdown
, w
);
721 int ev_writer_cancel(ev_writer
*w
) {
722 D(("cancel writer fd %d", w
->fd
));
723 return ev_fd_cancel(w
->ev
, ev_write
, w
->fd
);
726 int ev_writer_flush(ev_writer
*w
) {
727 return writer_callback(w
->ev
, w
->fd
, w
);
730 /* buffered reader ************************************************************/
735 ev_reader_callback
*callback
;
736 ev_error_callback
*error_callback
;
742 static int reader_callback(ev_source
*ev
, int fd
, void *u
) {
746 buffer_space(&r
->b
, 1);
747 n
= read(fd
, r
->b
.end
, r
->b
.top
- r
->b
.end
);
748 D(("read fd %d buffer %d returned %d errno %d",
749 fd
, (int)(r
->b
.top
- r
->b
.end
), n
, errno
));
752 return r
->callback(ev
, r
, fd
, r
->b
.start
, r
->b
.end
- r
->b
.start
, 0, r
->u
);
755 ev_fd_cancel(ev
, ev_read
, fd
);
756 return r
->callback(ev
, r
, fd
, r
->b
.start
, r
->b
.end
- r
->b
.start
, 1, r
->u
);
763 ev_fd_cancel(ev
, ev_read
, fd
);
764 return r
->error_callback(ev
, fd
, errno
, r
->u
);
770 ev_reader
*ev_reader_new(ev_source
*ev
,
772 ev_reader_callback
*callback
,
773 ev_error_callback
*error_callback
,
775 ev_reader
*r
= xmalloc(sizeof *r
);
777 D(("registering reader fd %d callback %p %p %p",
778 fd
, (void *)callback
, (void *)error_callback
, u
));
780 r
->callback
= callback
;
781 r
->error_callback
= error_callback
;
784 if(ev_fd(ev
, ev_read
, fd
, reader_callback
, r
))
789 void ev_reader_buffer(ev_reader
*r
, size_t nbytes
) {
790 buffer_space(&r
->b
, nbytes
- (r
->b
.end
- r
->b
.start
));
793 void ev_reader_consume(ev_reader
*r
, size_t n
) {
797 int ev_reader_cancel(ev_reader
*r
) {
798 D(("cancel reader fd %d", r
->fd
));
799 return ev_fd_cancel(r
->ev
, ev_read
, r
->fd
);
802 int ev_reader_disable(ev_reader
*r
) {
803 D(("disable reader fd %d", r
->fd
));
804 return r
->eof ?
0 : ev_fd_disable(r
->ev
, ev_read
, r
->fd
);
807 static int reader_continuation(ev_source
attribute((unused
)) *ev
,
808 const attribute((unused
)) struct timeval
*now
,
812 D(("reader continuation callback fd %d", r
->fd
));
813 if(ev_fd_enable(r
->ev
, ev_read
, r
->fd
)) return -1;
814 return r
->callback(ev
, r
, r
->fd
, r
->b
.start
, r
->b
.end
- r
->b
.start
, r
->eof
, r
->u
);
817 int ev_reader_incomplete(ev_reader
*r
) {
818 if(ev_fd_disable(r
->ev
, ev_read
, r
->fd
)) return -1;
819 return ev_timeout(r
->ev
, 0, 0, reader_continuation
, r
);
822 static int reader_enabled(ev_source
*ev
,
823 const attribute((unused
)) struct timeval
*now
,
827 D(("reader enabled callback fd %d", r
->fd
));
828 return r
->callback(ev
, r
, r
->fd
, r
->b
.start
, r
->b
.end
- r
->b
.start
, r
->eof
, r
->u
);
831 int ev_reader_enable(ev_reader
*r
) {
832 D(("enable reader fd %d", r
->fd
));
833 return ((r
->eof ?
0 : ev_fd_enable(r
->ev
, ev_read
, r
->fd
))
834 || ev_timeout(r
->ev
, 0, 0, reader_enabled
, r
)) ?
-1 : 0;
844 /* arch-tag:a81dd5068039481faac3eea28c995570 */