X-Git-Url: https://git.distorted.org.uk/~mdw/disorder/blobdiff_plain/8d0c14d705a46b6d090418104172cb5c7e49bb5f..28bacdc0e8ab3e4d8b7f628f59d65e4fa38b9622:/clients/playrtp.c diff --git a/clients/playrtp.c b/clients/playrtp.c index 671c318..86d8337 100644 --- a/clients/playrtp.c +++ b/clients/playrtp.c @@ -17,6 +17,11 @@ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 * USA */ +/** @file clients/playrtp.c + * @brief RTP player + * + * This RTP player supports Linux (ALSA) and Darwin (Core Audio) systems. + */ #include #include "types.h" @@ -30,6 +35,8 @@ #include #include #include +#include +#include #include "log.h" #include "mem.h" @@ -38,6 +45,8 @@ #include "syscalls.h" #include "rtp.h" #include "defs.h" +#include "vector.h" +#include "heap.h" #if HAVE_COREAUDIO_AUDIOHARDWARE_H # include @@ -51,6 +60,9 @@ /** @brief RTP socket */ static int rtpfd; +/** @brief Log output */ +static FILE *logfp; + /** @brief Output device */ static const char *device; @@ -60,7 +72,7 @@ static const char *device; */ #define MAXSAMPLES 2048 -/** @brief Minimum buffer size +/** @brief Minimum low watermark * * We'll stop playing if there's only this many samples in the buffer. */ static unsigned minbuffer = 2 * 44100 / 10; /* 0.2 seconds */ @@ -70,47 +82,85 @@ static unsigned minbuffer = 2 * 44100 / 10; /* 0.2 seconds */ * The maximum supported size (in bytes) of one sample. */ #define MAXSAMPLESIZE 2 -/** @brief Buffer size +/** @brief Buffer high watermark * * We'll only start playing when this many samples are available. */ static unsigned readahead = 2 * 2 * 44100; -/** @brief Number of samples to infill by in one go */ -#define INFILL_SAMPLES (44100 * 2) /* 1s */ +/** @brief Maximum buffer size + * + * We'll stop reading from the network if we have this many samples. */ +static unsigned maxbuffer; -#define MAXBUFFER (3 * 88200) /* maximum buffer contents */ +/** @brief Number of samples to infill by in one go + * + * This is an upper bound - in practice we expxect the underlying audio API to + * only ask for a much smaller number of samples in any one go. + */ +#define INFILL_SAMPLES (44100 * 2) /* 1s */ /** @brief Received packet * - * Packets are recorded in an ordered linked list. */ + * Received packets are kept in a binary heap (see @ref pheap) ordered by + * timestamp. + */ struct packet { - /** @brief Pointer to next packet - * The next packet might not be immediately next: if packets are dropped - * or mis-ordered there may be gaps at any given moment. */ - struct packet *next; /** @brief Number of samples in this packet */ uint32_t nsamples; /** @brief Timestamp from RTP packet * - * NB that "timestamps" are really sample counters.*/ + * NB that "timestamps" are really sample counters. Use lt() or lt_packet() + * to compare timestamps. + */ uint32_t timestamp; -#if HAVE_COREAUDIO_AUDIOHARDWARE_H - /** @brief Converted sample data */ - float samples_float[MAXSAMPLES]; -#else - /** @brief Raw sample data */ + /** @brief Raw sample data + * + * Only the first @p nsamples samples are defined; the rest is uninitialized + * data. + */ unsigned char samples_raw[MAXSAMPLES * MAXSAMPLESIZE]; -#endif }; +/** @brief Return true iff \f$a < b\f$ in sequence-space arithmetic + * + * Specifically it returns true if \f$(a-b) mod 2^{32} < 2^{31}\f$. + * + * See also lt_packet(). + */ +static inline int lt(uint32_t a, uint32_t b) { + return (uint32_t)(a - b) & 0x80000000; +} + +/** @brief Return true iff a >= b in sequence-space arithmetic */ +static inline int ge(uint32_t a, uint32_t b) { + return !lt(a, b); +} + +/** @brief Return true iff a > b in sequence-space arithmetic */ +static inline int gt(uint32_t a, uint32_t b) { + return lt(b, a); +} + +/** @brief Return true iff a <= b in sequence-space arithmetic */ +static inline int le(uint32_t a, uint32_t b) { + return !lt(b, a); +} + +/** @brief Ordering for packets, used by @ref pheap */ +static inline int lt_packet(const struct packet *a, const struct packet *b) { + return lt(a->timestamp, b->timestamp); +} + +/** @struct pheap + * @brief Binary heap of packets ordered by timestamp */ +HEAP_TYPE(pheap, struct packet *, lt_packet); + +/** @brief Binary heap of received packets */ +static struct pheap packets; + /** @brief Total number of samples available */ static unsigned long nsamples; -/** @brief Linked list of packets - * - * In ascending order of timestamp. */ -static struct packet *packets; - /** @brief Timestamp of next packet to play. * * This is set to the timestamp of the last packet, plus the number of @@ -123,7 +173,42 @@ static uint32_t next_timestamp; * This is true when playing and false when just buffering. */ static int active; -/** @brief Lock protecting @ref packets */ +/** @brief Structure of free packet list */ +union free_packet { + struct packet p; + union free_packet *next; +}; + +/** @brief Linked list of free packets + * + * This is a linked list of formerly used packets. For preference we re-use + * packets that have already been used rather than unused ones, to limit the + * size of the program's working set. If there are no free packets in the list + * we try @ref next_free_packet instead. + * + * Must hold @ref lock when accessing this. + */ +static union free_packet *free_packets; + +/** @brief Array of new free packets + * + * There are @ref count_free_packets ready to use at this address. If there + * are none left we allocate more memory. + * + * Must hold @ref lock when accessing this. + */ +static union free_packet *next_free_packet; + +/** @brief Count of new free packets at @ref next_free_packet + * + * Must hold @ref lock when accessing this. + */ +static size_t count_free_packets; + +/** @brief Lock protecting @ref packets + * + * This also protects the packet memory allocation infrastructure, @ref + * free_packets and @ref next_free_packet. */ static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; /** @brief Condition variable signalled whenever @ref packets is changed */ @@ -135,28 +220,51 @@ static const struct option options[] = { { "debug", no_argument, 0, 'd' }, { "device", required_argument, 0, 'D' }, { "min", required_argument, 0, 'm' }, + { "max", required_argument, 0, 'x' }, { "buffer", required_argument, 0, 'b' }, { 0, 0, 0, 0 } }; -/** @brief Return true iff a < b in sequence-space arithmetic */ -static inline int lt(uint32_t a, uint32_t b) { - return (uint32_t)(a - b) & 0x80000000; -} - -/** @brief Return true iff a >= b in sequence-space arithmetic */ -static inline int ge(uint32_t a, uint32_t b) { - return !lt(a, b); +/** @brief Return a new packet + * + * Assumes that @ref lock is held. */ +static struct packet *new_packet(void) { + struct packet *p; + + if(free_packets) { + p = &free_packets->p; + free_packets = free_packets->next; + } else { + if(!count_free_packets) { + next_free_packet = xcalloc(1024, sizeof (union free_packet)); + count_free_packets = 1024; + } + p = &(next_free_packet++)->p; + --count_free_packets; + } + return p; } -/** @brief Return true iff a > b in sequence-space arithmetic */ -static inline int gt(uint32_t a, uint32_t b) { - return lt(b, a); +/** @brief Free a packet + * + * Assumes that @ref lock is held. */ +static void free_packet(struct packet *p) { + union free_packet *u = (union free_packet *)p; + u->next = free_packets; + free_packets = u; } -/** @brief Return true iff a <= b in sequence-space arithmetic */ -static inline int le(uint32_t a, uint32_t b) { - return !lt(b, a); +/** @brief Drop the first packet + * + * Assumes that @ref lock is held. + */ +static void drop_first_packet(void) { + if(pheap_count(&packets)) { + struct packet *const p = pheap_remove(&packets); + nsamples -= p->nsamples; + free_packet(p); + pthread_cond_broadcast(&cond); + } } /** @brief Background thread collecting samples @@ -164,19 +272,24 @@ static inline int le(uint32_t a, uint32_t b) { * This function collects samples, perhaps converts them to the target format, * and adds them to the packet list. */ static void *listen_thread(void attribute((unused)) *arg) { - struct packet *p = 0, **pp; + struct packet *p = 0; int n; - union { - struct rtp_header header; - uint8_t bytes[sizeof(uint16_t) * MAXSAMPLES + sizeof (struct rtp_header)]; - } packet; - const uint16_t *const samples = (uint16_t *)(packet.bytes - + sizeof (struct rtp_header)); + struct rtp_header header; + uint16_t seq; + uint32_t timestamp; + struct iovec iov[2]; for(;;) { - if(!p) - p = xmalloc(sizeof *p); - n = read(rtpfd, packet.bytes, sizeof packet.bytes); + if(!p) { + pthread_mutex_lock(&lock); + p = new_packet(); + pthread_mutex_unlock(&lock); + } + iov[0].iov_base = &header; + iov[0].iov_len = sizeof header; + iov[1].iov_base = p->samples_raw; + iov[1].iov_len = sizeof p->samples_raw; + n = readv(rtpfd, iov, 2); if(n < 0) { switch(errno) { case EINTR: @@ -186,104 +299,128 @@ static void *listen_thread(void attribute((unused)) *arg) { } } /* Ignore too-short packets */ - if((size_t)n <= sizeof (struct rtp_header)) + if((size_t)n <= sizeof (struct rtp_header)) { + info("ignored a short packet"); continue; - p->timestamp = ntohl(packet.header.timestamp); + } + timestamp = htonl(header.timestamp); + seq = htons(header.seq); /* Ignore packets in the past */ - if(active && lt(p->timestamp, next_timestamp)) { + if(active && lt(timestamp, next_timestamp)) { info("dropping old packet, timestamp=%"PRIx32" < %"PRIx32, - p->timestamp, next_timestamp); + timestamp, next_timestamp); continue; } + pthread_mutex_lock(&lock); + p = new_packet(); + p->timestamp = timestamp; /* Convert to target format */ - switch(packet.header.mpt & 0x7F) { + switch(header.mpt & 0x7F) { case 10: - p->nsamples = (n - sizeof (struct rtp_header)) / sizeof(uint16_t); -#if HAVE_COREAUDIO_AUDIOHARDWARE_H - /* Convert to what Core Audio expects */ - for(n = 0; n < p->nsamples; ++n) - p->samples_float[n] = (int16_t)ntohs(samples[n]) * (0.5f / 32767); -#else + p->nsamples = (n - sizeof header) / sizeof(uint16_t); /* ALSA can do any necessary conversion itself (though it might be better * to do any necessary conversion in the background) */ - memcpy(p->samples_raw, samples, n - sizeof (struct rtp_header)); -#endif + /* TODO we could readv into the buffer */ break; /* TODO support other RFC3551 media types (when the speaker does) */ default: fatal(0, "unsupported RTP payload type %d", - packet.header.mpt & 0x7F); + header.mpt & 0x7F); } - pthread_mutex_lock(&lock); + if(logfp) + fprintf(logfp, "sequence %u timestamp %"PRIx32" length %"PRIx32" end %"PRIx32"\n", + seq, timestamp, p->nsamples, timestamp + p->nsamples); /* Stop reading if we've reached the maximum. * * This is rather unsatisfactory: it means that if packets get heavily * out of order then we guarantee dropouts. But for now... */ - while(nsamples >= MAXBUFFER) - pthread_cond_wait(&cond, &lock); - for(pp = &packets; - *pp && lt((*pp)->timestamp, p->timestamp); - pp = &(*pp)->next) - ; - /* So now either !*pp or *pp >= p */ - if(*pp && p->timestamp == (*pp)->timestamp) { - /* *pp == p; a duplicate. Ideally we avoid the translation step here, - * but we'll worry about that another time. */ - } else { - p->next = *pp; - *pp = p; - nsamples += p->nsamples; - pthread_cond_broadcast(&cond); - p = 0; /* we've consumed this packet */ + if(nsamples >= maxbuffer) { + info("buffer full"); + while(nsamples >= maxbuffer) + pthread_cond_wait(&cond, &lock); } + /* Add the packet to the heap */ + pheap_insert(&packets, p); + nsamples += p->nsamples; + pthread_cond_broadcast(&cond); pthread_mutex_unlock(&lock); } } +/** @brief Return true if @p p contains @p timestamp */ +static inline int contains(const struct packet *p, uint32_t timestamp) { + const uint32_t packet_start = p->timestamp; + const uint32_t packet_end = p->timestamp + p->nsamples; + + return (ge(timestamp, packet_start) + && lt(timestamp, packet_end)); +} + #if HAVE_COREAUDIO_AUDIOHARDWARE_H /** @brief Callback from Core Audio */ -static OSStatus adioproc(AudioDeviceID inDevice, - const AudioTimeStamp *inNow, - const AudioBufferList *inInputData, - const AudioTimeStamp *inInputTime, - AudioBufferList *outOutputData, - const AudioTimeStamp *inOutputTime, - void *inClientData) { +static OSStatus adioproc + (AudioDeviceID attribute((unused)) inDevice, + const AudioTimeStamp attribute((unused)) *inNow, + const AudioBufferList attribute((unused)) *inInputData, + const AudioTimeStamp attribute((unused)) *inInputTime, + AudioBufferList *outOutputData, + const AudioTimeStamp attribute((unused)) *inOutputTime, + void attribute((unused)) *inClientData) { UInt32 nbuffers = outOutputData->mNumberBuffers; AudioBuffer *ab = outOutputData->mBuffers; - float *samplesOut; /* where to write samples to */ - size_t samplesOutLeft; /* space left */ - size_t samplesInLeft; - size_t samplesToCopy; + const struct packet *p; + uint32_t samples_available; pthread_mutex_lock(&lock); - samplesOut = ab->data; - samplesOutLeft = ab->mDataByteSize / sizeof (float); - while(packets && nbuffers > 0) { - if(packets->used == packets->nsamples) { - /* TODO if we dropped a packet then we should introduce a gap here */ - struct packet *const p = packets; - packets = p->next; - free(p); - pthread_cond_broadcast(&cond); - continue; - } - if(samplesOutLeft == 0) { - --nbuffers; - ++ab; - samplesOut = ab->data; - samplesOutLeft = ab->mDataByteSize / sizeof (float); - continue; + while(nbuffers > 0) { + float *samplesOut = ab->mData; + size_t samplesOutLeft = ab->mDataByteSize / sizeof (float); + + while(samplesOutLeft > 0) { + /* Look for a suitable packet, dropping any unsuitable ones along the + * way. Unsuitable packets are ones that are in the past. */ + while(pheap_count(&packets)) { + p = pheap_first(&packets); + if(le(p->timestamp + p->nsamples, next_timestamp)) + /* This packet is in the past. Drop it and try another one. */ + drop_first_packet(); + else + /* This packet is NOT in the past. (It might be in the future + * however.) */ + break; + } + p = pheap_count(&packets) ? pheap_first(&packets) : 0; + if(p && contains(p, next_timestamp)) { + /* This packet is ready to play */ + const uint32_t packet_end = p->timestamp + p->nsamples; + const uint32_t offset = next_timestamp - p->timestamp; + const uint16_t *ptr = + (void *)(p->samples_raw + offset * sizeof (uint16_t)); + + samples_available = packet_end - next_timestamp; + if(samples_available > samplesOutLeft) + samples_available = samplesOutLeft; + next_timestamp += samples_available; + samplesOutLeft -= samples_available; + while(samples_available-- > 0) + *samplesOut++ = (int16_t)ntohs(*ptr++) * (0.5 / 32767); + /* We don't bother junking the packet - that'll be dealt with next time + * round */ + } else { + /* No packet is ready to play (and there might be no packet at all) */ + samples_available = p ? p->timestamp - next_timestamp + : samplesOutLeft; + if(samples_available > samplesOutLeft) + samples_available = samplesOutLeft; + info("infill by %"PRIu32, samples_available); + /* Conveniently the buffer is 0 to start with */ + next_timestamp += samples_available; + samplesOut += samples_available; + samplesOutLeft -= samples_available; + } } - /* Now: (1) there is some data left to read - * (2) there is some space to put it */ - samplesInLeft = packets->nsamples - packets->used; - samplesToCopy = (samplesInLeft < samplesOutLeft - ? samplesInLeft : samplesOutLeft); - memcpy(samplesOut, packet->samples + packets->used, samplesToCopy); - packets->used += samplesToCopy; - samplesOut += samplesToCopy; - samesOutLeft -= samplesToCopy; + ++ab; + --nbuffers; } pthread_mutex_unlock(&lock); return 0; @@ -382,8 +519,6 @@ static void play_rtp(void) { fatal(0, "error calling snd_pcm_prepare: %d", err); prepared = 1; } - /* Start at the first available packet */ - next_timestamp = packets->timestamp; active = 1; infilling = 0; escape = 0; @@ -401,22 +536,23 @@ static void play_rtp(void) { } if(packets && ge(next_timestamp, packets->timestamp + packets->nsamples)) { - struct packet *p = packets; - info("dropping buffered past packet %"PRIx32" < %"PRIx32, packets->timestamp, next_timestamp); - - packets = p->next; - if(packets) - assert(lt(p->timestamp, packets->timestamp)); - nsamples -= p->nsamples; - free(p); - pthread_cond_broadcast(&cond); + drop_first_packet(); continue; } /* Wait for ALSA to ask us for more data */ pthread_mutex_unlock(&lock); - snd_pcm_wait(pcm, -1); + write(2, ".", 1); /* TODO remove me sometime */ + switch(err = snd_pcm_wait(pcm, -1)) { + case 0: + info("snd_pcm_wait timed out"); + break; + case 1: + break; + default: + fatal(0, "snd_pcm_wait returned %d", err); + } pthread_mutex_lock(&lock); /* ALSA is ready for more data */ packet_start = packets->timestamp; @@ -448,17 +584,8 @@ static void play_rtp(void) { } else { samples_written = frames_written * 2; next_timestamp += samples_written; - if(ge(next_timestamp, packet_end)) { - /* We're done with this packet */ - struct packet *p = packets; - - packets = p->next; - if(packets) - assert(lt(p->timestamp, packets->timestamp)); - nsamples -= p->nsamples; - free(p); - pthread_cond_broadcast(&cond); - } + if(ge(next_timestamp, packet_end)) + drop_first_packet(); infilling = 0; } } else { @@ -550,14 +677,14 @@ static void play_rtp(void) { if(status) fatal(0, "AudioHardwareGetProperty: %d", (int)status); D(("mSampleRate %f", asbd.mSampleRate)); - D(("mFormatID %08"PRIx32, asbd.mFormatID)); - D(("mFormatFlags %08"PRIx32, asbd.mFormatFlags)); - D(("mBytesPerPacket %08"PRIx32, asbd.mBytesPerPacket)); - D(("mFramesPerPacket %08"PRIx32, asbd.mFramesPerPacket)); - D(("mBytesPerFrame %08"PRIx32, asbd.mBytesPerFrame)); - D(("mChannelsPerFrame %08"PRIx32, asbd.mChannelsPerFrame)); - D(("mBitsPerChannel %08"PRIx32, asbd.mBitsPerChannel)); - D(("mReserved %08"PRIx32, asbd.mReserved)); + D(("mFormatID %08lx", asbd.mFormatID)); + D(("mFormatFlags %08lx", asbd.mFormatFlags)); + D(("mBytesPerPacket %08lx", asbd.mBytesPerPacket)); + D(("mFramesPerPacket %08lx", asbd.mFramesPerPacket)); + D(("mBytesPerFrame %08lx", asbd.mBytesPerFrame)); + D(("mChannelsPerFrame %08lx", asbd.mChannelsPerFrame)); + D(("mBitsPerChannel %08lx", asbd.mBitsPerChannel)); + D(("mReserved %08lx", asbd.mReserved)); if(asbd.mFormatID != kAudioFormatLinearPCM) fatal(0, "audio device does not support kAudioFormatLinearPCM"); status = AudioDeviceAddIOProc(adid, adioproc, 0); @@ -566,9 +693,13 @@ static void play_rtp(void) { pthread_mutex_lock(&lock); for(;;) { /* Wait for the buffer to fill up a bit */ + info("Buffering..."); while(nsamples < readahead) pthread_cond_wait(&cond, &lock); /* Start playing now */ + info("Playing..."); + next_timestamp = pheap_first(&packets)->timestamp; + active = 1; status = AudioDeviceStart(adid, adioproc); if(status) fatal(0, "AudioDeviceStart: %d", (int)status); @@ -579,6 +710,7 @@ static void play_rtp(void) { status = AudioDeviceStop(adid, adioproc); if(status) fatal(0, "AudioDeviceStop: %d", (int)status); + active = 0; /* Go back round */ } } @@ -592,12 +724,13 @@ static void help(void) { xprintf("Usage:\n" " disorder-playrtp [OPTIONS] ADDRESS [PORT]\n" "Options:\n" - " --help, -h Display usage message\n" - " --version, -V Display version number\n" - " --debug, -d Turn on debugging\n" " --device, -D DEVICE Output device\n" " --min, -m FRAMES Buffer low water mark\n" - " --buffer, -b FRAMES Buffer high water mark\n"); + " --buffer, -b FRAMES Buffer high water mark\n" + " --max, -x FRAMES Buffer maximum size\n" + " --help, -h Display usage message\n" + " --version, -V Display version number\n" + ); xfclose(stdout); exit(0); } @@ -628,7 +761,7 @@ int main(int argc, char **argv) { mem_init(); if(!setlocale(LC_CTYPE, "")) fatal(errno, "error calling setlocale"); - while((n = getopt_long(argc, argv, "hVdD:m:b:", options, 0)) >= 0) { + while((n = getopt_long(argc, argv, "hVdD:m:b:x:L:", options, 0)) >= 0) { switch(n) { case 'h': help(); case 'V': version(); @@ -636,9 +769,13 @@ int main(int argc, char **argv) { case 'D': device = optarg; break; case 'm': minbuffer = 2 * atol(optarg); break; case 'b': readahead = 2 * atol(optarg); break; + case 'x': maxbuffer = 2 * atol(optarg); break; + case 'L': logfp = fopen(optarg, "w"); break; default: fatal(0, "invalid option"); } } + if(!maxbuffer) + maxbuffer = 4 * readahead; argc -= optind; argv += optind; if(argc < 1 || argc > 2)