2 * This file is part of DisOrder.
3 * Copyright (C) 2004, 2005, 2007 Richard Kettlewell
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
26 #include <sys/types.h>
27 #include <sys/resource.h>
35 #include <sys/socket.h>
36 #include <netinet/in.h>
49 ev_timeout_callback
*callback
;
56 ev_fd_callback
*callback
;
70 struct sigaction oldsa
;
71 ev_signal_callback
*callback
;
78 ev_child_callback
*callback
;
83 struct fdmode mode
[ev_nmodes
];
84 struct timeout
*timeouts
;
85 struct signal signals
[NSIG
];
89 int nchildren
, nchildslots
;
90 struct child
*children
;
/* Printable name of each fd mode, indexed by the mode value.  Used to label
 * the mode in the D(()) debug messages of the fd functions. */
93 static const char *modenames
[] = { "read", "write", "except" };
95 /* utilities ******************************************************************/
97 static inline int gt(const struct timeval
*a
, const struct timeval
*b
) {
98 if(a
->tv_sec
> b
->tv_sec
)
100 if(a
->tv_sec
== b
->tv_sec
101 && a
->tv_usec
> b
->tv_usec
)
106 static inline int ge(const struct timeval
*a
, const struct timeval
*b
) {
110 /* creation *******************************************************************/
112 ev_source
*ev_new(void) {
113 ev_source
*ev
= xmalloc(sizeof *ev
);
116 memset(ev
, 0, sizeof *ev
);
117 for(n
= 0; n
< ev_nmodes
; ++n
)
118 FD_ZERO(&ev
->mode
[n
].enabled
);
119 ev
->sigpipe
[0] = ev
->sigpipe
[1] = -1;
120 sigemptyset(&ev
->sigmask
);
124 /* event loop *****************************************************************/
126 int ev_run(ev_source
*ev
) {
129 struct timeval delta
;
133 struct timeout
*t
, **tt
;
136 xgettimeofday(&now
, 0);
137 /* Handle timeouts. We don't want to handle any timeouts that are added
138 * while we're handling them (otherwise we'd have to break out of infinite
139 * loops, preferably without starving better-behaved subsystems). Hence
140 * the slightly complicated two-phase approach here. */
141 for(t
= ev
->timeouts
;
142 t
&& ge(&now
, &t
->when
);
145 D(("calling timeout for %ld.%ld callback %p %p",
146 (long)t
->when
.tv_sec
, (long)t
->when
.tv_usec
,
147 (void *)t
->callback
, t
->u
));
148 ret
= t
->callback(ev
, &now
, t
->u
);
160 for(mode
= 0; mode
< ev_nmodes
; ++mode
) {
161 ev
->mode
[mode
].tripped
= ev
->mode
[mode
].enabled
;
162 if(ev
->mode
[mode
].maxfd
> maxfd
)
163 maxfd
= ev
->mode
[mode
].maxfd
;
165 xsigprocmask(SIG_UNBLOCK
, &ev
->sigmask
, 0);
168 xgettimeofday(&now
, 0);
169 delta
.tv_sec
= ev
->timeouts
->when
.tv_sec
- now
.tv_sec
;
170 delta
.tv_usec
= ev
->timeouts
->when
.tv_usec
- now
.tv_usec
;
171 if(delta
.tv_usec
< 0) {
172 delta
.tv_usec
+= 1000000;
176 delta
.tv_sec
= delta
.tv_usec
= 0;
177 n
= select(maxfd
+ 1,
178 &ev
->mode
[ev_read
].tripped
,
179 &ev
->mode
[ev_write
].tripped
,
180 &ev
->mode
[ev_except
].tripped
,
183 n
= select(maxfd
+ 1,
184 &ev
->mode
[ev_read
].tripped
,
185 &ev
->mode
[ev_write
].tripped
,
186 &ev
->mode
[ev_except
].tripped
,
189 } while(n
< 0 && errno
== EINTR
);
190 xsigprocmask(SIG_BLOCK
, &ev
->sigmask
, 0);
192 error(errno
, "error calling select");
194 /* If there's a bad FD in the mix then check them all and log what we
195 * find, to ease debugging */
196 for(mode
= 0; mode
< ev_nmodes
; ++mode
) {
197 for(n
= 0; n
< ev
->mode
[mode
].nfds
; ++n
) {
198 const int fd
= ev
->mode
[mode
].fds
[n
].fd
;
200 if(FD_ISSET(fd
, &ev
->mode
[mode
].enabled
)
201 && fstat(fd
, &sb
) < 0)
202 error(errno
, "fstat %d (%s)", fd
, ev
->mode
[mode
].fds
[n
].what
);
209 /* if anything deranges the meaning of an fd, or re-orders the
210 * fds[] tables, we'd better give up; such operations will
211 * therefore set @escape@. */
213 for(mode
= 0; mode
< ev_nmodes
&& !ev
->escape
; ++mode
)
214 for(n
= 0; n
< ev
->mode
[mode
].nfds
&& !ev
->escape
; ++n
) {
215 int fd
= ev
->mode
[mode
].fds
[n
].fd
;
216 if(FD_ISSET(fd
, &ev
->mode
[mode
].tripped
)) {
217 D(("calling %s fd %d callback %p %p", modenames
[mode
], fd
,
218 (void *)ev
->mode
[mode
].fds
[n
].callback
,
219 ev
->mode
[mode
].fds
[n
].u
));
220 ret
= ev
->mode
[mode
].fds
[n
].callback(ev
, fd
,
221 ev
->mode
[mode
].fds
[n
].u
);
227 /* we'll pick up timeouts back round the loop */
231 /* file descriptors ***********************************************************/
233 int ev_fd(ev_source
*ev
,
236 ev_fd_callback
*callback
,
241 D(("registering %s fd %d callback %p %p", modenames
[mode
], fd
,
242 (void *)callback
, u
));
243 assert(mode
< ev_nmodes
);
244 if(ev
->mode
[mode
].nfds
>= ev
->mode
[mode
].fdslots
) {
245 ev
->mode
[mode
].fdslots
= (ev
->mode
[mode
].fdslots
246 ?
2 * ev
->mode
[mode
].fdslots
: 16);
247 D(("expanding %s fd table to %d entries", modenames
[mode
],
248 ev
->mode
[mode
].fdslots
));
249 ev
->mode
[mode
].fds
= xrealloc(ev
->mode
[mode
].fds
,
250 ev
->mode
[mode
].fdslots
* sizeof (struct fd
));
252 n
= ev
->mode
[mode
].nfds
++;
253 FD_SET(fd
, &ev
->mode
[mode
].enabled
);
254 ev
->mode
[mode
].fds
[n
].fd
= fd
;
255 ev
->mode
[mode
].fds
[n
].callback
= callback
;
256 ev
->mode
[mode
].fds
[n
].u
= u
;
257 ev
->mode
[mode
].fds
[n
].what
= what
;
258 if(fd
> ev
->mode
[mode
].maxfd
)
259 ev
->mode
[mode
].maxfd
= fd
;
264 int ev_fd_cancel(ev_source
*ev
, ev_fdmode mode
, int fd
) {
268 D(("cancelling mode %s fd %d", modenames
[mode
], fd
));
269 /* find the right struct fd */
270 for(n
= 0; n
< ev
->mode
[mode
].nfds
&& fd
!= ev
->mode
[mode
].fds
[n
].fd
; ++n
)
272 assert(n
< ev
->mode
[mode
].nfds
);
273 /* swap in the last fd and reduce the count */
274 if(n
!= ev
->mode
[mode
].nfds
- 1)
275 ev
->mode
[mode
].fds
[n
] = ev
->mode
[mode
].fds
[ev
->mode
[mode
].nfds
- 1];
276 --ev
->mode
[mode
].nfds
;
277 /* if that was the biggest fd, find the new biggest one */
278 if(fd
== ev
->mode
[mode
].maxfd
) {
280 for(n
= 0; n
< ev
->mode
[mode
].nfds
; ++n
)
281 if(ev
->mode
[mode
].fds
[n
].fd
> maxfd
)
282 maxfd
= ev
->mode
[mode
].fds
[n
].fd
;
283 ev
->mode
[mode
].maxfd
= maxfd
;
285 /* don't tell select about this fd any more */
286 FD_CLR(fd
, &ev
->mode
[mode
].enabled
);
291 int ev_fd_enable(ev_source
*ev
, ev_fdmode mode
, int fd
) {
292 D(("enabling mode %s fd %d", modenames
[mode
], fd
));
293 FD_SET(fd
, &ev
->mode
[mode
].enabled
);
297 int ev_fd_disable(ev_source
*ev
, ev_fdmode mode
, int fd
) {
298 D(("disabling mode %s fd %d", modenames
[mode
], fd
));
299 FD_CLR(fd
, &ev
->mode
[mode
].enabled
);
300 FD_CLR(fd
, &ev
->mode
[mode
].tripped
);
304 /* timeouts *******************************************************************/
306 int ev_timeout(ev_source
*ev
,
307 ev_timeout_handle
*handlep
,
308 const struct timeval
*when
,
309 ev_timeout_callback
*callback
,
311 struct timeout
*t
, *p
, **pp
;
313 D(("registering timeout at %ld.%ld callback %p %p",
314 when ?
(long)when
->tv_sec
: 0, when ?
(long)when
->tv_usec
: 0,
315 (void *)callback
, u
));
316 t
= xmalloc(sizeof *t
);
319 t
->callback
= callback
;
322 while((p
= *pp
) && gt(&t
->when
, &p
->when
))
331 int ev_timeout_cancel(ev_source
*ev
,
332 ev_timeout_handle handle
) {
333 struct timeout
*t
= handle
, *p
, **pp
;
335 for(pp
= &ev
->timeouts
; (p
= *pp
) && p
!= t
; pp
= &p
->next
)
344 /* signals ********************************************************************/
/* Indexed by signal number: the file descriptor to which sighandler() writes
 * the one-byte signal number.  ev_signal() sets the entry for a signal to
 * @ev->sigpipe[1]@ when a handler for that signal is registered. */
346 static int sigfd
[NSIG
];
348 static void sighandler(int s
) {
349 unsigned char sc
= s
;
350 static const char errmsg
[] = "error writing to signal pipe";
352 /* probably the reader has stopped listening for some reason */
353 if(write(sigfd
[s
], &sc
, 1) < 0) {
354 write(2, errmsg
, sizeof errmsg
- 1);
359 static int signal_read(ev_source
*ev
,
360 int attribute((unused
)) fd
,
361 void attribute((unused
)) *u
) {
366 if((n
= read(ev
->sigpipe
[0], &s
, 1)) == 1)
367 if((ret
= ev
->signals
[s
].callback(ev
, s
, ev
->signals
[s
].u
)))
370 if(n
< 0 && (errno
!= EINTR
&& errno
!= EAGAIN
)) {
371 error(errno
, "error reading from signal pipe %d", ev
->sigpipe
[0]);
377 static void close_sigpipe(ev_source
*ev
) {
378 int save_errno
= errno
;
380 xclose(ev
->sigpipe
[0]);
381 xclose(ev
->sigpipe
[1]);
382 ev
->sigpipe
[0] = ev
->sigpipe
[1] = -1;
386 int ev_signal(ev_source
*ev
,
388 ev_signal_callback
*callback
,
393 D(("registering signal %d handler callback %p %p", sig
, (void *)callback
, u
));
396 assert(sig
<= UCHAR_MAX
);
397 if(ev
->sigpipe
[0] == -1) {
398 D(("creating signal pipe"));
400 D(("signal pipe is %d, %d", ev
->sigpipe
[0], ev
->sigpipe
[1]));
401 for(n
= 0; n
< 2; ++n
) {
402 nonblock(ev
->sigpipe
[n
]);
403 cloexec(ev
->sigpipe
[n
]);
405 if(ev_fd(ev
, ev_read
, ev
->sigpipe
[0], signal_read
, 0, "sigpipe read")) {
410 sigaddset(&ev
->sigmask
, sig
);
411 xsigprocmask(SIG_BLOCK
, &ev
->sigmask
, 0);
412 sigfd
[sig
] = ev
->sigpipe
[1];
413 ev
->signals
[sig
].callback
= callback
;
414 ev
->signals
[sig
].u
= u
;
415 sa
.sa_handler
= sighandler
;
416 sigfillset(&sa
.sa_mask
);
417 sa
.sa_flags
= SA_RESTART
;
418 xsigaction(sig
, &sa
, &ev
->signals
[sig
].oldsa
);
423 int ev_signal_cancel(ev_source
*ev
,
427 xsigaction(sig
, &ev
->signals
[sig
].oldsa
, 0);
428 ev
->signals
[sig
].callback
= 0;
430 sigdelset(&ev
->sigmask
, sig
);
433 xsigprocmask(SIG_UNBLOCK
, &ss
, 0);
437 void ev_signal_atfork(ev_source
*ev
) {
440 if(ev
->sigpipe
[0] != -1) {
441 /* revert any handled signals to their original state */
442 for(sig
= 1; sig
< NSIG
; ++sig
) {
443 if(ev
->signals
[sig
].callback
!= 0)
444 xsigaction(sig
, &ev
->signals
[sig
].oldsa
, 0);
446 /* and then unblock them */
447 xsigprocmask(SIG_UNBLOCK
, &ev
->sigmask
, 0);
448 /* don't want a copy of the signal pipe open inside the fork */
449 xclose(ev
->sigpipe
[0]);
450 xclose(ev
->sigpipe
[1]);
454 /* child processes ************************************************************/
456 static int sigchld_callback(ev_source
*ev
,
457 int attribute((unused
)) sig
,
458 void attribute((unused
)) *u
) {
461 int status
, n
, ret
, revisit
;
465 for(n
= 0; n
< ev
->nchildren
; ++n
) {
466 r
= wait4(ev
->children
[n
].pid
,
468 ev
->children
[n
].options
| WNOHANG
,
471 ev_child_callback
*c
= ev
->children
[n
].callback
;
472 void *cu
= ev
->children
[n
].u
;
474 if(WIFEXITED(status
) || WIFSIGNALED(status
))
475 ev_child_cancel(ev
, r
);
477 if((ret
= c(ev
, r
, status
, &ru
, cu
)))
480 /* We should "never" get an ECHILD but it can in fact happen. For
481 * instance on Linux 2.4.31, and probably other versions, if someone
482 * straces a child process and then a different child process
483 * terminates, when we wait4() the trace process we will get ECHILD
484 * because it has been reparented to strace. Obviously this is a
485 * hopeless design flaw in the tracing infrastructure, but we don't
486 * want the disorder server to bomb out because of it. So we just log
487 * the problem and ignore it.
489 error(errno
, "error calling wait4 for PID %lu (broken ptrace?)",
490 (unsigned long)ev
->children
[n
].pid
);
/* Register sigchld_callback() as the event-loop handler for SIGCHLD, with a
 * null user pointer, and return whatever ev_signal() returns.  ev_child()
 * asserts that this registration is in place before it accepts a child. */
499 int ev_child_setup(ev_source
*ev
) {
500 D(("installing SIGCHLD handler"));
501 return ev_signal(ev
, SIGCHLD
, sigchld_callback
, 0);
504 int ev_child(ev_source
*ev
,
507 ev_child_callback
*callback
,
511 D(("registering child handling %ld options %d callback %p %p",
512 (long)pid
, options
, (void *)callback
, u
));
513 assert(ev
->signals
[SIGCHLD
].callback
== sigchld_callback
);
514 if(ev
->nchildren
>= ev
->nchildslots
) {
515 ev
->nchildslots
= ev
->nchildslots ?
2 * ev
->nchildslots
: 16;
516 ev
->children
= xrealloc(ev
->children
,
517 ev
->nchildslots
* sizeof (struct child
));
520 ev
->children
[n
].pid
= pid
;
521 ev
->children
[n
].options
= options
;
522 ev
->children
[n
].callback
= callback
;
523 ev
->children
[n
].u
= u
;
527 int ev_child_cancel(ev_source
*ev
,
531 for(n
= 0; n
< ev
->nchildren
&& ev
->children
[n
].pid
!= pid
; ++n
)
533 assert(n
< ev
->nchildren
);
534 if(n
!= ev
->nchildren
- 1)
535 ev
->children
[n
] = ev
->children
[ev
->nchildren
- 1];
540 /* socket listeners ***********************************************************/
542 struct listen_state
{
543 ev_listen_callback
*callback
;
547 static int listen_callback(ev_source
*ev
, int fd
, void *u
) {
548 const struct listen_state
*l
= u
;
551 struct sockaddr_in in
;
552 #if HAVE_STRUCT_SOCKADDR_IN6
553 struct sockaddr_in6 in6
;
555 struct sockaddr_un un
;
561 D(("callback for listener fd %d", fd
));
562 while((addrlen
= sizeof addr
),
563 (newfd
= accept(fd
, &addr
.sa
, &addrlen
)) >= 0) {
564 if((ret
= l
->callback(ev
, newfd
, &addr
.sa
, addrlen
, l
->u
)))
573 error(errno
, "error calling accept");
578 /* XXX on some systems EPROTO should be fatal, but we don't know if
579 * we're running on one of them */
580 error(errno
, "error calling accept");
584 fatal(errno
, "error calling accept");
587 if(errno
!= EINTR
&& errno
!= EAGAIN
)
588 error(errno
, "error calling accept");
592 int ev_listen(ev_source
*ev
,
594 ev_listen_callback
*callback
,
597 struct listen_state
*l
= xmalloc(sizeof *l
);
599 D(("registering listener fd %d callback %p %p", fd
, (void *)callback
, u
));
600 l
->callback
= callback
;
602 return ev_fd(ev
, ev_read
, fd
, listen_callback
, l
, what
);
/* Stop listening on @fd@ by cancelling its read-mode registration.  Returns
 * the result of ev_fd_cancel(). */
605 int ev_listen_cancel(ev_source
*ev
, int fd
) {
606 D(("cancelling listener fd %d", fd
));
607 return ev_fd_cancel(ev
, ev_read
, fd
);
610 /* buffer *********************************************************************/
613 char *base
, *start
, *end
, *top
;
616 /* make sure there is @bytes@ available at @b->end@ */
617 static void buffer_space(struct buffer
*b
, size_t bytes
) {
618 D(("buffer_space %p %p %p %p want %lu",
619 (void *)b
->base
, (void *)b
->start
, (void *)b
->end
, (void *)b
->top
,
620 (unsigned long)bytes
));
621 if(b
->start
== b
->end
)
622 b
->start
= b
->end
= b
->base
;
623 if((size_t)(b
->top
- b
->end
) < bytes
) {
624 if((size_t)((b
->top
- b
->end
) + (b
->start
- b
->base
)) < bytes
) {
625 size_t newspace
= b
->end
- b
->start
+ bytes
, n
;
628 for(n
= 16; n
< newspace
; n
*= 2)
630 newbase
= xmalloc_noptr(n
);
631 memcpy(newbase
, b
->start
, b
->end
- b
->start
);
633 b
->end
= newbase
+ (b
->end
- b
->start
);
634 b
->top
= newbase
+ n
;
635 b
->start
= newbase
; /* must be last */
637 memmove(b
->base
, b
->start
, b
->end
- b
->start
);
638 b
->end
= b
->base
+ (b
->end
- b
->start
);
642 D(("result %p %p %p %p",
643 (void *)b
->base
, (void *)b
->start
, (void *)b
->end
, (void *)b
->top
));
646 /* buffered writer ************************************************************/
653 ev_error_callback
*callback
;
658 static int writer_callback(ev_source
*ev
, int fd
, void *u
) {
662 n
= write(fd
, w
->b
.start
, w
->b
.end
- w
->b
.start
);
663 D(("callback for writer fd %d, %ld bytes, n=%d, errno=%d",
664 fd
, (long)(w
->b
.end
- w
->b
.start
), n
, errno
));
667 if(w
->b
.start
== w
->b
.end
) {
669 ev_fd_cancel(ev
, ev_write
, fd
);
670 return w
->callback(ev
, fd
, 0, w
->u
);
672 ev_fd_disable(ev
, ev_write
, fd
);
680 ev_fd_cancel(ev
, ev_write
, fd
);
681 return w
->callback(ev
, fd
, errno
, w
->u
);
687 static int ev_writer_write(struct sink
*sk
, const void *s
, int n
) {
688 ev_writer
*w
= (ev_writer
*)sk
;
690 buffer_space(&w
->b
, n
);
691 if(w
->b
.start
== w
->b
.end
)
692 ev_fd_enable(w
->ev
, ev_write
, w
->fd
);
693 memcpy(w
->b
.end
, s
, n
);
698 ev_writer
*ev_writer_new(ev_source
*ev
,
700 ev_error_callback
*callback
,
703 ev_writer
*w
= xmalloc(sizeof *w
);
705 D(("registering writer fd %d callback %p %p", fd
, (void *)callback
, u
));
706 w
->s
.write
= ev_writer_write
;
708 w
->callback
= callback
;
711 if(ev_fd(ev
, ev_write
, fd
, writer_callback
, w
, what
))
713 ev_fd_disable(ev
, ev_write
, fd
);
717 struct sink
*ev_writer_sink(ev_writer
*w
) {
721 static int writer_shutdown(ev_source
*ev
,
722 const attribute((unused
)) struct timeval
*now
,
726 return w
->callback(ev
, w
->fd
, 0, w
->u
);
729 int ev_writer_close(ev_writer
*w
) {
730 D(("close writer fd %d", w
->fd
));
732 if(w
->b
.start
== w
->b
.end
) {
733 /* we're already finished */
734 ev_fd_cancel(w
->ev
, ev_write
, w
->fd
);
735 return ev_timeout(w
->ev
, 0, 0, writer_shutdown
, w
);
/* Cancel the write-mode registration of the writer's fd.  This function does
 * not itself write out anything still held in the writer's buffer.  Returns
 * the result of ev_fd_cancel(). */
740 int ev_writer_cancel(ev_writer
*w
) {
741 D(("cancel writer fd %d", w
->fd
));
742 return ev_fd_cancel(w
->ev
, ev_write
, w
->fd
);
/* Call writer_callback() directly for this writer, so that a write() of the
 * buffered data is attempted now instead of waiting for the event loop to
 * report the fd as writable.  Returns the result of writer_callback(). */
745 int ev_writer_flush(ev_writer
*w
) {
746 return writer_callback(w
->ev
, w
->fd
, w
);
749 /* buffered reader ************************************************************/
754 ev_reader_callback
*callback
;
755 ev_error_callback
*error_callback
;
761 static int reader_callback(ev_source
*ev
, int fd
, void *u
) {
765 buffer_space(&r
->b
, 1);
766 n
= read(fd
, r
->b
.end
, r
->b
.top
- r
->b
.end
);
767 D(("read fd %d buffer %d returned %d errno %d",
768 fd
, (int)(r
->b
.top
- r
->b
.end
), n
, errno
));
771 return r
->callback(ev
, r
, fd
, r
->b
.start
, r
->b
.end
- r
->b
.start
, 0, r
->u
);
774 ev_fd_cancel(ev
, ev_read
, fd
);
775 return r
->callback(ev
, r
, fd
, r
->b
.start
, r
->b
.end
- r
->b
.start
, 1, r
->u
);
782 ev_fd_cancel(ev
, ev_read
, fd
);
783 return r
->error_callback(ev
, fd
, errno
, r
->u
);
789 ev_reader
*ev_reader_new(ev_source
*ev
,
791 ev_reader_callback
*callback
,
792 ev_error_callback
*error_callback
,
795 ev_reader
*r
= xmalloc(sizeof *r
);
797 D(("registering reader fd %d callback %p %p %p",
798 fd
, (void *)callback
, (void *)error_callback
, u
));
800 r
->callback
= callback
;
801 r
->error_callback
= error_callback
;
804 if(ev_fd(ev
, ev_read
, fd
, reader_callback
, r
, what
))
809 void ev_reader_buffer(ev_reader
*r
, size_t nbytes
) {
810 buffer_space(&r
->b
, nbytes
- (r
->b
.end
- r
->b
.start
));
813 void ev_reader_consume(ev_reader
*r
, size_t n
) {
/* Cancel the read-mode registration of the reader's fd.  Returns the result
 * of ev_fd_cancel(). */
817 int ev_reader_cancel(ev_reader
*r
) {
818 D(("cancel reader fd %d", r
->fd
));
819 return ev_fd_cancel(r
->ev
, ev_read
, r
->fd
);
/* Temporarily stop watching the reader's fd for read events; the registration
 * itself is kept.  If @r->eof@ is set nothing is done and 0 is returned;
 * otherwise the result of ev_fd_disable() is returned.
 *
 * NOTE(review): reader_callback() cancels the fd before reporting end of file
 * to the user callback, which is presumably why the fd is left alone once
 * @eof@ is set - confirm where @eof@ is assigned. */
822 int ev_reader_disable(ev_reader
*r
) {
823 D(("disable reader fd %d", r
->fd
));
824 return r
->eof ?
0 : ev_fd_disable(r
->ev
, ev_read
, r
->fd
);
827 static int reader_continuation(ev_source
attribute((unused
)) *ev
,
828 const attribute((unused
)) struct timeval
*now
,
832 D(("reader continuation callback fd %d", r
->fd
));
833 if(ev_fd_enable(r
->ev
, ev_read
, r
->fd
)) return -1;
834 return r
->callback(ev
, r
, r
->fd
, r
->b
.start
, r
->b
.end
- r
->b
.start
, r
->eof
, r
->u
);
/* Disable read events on the reader's fd and schedule reader_continuation()
 * through ev_timeout(), passing a null handle pointer and a null time
 * (presumably meaning "as soon as possible" - confirm in ev_timeout()).
 * reader_continuation() re-enables the fd and calls the reader's callback
 * with the data currently in the buffer.
 *
 * Returns -1 if ev_fd_disable() reports failure, otherwise the result of
 * ev_timeout(). */
837 int ev_reader_incomplete(ev_reader
*r
) {
838 if(ev_fd_disable(r
->ev
, ev_read
, r
->fd
)) return -1;
839 return ev_timeout(r
->ev
, 0, 0, reader_continuation
, r
);
842 static int reader_enabled(ev_source
*ev
,
843 const attribute((unused
)) struct timeval
*now
,
847 D(("reader enabled callback fd %d", r
->fd
));
848 return r
->callback(ev
, r
, r
->fd
, r
->b
.start
, r
->b
.end
- r
->b
.start
, r
->eof
, r
->u
);
/* Re-enable read events on the reader's fd (skipped when @r->eof@ is set) and
 * schedule reader_enabled() through ev_timeout(), so that the reader's
 * callback is invoked with whatever is already in the buffer.
 *
 * Returns -1 if either step returns nonzero, otherwise 0.  Because the two
 * steps are joined with ||, the timeout is not scheduled when enabling the fd
 * fails. */
851 int ev_reader_enable(ev_reader
*r
) {
852 D(("enable reader fd %d", r
->fd
));
853 return ((r
->eof ?
0 : ev_fd_enable(r
->ev
, ev_read
, r
->fd
))
854 || ev_timeout(r
->ev
, 0, 0, reader_enabled
, r
)) ?
-1 : 0;