X-Git-Url: https://git.distorted.org.uk/~mdw/disorder/blobdiff_plain/e83d0967d4c0965eb8036248acc20d1bf12ad1d8..09ee2f0d809da23b6b442233b5c3d94a6e64bdd2:/clients/playrtp.c?ds=inline diff --git a/clients/playrtp.c b/clients/playrtp.c index 7ec35d0..5606fb5 100644 --- a/clients/playrtp.c +++ b/clients/playrtp.c @@ -29,6 +29,7 @@ #include #include #include +#include #include "log.h" #include "mem.h" @@ -36,57 +37,112 @@ #include "addr.h" #include "syscalls.h" #include "rtp.h" -#include "debug.h" +#include "defs.h" #if HAVE_COREAUDIO_AUDIOHARDWARE_H # include #endif +#if API_ALSA +#include +#endif +/** @brief RTP socket */ static int rtpfd; -#define MAXSAMPLES 2048 /* max samples/frame we'll support */ -/* NB two channels = two samples in this program! */ -#define MINBUFFER 8820 /* when to stop playing */ +/** @brief Output device */ +static const char *device; + +/** @brief Maximum samples per packet we'll support + * + * NB that two channels = two samples in this program. + */ +#define MAXSAMPLES 2048 + +/** @brief Minimum buffer size + * + * We'll stop playing if there's only this many samples in the buffer. */ +#define MINBUFFER 8820 + +/** @brief Maximum sample size + * + * The maximum supported size (in bytes) of one sample. */ +#define MAXSAMPLESIZE 2 + #define READAHEAD 88200 /* how far to read ahead */ + #define MAXBUFFER (3 * 88200) /* maximum buffer contents */ -struct frame { - struct frame *next; /* another frame */ - int nsamples; /* number of samples */ - int nused; /* number of samples used so far */ - uint32_t timestamp; /* timestamp from packet */ +/** @brief Received packet + * + * Packets are recorded in an ordered linked list. */ +struct packet { + /** @brief Pointer to next packet + * The next packet might not be immediately next: if packets are dropped + * or mis-ordered there may be gaps at any given moment. 
*/ + struct packet *next; + /** @brief Number of samples in this packet */ + int nsamples; + /** @brief Number of samples used from this packet */ + int nused; + /** @brief Timestamp from RTP packet + * + * NB that "timestamps" are really sample counters.*/ + uint32_t timestamp; #if HAVE_COREAUDIO_AUDIOHARDWARE_H - float samples[MAXSAMPLES]; /* converted sample data */ + /** @brief Converted sample data */ + float samples_float[MAXSAMPLES]; +#else + /** @brief Raw sample data */ + unsigned char samples_raw[MAXSAMPLES * MAXSAMPLESIZE]; #endif }; -static unsigned long nsamples; /* total samples available */ +/** @brief Total number of samples available */ +static unsigned long nsamples; -static struct frame *frames; /* received frames in ascending order - * of timestamp */ +/** @brief Linked list of packets + * + * In ascending order of timestamp. */ +static struct packet *packets; + +/** @brief Timestamp of next packet to play. + * + * This is set to the timestamp of the last packet, plus the number of + * samples it contained. Only valid if @ref active is nonzero. + */ +static uint32_t next_timestamp; + +/** @brief True if actively playing + * + * This is true when playing and false when just buffering. 
*/ +static int active; + +/** @brief Lock protecting @ref packets */ static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; -/* lock protecting frame list */ -static pthread_cond_t cond = PTHREAD_CONDVAR_INITIALIZER; -/* signalled whenever we add a new frame */ +/** @brief Condition variable signalled whenever @ref packets is changed */ +static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; static const struct option options[] = { { "help", no_argument, 0, 'h' }, { "version", no_argument, 0, 'V' }, { "debug", no_argument, 0, 'd' }, + { "device", required_argument, 0, 'D' }, { 0, 0, 0, 0 } }; -/* Return true iff a > b in sequence-space arithmetic */ -static inline int gt(const struct frame *a, const struct frame *b) { - return (uint32_t)(a->timestamp - b->timestamp) < 0x80000000; +/** @brief Return true iff a < b in sequence-space arithmetic */ +static inline int lt(uint32_t a, uint32_t b) { + return (uint32_t)(a - b) & 0x80000000; } -/* Background thread that reads frames over the network and add them to the - * list */ -static listen_thread(void attribute((unused)) *arg) { - struct frame *f = 0, **ff; - int n, i; +/** @brief Background thread collecting samples + * + * This function collects samples, perhaps converts them to the target format, + * and adds them to the packet list. 
*/ +static void *listen_thread(void attribute((unused)) *arg) { + struct packet *p = 0, **pp; + int n; union { struct rtp_header header; uint8_t bytes[sizeof(uint16_t) * MAXSAMPLES + sizeof (struct rtp_header)]; @@ -95,8 +151,8 @@ static listen_thread(void attribute((unused)) *arg) { + sizeof (struct rtp_header)); for(;;) { - if(!f) - f = xmalloc(sizeof *f); + if(!p) + p = xmalloc(sizeof *p); n = read(rtpfd, packet.bytes, sizeof packet.bytes); if(n < 0) { switch(errno) { @@ -106,38 +162,61 @@ static listen_thread(void attribute((unused)) *arg) { fatal(errno, "error reading from socket"); } } -#if HAVE_COREAUDIO_AUDIOHARDWARE_H + /* Ignore too-short packets */ + if((size_t)n <= sizeof (struct rtp_header)) + continue; + p->nused = 0; + p->timestamp = ntohl(packet.header.timestamp); + /* Ignore packets in the past */ + if(active && lt(p->timestamp, next_timestamp)) + continue; /* Convert to target format */ - switch(packet.header.mtp & 0x7F) { + switch(packet.header.mpt & 0x7F) { case 10: - f->nsamples = (n - sizeof (struct rtp_header)) / sizeof(uint16_t); - for(i = 0; i < f->nsamples; ++i) - f->samples[i] = (int16_t)ntohs(samples[i]) * (0.5f / 32767); + p->nsamples = (n - sizeof (struct rtp_header)) / sizeof(uint16_t); +#if HAVE_COREAUDIO_AUDIOHARDWARE_H + /* Convert to what Core Audio expects */ + for(n = 0; n < p->nsamples; ++n) + p->samples_float[n] = (int16_t)ntohs(samples[n]) * (0.5f / 32767); +#else + /* ALSA can do any necessary conversion itself (though it might be better + * to do any necessary conversion in the background) */ + memcpy(p->samples_raw, samples, n - sizeof (struct rtp_header)); +#endif break; /* TODO support other RFC3551 media types (when the speaker does) */ default: - fatal(0, "unsupported RTP payload type %d", + fatal(0, "unsupported RTP payload type %d", packet.header.mpt & 0x7F); } -#endif - f->used = 0; - f->timestamp = ntohl(packet.header.timestamp); pthread_mutex_lock(&lock); - /* Stop reading if we've reached the maximum */ + /* Stop 
reading if we've reached the maximum. + * + * This is rather unsatisfactory: it means that if packets get heavily + * out of order then we guarantee dropouts. But for now... */ while(nsamples >= MAXBUFFER) pthread_cond_wait(&cond, &lock); - for(ff = &frames; *ff && !gt(*ff, f); ff = &(*ff)->next) + for(pp = &packets; + *pp && lt((*pp)->timestamp, p->timestamp); + pp = &(*pp)->next) ; - f->next = *ff; - *ff = f; - nsamples += f->nsamples; - pthread_cond_broadcast(&cond); + /* So now either !*pp or *pp >= p */ + if(*pp && p->timestamp == (*pp)->timestamp) { + /* *pp == p; a duplicate. Ideally we avoid the translation step here, + * but we'll worry about that another time. */ + } else { + p->next = *pp; + *pp = p; + nsamples += p->nsamples; + pthread_cond_broadcast(&cond); + p = 0; /* we've consumed this packet */ + } pthread_mutex_unlock(&lock); - f = 0; } } #if HAVE_COREAUDIO_AUDIOHARDWARE_H +/** @brief Callback from Core Audio */ static OSStatus adioproc(AudioDeviceID inDevice, const AudioTimeStamp *inNow, const AudioBufferList *inInputData, @@ -152,15 +231,15 @@ static OSStatus adioproc(AudioDeviceID inDevice, size_t samplesInLeft; size_t samplesToCopy; - pthread_mutex_lock(&lock); + pthread_mutex_lock(&lock); samplesOut = ab->data; samplesOutLeft = ab->mDataByteSize / sizeof (float); - while(frames && nbuffers > 0) { - if(frames->used == frames->nsamples) { + while(packets && nbuffers > 0) { + if(packets->used == packets->nsamples) { /* TODO if we dropped a packet then we should introduce a gap here */ - struct frame *const f = frames; - frames = f->next; - free(f); + struct packet *const p = packets; + packets = p->next; + free(p); pthread_cond_broadcast(&cond); continue; } @@ -173,11 +252,11 @@ static OSStatus adioproc(AudioDeviceID inDevice, } /* Now: (1) there is some data left to read * (2) there is some space to put it */ - samplesInLeft = frames->nsamples - frames->used; + samplesInLeft = packets->nsamples - packets->used; samplesToCopy = (samplesInLeft < 
samplesOutLeft ? samplesInLeft : samplesOutLeft); - memcpy(samplesOut, frame->samples + frames->used, samplesToCopy); - frames->used += samplesToCopy; + memcpy(samplesOut, packet->samples + packets->used, samplesToCopy); + packets->used += samplesToCopy; samplesOut += samplesToCopy; samesOutLeft -= samplesToCopy; } @@ -186,13 +265,147 @@ static OSStatus adioproc(AudioDeviceID inDevice, } #endif -void play_rtp(void) { - pthread_t lt; +/** @brief Play an RTP stream + * + * This is the guts of the program. It is responsible for: + * - starting the listening thread + * - opening the audio device + * - reading ahead to build up a buffer + * - arranging for audio to be played + * - detecting when the buffer has got too small and re-buffering + */ +static void play_rtp(void) { + pthread_t ltid; /* We receive and convert audio data in a background thread */ - pthread_create(&lt, 0, listen_thread, 0); + pthread_create(&ltid, 0, listen_thread, 0); #if API_ALSA - assert(!"implemented"); + { + snd_pcm_t *pcm; + snd_pcm_hw_params_t *hwparams; + snd_pcm_sw_params_t *swparams; + /* Only support one format for now */ + const int sample_format = SND_PCM_FORMAT_S16_BE; + unsigned rate = 44100; + const int channels = 2; + const int samplesize = channels * sizeof(uint16_t); + snd_pcm_uframes_t pcm_bufsize = MAXSAMPLES * samplesize * 3; + /* If we can write more than this many samples we'll get a wakeup */ + const int avail_min = 256; + snd_pcm_sframes_t frames_written; + size_t samples_written; + int prepared = 1; + int err; + + /* Open ALSA */ + if((err = snd_pcm_open(&pcm, + device ? 
device : "default", + SND_PCM_STREAM_PLAYBACK, + SND_PCM_NONBLOCK))) + fatal(0, "error from snd_pcm_open: %d", err); + /* Set up 'hardware' parameters */ + snd_pcm_hw_params_alloca(&hwparams); + if((err = snd_pcm_hw_params_any(pcm, hwparams)) < 0) + fatal(0, "error from snd_pcm_hw_params_any: %d", err); + if((err = snd_pcm_hw_params_set_access(pcm, hwparams, + SND_PCM_ACCESS_RW_INTERLEAVED)) < 0) + fatal(0, "error from snd_pcm_hw_params_set_access: %d", err); + if((err = snd_pcm_hw_params_set_format(pcm, hwparams, + sample_format)) < 0) + fatal(0, "error from snd_pcm_hw_params_set_format (%d): %d", + sample_format, err); + if((err = snd_pcm_hw_params_set_rate_near(pcm, hwparams, &rate, 0)) < 0) + fatal(0, "error from snd_pcm_hw_params_set_rate (%d): %d", + rate, err); + if((err = snd_pcm_hw_params_set_channels(pcm, hwparams, + channels)) < 0) + fatal(0, "error from snd_pcm_hw_params_set_channels (%d): %d", + channels, err); + if((err = snd_pcm_hw_params_set_buffer_size_near(pcm, hwparams, + &pcm_bufsize)) < 0) + fatal(0, "error from snd_pcm_hw_params_set_buffer_size (%d): %d", + MAXSAMPLES * samplesize * 3, err); + if((err = snd_pcm_hw_params(pcm, hwparams)) < 0) + fatal(0, "error calling snd_pcm_hw_params: %d", err); + /* Set up 'software' parameters */ + snd_pcm_sw_params_alloca(&swparams); + if((err = snd_pcm_sw_params_current(pcm, swparams)) < 0) + fatal(0, "error calling snd_pcm_sw_params_current: %d", err); + if((err = snd_pcm_sw_params_set_avail_min(pcm, swparams, avail_min)) < 0) + fatal(0, "error calling snd_pcm_sw_params_set_avail_min %d: %d", + avail_min, err); + if((err = snd_pcm_sw_params(pcm, swparams)) < 0) + fatal(0, "error calling snd_pcm_sw_params: %d", err); + + /* Ready to go */ + + pthread_mutex_lock(&lock); + for(;;) { + /* Wait for the buffer to fill up a bit */ + while(nsamples < READAHEAD) + pthread_cond_wait(&cond, &lock); + if(!prepared) { + if((err = snd_pcm_prepare(pcm))) + fatal(0, "error calling snd_pcm_prepare: %d", err); + prepared 
= 1; + } + /* Start at the first available packet */ + next_timestamp = packets->timestamp; + active = 1; + /* Wait until the buffer empties out */ + while(nsamples >= MINBUFFER) { + /* Wait for ALSA to ask us for more data */ + pthread_mutex_unlock(&lock); + snd_pcm_wait(pcm, -1); + pthread_mutex_lock(&lock); + /* ALSA is ready for more data */ + if(packets && packets->timestamp + packets->nused == next_timestamp) { + /* Hooray, we have a packet we can play */ + const size_t samples_available = packets->nsamples - packets->nused; + const size_t frames_available = samples_available / 2; + + frames_written = snd_pcm_writei(pcm, + packets->samples_raw + packets->nused, + frames_available); + if(frames_written < 0) + fatal(0, "error calling snd_pcm_writei: %d", err); + samples_written = frames_written * 2; + packets->nused += samples_written; + next_timestamp += samples_written; + if(packets->nused == packets->nsamples) { + /* We're done with this packet */ + struct packet *p = packets; + + packets = p->next; + nsamples -= p->nsamples; + free(p); + pthread_cond_broadcast(&cond); + } + } else { + /* We don't have anything to play! We'd better play some 0s. 
*/ + static const uint16_t zeros[1024]; + size_t samples_available = 1024, frames_available; + if(packets && next_timestamp + samples_available > packets->timestamp) + samples_available = packets->timestamp - next_timestamp; + frames_available = samples_available / 2; + frames_written = snd_pcm_writei(pcm, + zeros, + frames_available); + if(frames_written < 0) + fatal(0, "error calling snd_pcm_writei: %d", err); + next_timestamp += samples_written; + } + } + active = 0; + /* We stop playing for a bit until the buffer re-fills */ + pthread_mutex_unlock(&lock); + if((err = snd_pcm_drain(pcm))) + fatal(0, "error calling snd_pcm_drain: %d", err); + prepared = 0; + pthread_mutex_lock(&lock); + } + + } #elif HAVE_COREAUDIO_AUDIOHARDWARE_H { OSStatus status; @@ -262,7 +475,8 @@ static void help(void) { "Options:\n" " --help, -h Display usage message\n" " --version, -V Display version number\n" - " --debug, -d Turn on debugging\n"); + " --debug, -d Turn on debugging\n" + " --device, -D DEVICE Output device\n"); xfclose(stdout); exit(0); } @@ -278,9 +492,9 @@ int main(int argc, char **argv) { int n; struct addrinfo *res; struct stringlist sl; - const char *sockname; + char *sockname; - static const struct addrinfo prefbind = { + static const struct addrinfo prefs = { AI_PASSIVE, PF_INET, SOCK_DGRAM, @@ -293,11 +507,12 @@ int main(int argc, char **argv) { mem_init(); if(!setlocale(LC_CTYPE, "")) fatal(errno, "error calling setlocale"); - while((n = getopt_long(argc, argv, "hVd", options, 0)) >= 0) { + while((n = getopt_long(argc, argv, "hVdD", options, 0)) >= 0) { switch(n) { case 'h': help(); case 'V': version(); case 'd': debugging = 1; break; + case 'D': device = optarg; break; default: fatal(0, "invalid option"); } } @@ -308,7 +523,7 @@ int main(int argc, char **argv) { sl.n = argc; sl.s = argv; /* Listen for inbound audio data */ - if(!(res = get_address(&sl, &pref, &sockname))) + if(!(res = get_address(&sl, &prefs, &sockname))) exit(1); if((rtpfd = 
socket(res->ai_family, res->ai_socktype,