X-Git-Url: https://git.distorted.org.uk/~mdw/disorder/blobdiff_plain/0b75463fed40851e872ddb495ed679a62ff7ed20..9aa6b167410b3b0545e93ba9cdc2bf0a0785c00b:/clients/playrtp.c diff --git a/clients/playrtp.c b/clients/playrtp.c index 7f2b238..bf96e84 100644 --- a/clients/playrtp.c +++ b/clients/playrtp.c @@ -46,6 +46,8 @@ #include #endif +#define readahead linux_headers_are_borked + /** @brief RTP socket */ static int rtpfd; @@ -58,19 +60,28 @@ static const char *device; */ #define MAXSAMPLES 2048 -/** @brief Minimum buffer size +/** @brief Minimum low watermark * * We'll stop playing if there's only this many samples in the buffer. */ -#define MINBUFFER 8820 +static unsigned minbuffer = 2 * 44100 / 10; /* 0.2 seconds */ /** @brief Maximum sample size * * The maximum supported size (in bytes) of one sample. */ #define MAXSAMPLESIZE 2 -#define READAHEAD 88200 /* how far to read ahead */ +/** @brief Buffer high watermark + * + * We'll only start playing when this many samples are available. */ +static unsigned readahead = 2 * 2 * 44100; -#define MAXBUFFER (3 * 88200) /* maximum buffer contents */ +/** @brief Maximum buffer size + * + * We'll stop reading from the network if we have this many samples. */ +static unsigned maxbuffer; + +/** @brief Number of samples to infill by in one go */ +#define INFILL_SAMPLES (44100 * 2) /* 1s */ /** @brief Received packet * @@ -81,9 +92,7 @@ struct packet { * or mis-ordered there may be gaps at any given moment. */ struct packet *next; /** @brief Number of samples in this packet */ - int nsamples; - /** @brief Number of samples used from this packet */ - int nused; + uint32_t nsamples; /** @brief Timestamp from RTP packet * * NB that "timestamps" are really sample counters.*/ @@ -102,16 +111,22 @@ static unsigned long nsamples; /** @brief Linked list of packets * - * In ascending order of timestamp. */ + * In ascending order of timestamp. Really this should be a heap for more + * efficient access. */ static struct packet *packets; /** @brief Timestamp of next packet to play. * * This is set to the timestamp of the last packet, plus the number of - * samples it contained. + * samples it contained. Only valid if @ref active is nonzero. */ static uint32_t next_timestamp; +/** @brief True if actively playing + * + * This is true when playing and false when just buffering. */ +static int active; + /** @brief Lock protecting @ref packets */ static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; @@ -123,20 +138,47 @@ static const struct option options[] = { { "version", no_argument, 0, 'V' }, { "debug", no_argument, 0, 'd' }, { "device", required_argument, 0, 'D' }, + { "min", required_argument, 0, 'm' }, + { "max", required_argument, 0, 'x' }, + { "buffer", required_argument, 0, 'b' }, { 0, 0, 0, 0 } }; /** @brief Return true iff a < b in sequence-space arithmetic */ -static inline int lt(const struct packet *a, const struct packet *b) { - return (uint32_t)(a->timestamp - b->timestamp) & 0x80000000; +static inline int lt(uint32_t a, uint32_t b) { + return (uint32_t)(a - b) & 0x80000000; +} + +/** @brief Return true iff a >= b in sequence-space arithmetic */ +static inline int ge(uint32_t a, uint32_t b) { + return !lt(a, b); +} + +/** @brief Return true iff a > b in sequence-space arithmetic */ +static inline int gt(uint32_t a, uint32_t b) { + return lt(b, a); +} + +/** @brief Return true iff a <= b in sequence-space arithmetic */ +static inline int le(uint32_t a, uint32_t b) { + return !lt(b, a); +} + +/** @brief Drop the packet at the head of the queue */ +static void drop_first_packet(void) { + struct packet *const p = packets; + packets = p->next; + nsamples -= p->nsamples; + free(p); + pthread_cond_broadcast(&cond); } -/** Background thread collecting samples +/** @brief Background thread collecting samples * * This function collects samples, perhaps converts them to the target format, * and adds them to the packet list. */ static void *listen_thread(void attribute((unused)) *arg) { - struct packet *f = 0, **ff; + struct packet *p = 0, **pp; int n; union { struct rtp_header header; @@ -146,8 +188,8 @@ static void *listen_thread(void attribute((unused)) *arg) { + sizeof (struct rtp_header)); for(;;) { - if(!f) - f = xmalloc(sizeof *f); + if(!p) + p = xmalloc(sizeof *p); n = read(rtpfd, packet.bytes, sizeof packet.bytes); if(n < 0) { switch(errno) { @@ -160,18 +202,29 @@ static void *listen_thread(void attribute((unused)) *arg) { /* Ignore too-short packets */ if((size_t)n <= sizeof (struct rtp_header)) continue; + p->timestamp = ntohl(packet.header.timestamp); + /* Ignore packets in the past */ + if(active && lt(p->timestamp, next_timestamp)) { + info("dropping old packet, timestamp=%"PRIx32" < %"PRIx32, + p->timestamp, next_timestamp); + continue; + } /* Convert to target format */ switch(packet.header.mpt & 0x7F) { case 10: - f->nsamples = (n - sizeof (struct rtp_header)) / sizeof(uint16_t); + p->nsamples = (n - sizeof (struct rtp_header)) / sizeof(uint16_t); #if HAVE_COREAUDIO_AUDIOHARDWARE_H /* Convert to what Core Audio expects */ - for(n = 0; n < f->nsamples; ++n) - f->samples_float[n] = (int16_t)ntohs(samples[n]) * (0.5f / 32767); + { + size_t i; + + for(i = 0; i < p->nsamples; ++i) + p->samples_float[i] = (int16_t)ntohs(samples[i]) * (0.5f / 32767); + } #else /* ALSA can do any necessary conversion itself (though it might be better * to do any necessary conversion in the background) */ - memcpy(f->samples_raw, samples, n - sizeof (struct rtp_header)); + memcpy(p->samples_raw, samples, n - sizeof (struct rtp_header)); #endif break; /* TODO support other RFC3551 media types (when the speaker does) */ @@ -179,82 +232,124 @@ static void *listen_thread(void attribute((unused)) *arg) { fatal(0, "unsupported RTP payload type %d", packet.header.mpt & 0x7F); } - f->nused = 0; - f->timestamp = ntohl(packet.header.timestamp); pthread_mutex_lock(&lock); /* Stop reading if we've reached the maximum. * * This is rather unsatisfactory: it means that if packets get heavily * out of order then we guarantee dropouts. But for now... */ - while(nsamples >= MAXBUFFER) + while(nsamples >= maxbuffer) pthread_cond_wait(&cond, &lock); - for(ff = &packets; *ff && lt(*ff, f); ff = &(*ff)->next) + for(pp = &packets; + *pp && lt((*pp)->timestamp, p->timestamp); + pp = &(*pp)->next) ; - /* So now either !*ff or *ff >= f */ - if(*ff && f->timestamp == (*ff)->timestamp) { - /* *ff == f; a duplicate. Ideally we avoid the translation step here, + /* So now either !*pp or *pp >= p */ + if(*pp && p->timestamp == (*pp)->timestamp) { + /* *pp == p; a duplicate. Ideally we avoid the translation step here, * but we'll worry about that another time. */ - free(f); + info("dropped a duplicated"); } else { - f->next = *ff; - *ff = f; - nsamples += f->nsamples; + if(*pp) + info("receiving packets out of order"); + p->next = *pp; + *pp = p; + nsamples += p->nsamples; pthread_cond_broadcast(&cond); + p = 0; /* we've consumed this packet */ } pthread_mutex_unlock(&lock); - f = 0; } } #if HAVE_COREAUDIO_AUDIOHARDWARE_H -static OSStatus adioproc(AudioDeviceID inDevice, - const AudioTimeStamp *inNow, - const AudioBufferList *inInputData, - const AudioTimeStamp *inInputTime, - AudioBufferList *outOutputData, - const AudioTimeStamp *inOutputTime, - void *inClientData) { +/** @brief Callback from Core Audio */ +static OSStatus adioproc + (AudioDeviceID attribute((unused)) inDevice, + const AudioTimeStamp attribute((unused)) *inNow, + const AudioBufferList attribute((unused)) *inInputData, + const AudioTimeStamp attribute((unused)) *inInputTime, + AudioBufferList *outOutputData, + const AudioTimeStamp attribute((unused)) *inOutputTime, + void attribute((unused)) *inClientData) { UInt32 nbuffers = outOutputData->mNumberBuffers; AudioBuffer *ab = outOutputData->mBuffers; - float *samplesOut; /* where to write samples to */ - size_t samplesOutLeft; /* space left */ - size_t samplesInLeft; - size_t samplesToCopy; pthread_mutex_lock(&lock); - samplesOut = ab->data; - samplesOutLeft = ab->mDataByteSize / sizeof (float); - while(packets && nbuffers > 0) { - if(packets->used == packets->nsamples) { - /* TODO if we dropped a packet then we should introduce a gap here */ - struct packet *const f = packets; - packets = f->next; - free(f); - pthread_cond_broadcast(&cond); - continue; - } - if(samplesOutLeft == 0) { - --nbuffers; - ++ab; - samplesOut = ab->data; - samplesOutLeft = ab->mDataByteSize / sizeof (float); - continue; + while(nbuffers > 0) { + float *samplesOut = ab->mData; + size_t samplesOutLeft = ab->mDataByteSize / sizeof (float); + + while(samplesOutLeft > 0) { + if(packets) { + /* There's a packet */ + const uint32_t packet_start = packets->timestamp; + const uint32_t packet_end = packets->timestamp + packets->nsamples; + + if(le(packet_end, next_timestamp)) { + /* This packet is in the past */ + info("dropping buffered past packet %"PRIx32" < %"PRIx32, + packet_start, next_timestamp); + drop_first_packet(); + continue; + } + if(ge(next_timestamp, packet_start) + && lt(next_timestamp, packet_end)) { + /* This packet is suitable */ + const uint32_t offset = next_timestamp - packet_start; + uint32_t samples_available = packet_end - next_timestamp; + if(samples_available > samplesOutLeft) + samples_available = samplesOutLeft; + memcpy(samplesOut, + packets->samples_float + offset, + samples_available * sizeof(float)); + samplesOut += samples_available; + next_timestamp += samples_available; + samplesOutLeft -= samples_available; + if(ge(next_timestamp, packet_end)) + drop_first_packet(); + continue; + } + } + /* We didn't find a suitable packet (though there might still be + * unsuitable ones). We infill with 0s. */ + if(packets) { + /* There is a next packet, only infill up to that point */ + uint32_t samples_available = packets->timestamp - next_timestamp; + + if(samples_available > samplesOutLeft) + samples_available = samplesOutLeft; + info("infill by %"PRIu32, samples_available); + /* Convniently the buffer is 0 to start with */ + next_timestamp += samples_available; + samplesOut += samples_available; + samplesOutLeft -= samples_available; + /* TODO log infill */ + } else { + /* There's no next packet at all */ + info("infilled by %zu", samplesOutLeft); + next_timestamp += samplesOutLeft; + samplesOut += samplesOutLeft; + samplesOutLeft = 0; + /* TODO log infill */ + } } - /* Now: (1) there is some data left to read - * (2) there is some space to put it */ - samplesInLeft = packets->nsamples - packets->used; - samplesToCopy = (samplesInLeft < samplesOutLeft - ? samplesInLeft : samplesOutLeft); - memcpy(samplesOut, packet->samples + packets->used, samplesToCopy); - packets->used += samplesToCopy; - samplesOut += samplesToCopy; - samesOutLeft -= samplesToCopy; + ++ab; + --nbuffers; } pthread_mutex_unlock(&lock); return 0; } #endif +/** @brief Play an RTP stream + * + * This is the guts of the program. It is responsible for: + * - starting the listening thread + * - opening the audio device + * - reading ahead to build up a buffer + * - arranging for audio to be played + * - detecting when the buffer has got too small and re-buffering + */ static void play_rtp(void) { pthread_t ltid; @@ -277,6 +372,9 @@ static void play_rtp(void) { size_t samples_written; int prepared = 1; int err; + int infilling = 0, escape = 0; + time_t logged, now; + uint32_t packet_start, packet_end; /* Open ALSA */ if((err = snd_pcm_open(&pcm, @@ -320,63 +418,151 @@ static void play_rtp(void) { /* Ready to go */ + time(&logged); pthread_mutex_lock(&lock); for(;;) { /* Wait for the buffer to fill up a bit */ - while(nsamples < READAHEAD) + logged = now; + info("%lu samples in buffer (%lus)", nsamples, + nsamples / (44100 * 2)); + info("Buffering..."); + while(nsamples < readahead) pthread_cond_wait(&cond, &lock); if(!prepared) { if((err = snd_pcm_prepare(pcm))) fatal(0, "error calling snd_pcm_prepare: %d", err); prepared = 1; } + /* Start at the first available packet */ + next_timestamp = packets->timestamp; + active = 1; + infilling = 0; + escape = 0; + logged = now; + info("%lu samples in buffer (%lus)", nsamples, + nsamples / (44100 * 2)); + info("Playing..."); /* Wait until the buffer empties out */ - while(nsamples >= MINBUFFER) { + while(nsamples >= minbuffer && !escape) { + time(&now); + if(now > logged + 10) { + logged = now; + info("%lu samples in buffer (%lus)", nsamples, + nsamples / (44100 * 2)); + } + if(packets + && ge(next_timestamp, packets->timestamp + packets->nsamples)) { + info("dropping buffered past packet %"PRIx32" < %"PRIx32, + packets->timestamp, next_timestamp); + drop_first_packet(); + continue; + } /* Wait for ALSA to ask us for more data */ pthread_mutex_unlock(&lock); - snd_pcm_wait(pcm, -1); + write(2, ".", 1); /* TODO remove me sometime */ + switch(err = snd_pcm_wait(pcm, -1)) { + case 0: + info("snd_pcm_wait timed out"); + break; + case 1: + break; + default: + fatal(0, "snd_pcm_wait returned %d", err); + } pthread_mutex_lock(&lock); - /* ALSA wants more data */ - if(packets && packets->timestamp + packets->nused == next_timestamp) { - /* Hooray, we have a packet we can play */ - const size_t samples_available = packets->nsamples - packets->nused; + /* ALSA is ready for more data */ + packet_start = packets->timestamp; + packet_end = packets->timestamp + packets->nsamples; + if(ge(next_timestamp, packet_start) + && lt(next_timestamp, packet_end)) { + /* The target timestamp is somewhere in this packet */ + const uint32_t offset = next_timestamp - packets->timestamp; + const uint32_t samples_available = (packets->timestamp + packets->nsamples) - next_timestamp; const size_t frames_available = samples_available / 2; frames_written = snd_pcm_writei(pcm, - packets->samples_raw + packets->nused, + packets->samples_raw + offset, frames_available); - if(frames_written < 0) - fatal(0, "error calling snd_pcm_writei: %d", err); - samples_written = frames_written * 2; - packets->nused += samples_written; - next_timestamp += samples_written; - if(packets->nused == packets->nsamples) { - struct packet *f = packets; - - packets = f->next; - nsamples -= f->nsamples; - free(f); - pthread_cond_broadcast(&cond); + if(frames_written < 0) { + switch(frames_written) { + case -EAGAIN: + info("snd_pcm_wait() returned but we got -EAGAIN!"); + break; + case -EPIPE: + error(0, "error calling snd_pcm_writei: %ld", + (long)frames_written); + escape = 1; + break; + default: + fatal(0, "error calling snd_pcm_writei: %ld", + (long)frames_written); + } + } else { + samples_written = frames_written * 2; + next_timestamp += samples_written; + if(ge(next_timestamp, packet_end)) + drop_first_packet(); + infilling = 0; } } else { /* We don't have anything to play! We'd better play some 0s. */ - static const uint16_t zeros[1024]; - size_t samples_available = 1024, frames_available; - if(packets && next_timestamp + samples_available > packets->timestamp) + static const uint16_t zeros[INFILL_SAMPLES]; + size_t samples_available = INFILL_SAMPLES, frames_available; + + /* If the maximum infill would take us past the start of the next + * packet then we truncate the infill to the right amount. */ + if(lt(packets->timestamp, + next_timestamp + samples_available)) samples_available = packets->timestamp - next_timestamp; + if((int)samples_available < 0) { + info("packets->timestamp: %"PRIx32" next_timestamp: %"PRIx32" next+max: %"PRIx32" available: %"PRIx32, + packets->timestamp, next_timestamp, + next_timestamp + INFILL_SAMPLES, samples_available); + } frames_available = samples_available / 2; + if(!infilling) { + info("Infilling %d samples, next=%"PRIx32" packet=[%"PRIx32",%"PRIx32"]", + samples_available, next_timestamp, + packets->timestamp, packets->timestamp + packets->nsamples); + //infilling++; + } frames_written = snd_pcm_writei(pcm, zeros, frames_available); - if(frames_written < 0) - fatal(0, "error calling snd_pcm_writei: %d", err); - next_timestamp += samples_written; + if(frames_written < 0) { + switch(frames_written) { + case -EAGAIN: + info("snd_pcm_wait() returned but we got -EAGAIN!"); + break; + case -EPIPE: + error(0, "error calling snd_pcm_writei: %ld", + (long)frames_written); + escape = 1; + break; + default: + fatal(0, "error calling snd_pcm_writei: %ld", + (long)frames_written); + } + } else { + samples_written = frames_written * 2; + next_timestamp += samples_written; + } } } + active = 0; /* We stop playing for a bit until the buffer re-fills */ pthread_mutex_unlock(&lock); - if((err = snd_pcm_drain(pcm))) - fatal(0, "error calling snd_pcm_drain: %d", err); + if((err = snd_pcm_nonblock(pcm, 0))) + fatal(0, "error calling snd_pcm_nonblock: %d", err); + if(escape) { + if((err = snd_pcm_drop(pcm))) + fatal(0, "error calling snd_pcm_drop: %d", err); + escape = 0; + } else + if((err = snd_pcm_drain(pcm))) + fatal(0, "error calling snd_pcm_drain: %d", err); + if((err = snd_pcm_nonblock(pcm, 1))) + fatal(0, "error calling snd_pcm_nonblock: %d", err); prepared = 0; pthread_mutex_lock(&lock); } @@ -407,14 +593,14 @@ static void play_rtp(void) { if(status) fatal(0, "AudioHardwareGetProperty: %d", (int)status); D(("mSampleRate %f", asbd.mSampleRate)); - D(("mFormatID %08"PRIx32, asbd.mFormatID)); - D(("mFormatFlags %08"PRIx32, asbd.mFormatFlags)); - D(("mBytesPerPacket %08"PRIx32, asbd.mBytesPerPacket)); - D(("mFramesPerPacket %08"PRIx32, asbd.mFramesPerPacket)); - D(("mBytesPerFrame %08"PRIx32, asbd.mBytesPerFrame)); - D(("mChannelsPerFrame %08"PRIx32, asbd.mChannelsPerFrame)); - D(("mBitsPerChannel %08"PRIx32, asbd.mBitsPerChannel)); - D(("mReserved %08"PRIx32, asbd.mReserved)); + D(("mFormatID %08lx", asbd.mFormatID)); + D(("mFormatFlags %08lx", asbd.mFormatFlags)); + D(("mBytesPerPacket %08lx", asbd.mBytesPerPacket)); + D(("mFramesPerPacket %08lx", asbd.mFramesPerPacket)); + D(("mBytesPerFrame %08lx", asbd.mBytesPerFrame)); + D(("mChannelsPerFrame %08lx", asbd.mChannelsPerFrame)); + D(("mBitsPerChannel %08lx", asbd.mBitsPerChannel)); + D(("mReserved %08lx", asbd.mReserved)); if(asbd.mFormatID != kAudioFormatLinearPCM) fatal(0, "audio device does not support kAudioFormatLinearPCM"); status = AudioDeviceAddIOProc(adid, adioproc, 0); @@ -423,19 +609,24 @@ static void play_rtp(void) { pthread_mutex_lock(&lock); for(;;) { /* Wait for the buffer to fill up a bit */ - while(nsamples < READAHEAD) + info("Buffering..."); + while(nsamples < readahead) pthread_cond_wait(&cond, &lock); /* Start playing now */ + info("Playing..."); + next_timestamp = packets->timestamp; + active = 1; status = AudioDeviceStart(adid, adioproc); if(status) fatal(0, "AudioDeviceStart: %d", (int)status); /* Wait until the buffer empties out */ - while(nsamples >= MINBUFFER) + while(nsamples >= minbuffer) pthread_cond_wait(&cond, &lock); /* Stop playing for a bit until the buffer re-fills */ status = AudioDeviceStop(adid, adioproc); if(status) fatal(0, "AudioDeviceStop: %d", (int)status); + active = 0; /* Go back round */ } } @@ -449,10 +640,13 @@ static void help(void) { xprintf("Usage:\n" " disorder-playrtp [OPTIONS] ADDRESS [PORT]\n" "Options:\n" + " --device, -D DEVICE Output device\n" + " --min, -m FRAMES Buffer low water mark\n" + " --buffer, -b FRAMES Buffer high water mark\n" + " --max, -x FRAMES Buffer maximum size\n" " --help, -h Display usage message\n" " --version, -V Display version number\n" - " --debug, -d Turn on debugging\n" - " --device, -D DEVICE Output device\n"); + ); xfclose(stdout); exit(0); } @@ -483,15 +677,20 @@ int main(int argc, char **argv) { mem_init(); if(!setlocale(LC_CTYPE, "")) fatal(errno, "error calling setlocale"); - while((n = getopt_long(argc, argv, "hVdD", options, 0)) >= 0) { + while((n = getopt_long(argc, argv, "hVdD:m:b:x:", options, 0)) >= 0) { switch(n) { case 'h': help(); case 'V': version(); case 'd': debugging = 1; break; case 'D': device = optarg; break; + case 'm': minbuffer = 2 * atol(optarg); break; + case 'b': readahead = 2 * atol(optarg); break; + case 'x': maxbuffer = 2 * atol(optarg); break; default: fatal(0, "invalid option"); } } + if(!maxbuffer) + maxbuffer = 4 * readahead; argc -= optind; argv += optind; if(argc < 1 || argc > 2)