X-Git-Url: https://git.distorted.org.uk/~mdw/disorder/blobdiff_plain/5626f6d23354cf868148fb803577672f41d4d245..8e3fe3d8ea9f5e75b6f1be4e4ae8cf2c16db88e3:/clients/playrtp.c diff --git a/clients/playrtp.c b/clients/playrtp.c index fcc339b..86a248c 100644 --- a/clients/playrtp.c +++ b/clients/playrtp.c @@ -26,21 +26,26 @@ * systems. There is no support for Microsoft Windows yet, and that will in * fact probably an entirely separate program. * - * The program runs (at least) two threads. listen_thread() is responsible for - * reading RTP packets off the wire and adding them to the binary heap @ref - * packets, assuming they are basically sound. + * The program runs (at least) three threads. listen_thread() is responsible + * for reading RTP packets off the wire and adding them to the linked list @ref + * received_packets, assuming they are basically sound. queue_thread() takes + * packets off this linked list and adds them to @ref packets (an operation + * which might be much slower due to contention for @ref lock). * * The main thread is responsible for actually playing audio. In ALSA this * means it waits until ALSA says it's ready for more audio which it then * plays. * - * InCore Audio the main thread is only responsible for starting and stopping + * In Core Audio the main thread is only responsible for starting and stopping * play: the system does the actual playback in its own private thread, and * calls adioproc() to fetch the audio data. * * Sometimes it happens that there is no audio available to play. This may * because the server went away, or a packet was dropped, or the server * deliberately did not send any sound because it encountered a silence. 
+ * + * Assumptions: + * - it is safe to read uint32_t values without a lock protecting them */ #include @@ -57,6 +62,7 @@ #include #include #include +#include #include "log.h" #include "mem.h" @@ -67,6 +73,8 @@ #include "defs.h" #include "vector.h" #include "heap.h" +#include "timeval.h" +#include "playrtp.h" #if HAVE_COREAUDIO_AUDIOHARDWARE_H # include @@ -84,13 +92,7 @@ static int rtpfd; static FILE *logfp; /** @brief Output device */ -static const char *device; - -/** @brief Maximum samples per packet we'll support - * - * NB that two channels = two samples in this program. - */ -#define MAXSAMPLES 2048 +const char *device; /** @brief Minimum low watermark * @@ -107,138 +109,64 @@ static unsigned readahead = 2 * 2 * 44100; * We'll stop reading from the network if we have this many samples. */ static unsigned maxbuffer; -/** @brief Number of samples to infill by in one go +/** @brief Received packets + * Protected by @ref receive_lock * - * This is an upper bound - in practice we expect the underlying audio API to - * only ask for a much smaller number of samples in any one go. + * Received packets are added to this list, and queue_thread() picks them off + * it and adds them to @ref packets. Whenever a packet is added to it, @ref + * receive_cond is signalled. */ -#define INFILL_SAMPLES (44100 * 2) /* 1s */ +struct packet *received_packets; -/** @brief Received packet - * - * Received packets are kept in a binary heap (see @ref pheap) ordered by - * timestamp. +/** @brief Tail of @ref received_packets + * Protected by @ref receive_lock */ -struct packet { - /** @brief Number of samples in this packet */ - uint32_t nsamples; - - /** @brief Timestamp from RTP packet - * - * NB that "timestamps" are really sample counters. Use lt() or lt_packet() - * to compare timestamps. 
- */ - uint32_t timestamp; - - /** @brief Flags - * - * Valid values are: - * - @ref IDLE - the idle bit was set in the RTP packet - */ - unsigned flags; -/** @brief idle bit set in RTP packet*/ -#define IDLE 0x0001 - - /** @brief Raw sample data - * - * Only the first @p nsamples samples are defined; the rest is uninitialized - * data. - */ - uint16_t samples_raw[MAXSAMPLES]; -}; +struct packet **received_tail = &received_packets; -/** @brief Return true iff \f$a < b\f$ in sequence-space arithmetic +/** @brief Lock protecting @ref received_packets * - * Specifically it returns true if \f$(a-b) mod 2^{32} < 2^{31}\f$. - * - * See also lt_packet(). - */ -static inline int lt(uint32_t a, uint32_t b) { - return (uint32_t)(a - b) & 0x80000000; -} - -/** @brief Return true iff a >= b in sequence-space arithmetic */ -static inline int ge(uint32_t a, uint32_t b) { - return !lt(a, b); -} - -/** @brief Return true iff a > b in sequence-space arithmetic */ -static inline int gt(uint32_t a, uint32_t b) { - return lt(b, a); -} + * Only listen_thread() and queue_thread() ever hold this lock. It is vital + * that queue_thread() not hold it any longer than it strictly has to. */ +pthread_mutex_t receive_lock = PTHREAD_MUTEX_INITIALIZER; -/** @brief Return true iff a <= b in sequence-space arithmetic */ -static inline int le(uint32_t a, uint32_t b) { - return !lt(b, a); -} - -/** @brief Ordering for packets, used by @ref pheap */ -static inline int lt_packet(const struct packet *a, const struct packet *b) { - return lt(a->timestamp, b->timestamp); -} +/** @brief Condition variable signalled when @ref received_packets is updated + * + * Used by listen_thread() to notify queue_thread() that it has added another + * packet to @ref received_packets. 
*/ +pthread_cond_t receive_cond = PTHREAD_COND_INITIALIZER; -/** @struct pheap - * @brief Binary heap of packets ordered by timestamp */ -HEAP_TYPE(pheap, struct packet *, lt_packet); +/** @brief Length of @ref received_packets */ +uint32_t nreceived; /** @brief Binary heap of received packets */ -static struct pheap packets; +struct pheap packets; -/** @brief Total number of samples available */ -static unsigned long nsamples; +/** @brief Total number of samples available + * + * We make this volatile because we inspect it without a protecting lock, + * so the usual pthread_* guarantees aren't available. + */ +volatile uint32_t nsamples; /** @brief Timestamp of next packet to play. * * This is set to the timestamp of the last packet, plus the number of * samples it contained. Only valid if @ref active is nonzero. */ -static uint32_t next_timestamp; +uint32_t next_timestamp; /** @brief True if actively playing * * This is true when playing and false when just buffering. */ -static int active; +int active; -/** @brief Structure of free packet list */ -union free_packet { - struct packet p; - union free_packet *next; -}; - -/** @brief Linked list of free packets - * - * This is a linked list of formerly used packets. For preference we re-use - * packets that have already been used rather than unused ones, to limit the - * size of the program's working set. If there are no free packets in the list - * we try @ref next_free_packet instead. - * - * Must hold @ref lock when accessing this. - */ -static union free_packet *free_packets; - -/** @brief Array of new free packets - * - * There are @ref count_free_packets ready to use at this address. If there - * are none left we allocate more memory. - * - * Must hold @ref lock when accessing this. - */ -static union free_packet *next_free_packet; - -/** @brief Count of new free packets at @ref next_free_packet - * - * Must hold @ref lock when accessing this. 
- */ -static size_t count_free_packets; - -/** @brief Lock protecting @ref packets - * - * This also protects the packet memory allocation infrastructure, @ref - * free_packets and @ref next_free_packet. */ -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; +/** @brief Lock protecting @ref packets */ +pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; /** @brief Condition variable signalled whenever @ref packets is changed */ -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; +pthread_cond_t cond = PTHREAD_COND_INITIALIZER; + +HEAP_DEFINE(pheap, struct packet *, lt_packet); static const struct option options[] = { { "help", no_argument, 0, 'h' }, @@ -248,38 +176,11 @@ static const struct option options[] = { { "min", required_argument, 0, 'm' }, { "max", required_argument, 0, 'x' }, { "buffer", required_argument, 0, 'b' }, + { "rcvbuf", required_argument, 0, 'R' }, + { "multicast", required_argument, 0, 'M' }, { 0, 0, 0, 0 } }; -/** @brief Return a new packet - * - * Assumes that @ref lock is held. */ -static struct packet *new_packet(void) { - struct packet *p; - - if(free_packets) { - p = &free_packets->p; - free_packets = free_packets->next; - } else { - if(!count_free_packets) { - next_free_packet = xcalloc(1024, sizeof (union free_packet)); - count_free_packets = 1024; - } - p = &(next_free_packet++)->p; - --count_free_packets; - } - return p; -} - -/** @brief Free a packet - * - * Assumes that @ref lock is held. */ -static void free_packet(struct packet *p) { - union free_packet *u = (union free_packet *)p; - u->next = free_packets; - free_packets = u; -} - /** @brief Drop the first packet * * Assumes that @ref lock is held. @@ -293,6 +194,36 @@ static void drop_first_packet(void) { } } +/** @brief Background thread adding packets to heap + * + * This just transfers packets from @ref received_packets to @ref packets. 
It + * is important that it holds @ref receive_lock for as little time as possible, + * in order to minimize the interval between calls to read() in + * listen_thread(). + */ +static void *queue_thread(void attribute((unused)) *arg) { + struct packet *p; + + for(;;) { + /* Get the next packet */ + pthread_mutex_lock(&receive_lock); + while(!received_packets) + pthread_cond_wait(&receive_cond, &receive_lock); + p = received_packets; + received_packets = p->next; + if(!received_packets) + received_tail = &received_packets; + --nreceived; + pthread_mutex_unlock(&receive_lock); + /* Add it to the heap */ + pthread_mutex_lock(&lock); + pheap_insert(&packets, p); + nsamples += p->nsamples; + pthread_cond_broadcast(&cond); + pthread_mutex_unlock(&lock); + } +} + /** @brief Background thread collecting samples * * This function collects samples, perhaps converts them to the target format, @@ -323,11 +254,8 @@ static void *listen_thread(void attribute((unused)) *arg) { struct iovec iov[2]; for(;;) { - if(!p) { - pthread_mutex_lock(&lock); + if(!p) p = new_packet(); - pthread_mutex_unlock(&lock); - } iov[0].iov_base = &header; iov[0].iov_len = sizeof header; iov[1].iov_base = p->samples_raw; @@ -354,7 +282,7 @@ static void *listen_thread(void attribute((unused)) *arg) { timestamp, next_timestamp); continue; } - pthread_mutex_lock(&lock); + p->next = 0; p->flags = 0; p->timestamp = timestamp; /* Convert to target format */ @@ -377,17 +305,20 @@ static void *listen_thread(void attribute((unused)) *arg) { * This is rather unsatisfactory: it means that if packets get heavily * out of order then we guarantee dropouts. But for now... 
*/ if(nsamples >= maxbuffer) { - info("Buffer full"); + pthread_mutex_lock(&lock); while(nsamples >= maxbuffer) pthread_cond_wait(&cond, &lock); + pthread_mutex_unlock(&lock); } - /* Add the packet to the heap */ - pheap_insert(&packets, p); - nsamples += p->nsamples; + /* Add the packet to the receive queue */ + pthread_mutex_lock(&receive_lock); + *received_tail = p; + received_tail = &p->next; + ++nreceived; + pthread_cond_signal(&receive_cond); + pthread_mutex_unlock(&receive_lock); /* We'll need a new packet */ p = 0; - pthread_cond_broadcast(&cond); - pthread_mutex_unlock(&lock); } } @@ -408,6 +339,8 @@ static inline int contains(const struct packet *p, uint32_t timestamp) { * Must be called with @ref lock held. */ static void fill_buffer(void) { + while(nsamples) + drop_first_packet(); info("Buffering..."); while(nsamples < readahead) pthread_cond_wait(&cond, &lock); @@ -460,8 +393,6 @@ static OSStatus adioproc while(samplesOutLeft > 0) { const struct packet *p = next_packet(); if(p && contains(p, next_timestamp)) { - if(p->flags & IDLE) - write(2, "I", 1); /* This packet is ready to play */ const uint32_t packet_end = p->timestamp + p->nsamples; const uint32_t offset = next_timestamp - p->timestamp; @@ -476,7 +407,6 @@ static OSStatus adioproc *samplesOut++ = (int16_t)ntohs(*ptr++) * (0.5 / 32767); /* We don't bother junking the packet - that'll be dealt with next time * round */ - write(2, ".", 1); } else { /* No packet is ready to play (and there might be no packet at all) */ samples_available = p ? 
p->timestamp - next_timestamp @@ -488,7 +418,6 @@ static OSStatus adioproc next_timestamp += samples_available; samplesOut += samples_available; samplesOutLeft -= samples_available; - write(2, "?", 1); } } ++ab; @@ -594,7 +523,6 @@ static int alsa_writei(const void *s, size_t n) { /* Something went wrong */ switch(frames_written) { case -EAGAIN: - write(2, "#", 1); return 0; case -EPIPE: error(0, "error calling snd_pcm_writei: %ld", @@ -616,9 +544,6 @@ static int alsa_writei(const void *s, size_t n) { * @return 0 on success, -1 on non-fatal error */ static int alsa_play(const struct packet *p) { - if(p->flags & IDLE) - write(2, "I", 1); - write(2, ".", 1); return alsa_writei(p->samples_raw + next_timestamp - p->timestamp, (p->timestamp + p->nsamples) - next_timestamp); } @@ -633,7 +558,6 @@ static int alsa_infill(const struct packet *p) { if(p && samples_available > p->timestamp - next_timestamp) samples_available = p->timestamp - next_timestamp; - write(2, "?", 1); return alsa_writei(zeros, samples_available); } @@ -669,6 +593,8 @@ static void play_rtp(void) { /* We receive and convert audio data in a background thread */ pthread_create(&ltid, 0, listen_thread, 0); + /* We have a second thread to add received packets to the queue */ + pthread_create(&ltid, 0, queue_thread, 0); #if API_ALSA { struct packet *p; @@ -689,7 +615,10 @@ static void play_rtp(void) { info("Playing..."); /* Keep playing until the buffer empties out, or ALSA tells us to get * lost */ - while(nsamples >= minbuffer && !escape) { + while((nsamples >= minbuffer + || (nsamples > 0 + && contains(pheap_first(&packets), next_timestamp))) + && !escape) { /* Wait for ALSA to ask us for more data */ pthread_mutex_unlock(&lock); wait_alsa(); @@ -760,7 +689,9 @@ static void play_rtp(void) { if(status) fatal(0, "AudioDeviceStart: %d", (int)status); /* Wait until the buffer empties out */ - while(nsamples >= minbuffer) + while(nsamples >= minbuffer + || (nsamples > 0 + && contains(pheap_first(&packets), 
next_timestamp))) pthread_cond_wait(&cond, &lock); /* Stop playing for a bit until the buffer re-fills */ status = AudioDeviceStop(adid, adioproc); @@ -784,6 +715,8 @@ static void help(void) { " --min, -m FRAMES Buffer low water mark\n" " --buffer, -b FRAMES Buffer high water mark\n" " --max, -x FRAMES Buffer maximum size\n" + " --rcvbuf, -R BYTES Socket receive buffer size\n" + " --multicast, -M GROUP Join multicast group\n" " --help, -h Display usage message\n" " --version, -V Display version number\n" ); @@ -803,6 +736,11 @@ int main(int argc, char **argv) { struct addrinfo *res; struct stringlist sl; char *sockname; + int rcvbuf, target_rcvbuf = 131072; + socklen_t len; + char *multicast_group = 0; + struct ip_mreq mreq; + struct ipv6_mreq mreq6; static const struct addrinfo prefs = { AI_PASSIVE, @@ -817,7 +755,7 @@ int main(int argc, char **argv) { mem_init(); if(!setlocale(LC_CTYPE, "")) fatal(errno, "error calling setlocale"); - while((n = getopt_long(argc, argv, "hVdD:m:b:x:L:", options, 0)) >= 0) { + while((n = getopt_long(argc, argv, "hVdD:m:b:x:L:R:M:", options, 0)) >= 0) { switch(n) { case 'h': help(); case 'V': version(); @@ -827,6 +765,8 @@ int main(int argc, char **argv) { case 'b': readahead = 2 * atol(optarg); break; case 'x': maxbuffer = 2 * atol(optarg); break; case 'L': logfp = fopen(optarg, "w"); break; + case 'R': target_rcvbuf = atoi(optarg); break; + case 'M': multicast_group = optarg; break; default: fatal(0, "invalid option"); } } @@ -847,6 +787,44 @@ int main(int argc, char **argv) { fatal(errno, "error creating socket"); if(bind(rtpfd, res->ai_addr, res->ai_addrlen) < 0) fatal(errno, "error binding socket to %s", sockname); + if(multicast_group) { + if((n = getaddrinfo(multicast_group, 0, &prefs, &res))) + fatal(0, "getaddrinfo %s: %s", multicast_group, gai_strerror(n)); + switch(res->ai_family) { + case PF_INET: + mreq.imr_multiaddr = ((struct sockaddr_in *)res->ai_addr)->sin_addr; + mreq.imr_interface.s_addr = 0; /* use primary 
interface */ + if(setsockopt(rtpfd, IPPROTO_IP, IP_ADD_MEMBERSHIP, + &mreq, sizeof mreq) < 0) + fatal(errno, "error calling setsockopt IP_ADD_MEMBERSHIP"); + break; + case PF_INET6: + mreq6.ipv6mr_multiaddr = ((struct sockaddr_in6 *)res->ai_addr)->sin6_addr; + memset(&mreq6.ipv6mr_interface, 0, sizeof mreq6.ipv6mr_interface); + if(setsockopt(rtpfd, IPPROTO_IPV6, IPV6_JOIN_GROUP, + &mreq6, sizeof mreq6) < 0) + fatal(errno, "error calling setsockopt IPV6_JOIN_GROUP"); + break; + default: + fatal(0, "unsupported address family %d", res->ai_family); + } + } + len = sizeof rcvbuf; + if(getsockopt(rtpfd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, &len) < 0) + fatal(errno, "error calling getsockopt SO_RCVBUF"); + if(target_rcvbuf > rcvbuf) { + if(setsockopt(rtpfd, SOL_SOCKET, SO_RCVBUF, + &target_rcvbuf, sizeof target_rcvbuf) < 0) + error(errno, "error calling setsockopt SO_RCVBUF %d", + target_rcvbuf); + /* We try to carry on anyway */ + else + info("changed socket receive buffer from %d to %d", + rcvbuf, target_rcvbuf); + } else + info("default socket receive buffer %d", rcvbuf); + if(logfp) + info("WARNING: -L option can impact performance"); play_rtp(); return 0; }