diff --git a/.github/workflows/janus-ci.yml b/.github/workflows/janus-ci.yml index 9768d1624a..2e5e65c174 100644 --- a/.github/workflows/janus-ci.yml +++ b/.github/workflows/janus-ci.yml @@ -58,6 +58,7 @@ jobs: libopus-dev librabbitmq-dev libsofia-sip-ua-dev + libspeexdsp-dev libssl-dev libtool libvorbis-dev diff --git a/conf/janus.plugin.audiobridge.jcfg.sample b/conf/janus.plugin.audiobridge.jcfg.sample index 39d41d5a55..cd98be559d 100644 --- a/conf/janus.plugin.audiobridge.jcfg.sample +++ b/conf/janus.plugin.audiobridge.jcfg.sample @@ -10,7 +10,6 @@ # audiolevel_event = true|false (whether to emit event to other users or not, default=false) # audio_active_packets = 100 (number of packets with audio level, default=100, 2 seconds) # audio_level_average = 25 (average value of audio level, 127=muted, 0='too loud', default=25) -# default_prebuffering = number of packets to buffer before decoding each particiant (default=6) # default_expectedloss = percent of packets we expect participants may miss, to help with FEC (default=0, max=20; automatically used for forwarders too) # default_bitrate = default bitrate in bps to use for the all participants (default=0, which means libopus decides; automatically used for forwarders too) # record = true|false (whether this room should be recorded, default=false) diff --git a/configure.ac b/configure.ac index 0715f79a86..b7b9588b16 100644 --- a/configure.ac +++ b/configure.ac @@ -788,14 +788,17 @@ AC_SUBST([SOFIA_CFLAGS]) AC_SUBST([SOFIA_LIBS]) PKG_CHECK_MODULES([OPUS], - [opus], + [ + opus + speexdsp + ], [ AS_IF([test "x$enable_plugin_audiobridge" = "xmaybe"], [enable_plugin_audiobridge=yes]) ], [ AS_IF([test "x$enable_plugin_audiobridge" = "xyes"], - [AC_MSG_ERROR([libopus not found. See README.md for installation instructions or use --disable-plugin-audiobridge])]) + [AC_MSG_ERROR([libopus or libspeexdsp not found. See README.md for installation instructions or use --disable-plugin-audiobridge])]) ]) AC_SUBST([OPUS_CFLAGS]) AC_SUBST([OPUS_LIBS]) diff --git a/src/plugins/janus_audiobridge.c b/src/plugins/janus_audiobridge.c index 9bd6b1bf3a..72434659dc 100644 --- a/src/plugins/janus_audiobridge.c +++ b/src/plugins/janus_audiobridge.c @@ -38,7 +38,6 @@ room-: { audiolevel_event = true|false (whether to emit event to other users or not, default=false) audio_active_packets = 100 (number of packets with audio level, default=100, 2 seconds) audio_level_average = 25 (average value of audio level, 127=muted, 0='too loud', default=25) - default_prebuffering = number of packets to buffer before decoding each participant (default=DEFAULT_PREBUFFERING) default_expectedloss = percent of packets we expect participants may miss, to help with FEC (default=0, max=20; automatically used for forwarders too) default_bitrate = default bitrate in bps to use for the all participants (default=0, which means libopus decides; automatically used for forwarders too) record = true|false (whether this room should be recorded, default=false) @@ -141,7 +140,6 @@ room-: { "audiolevel_event" : , "audio_active_packets" : , "audio_level_average" : , - "default_prebuffering" : , "default_expectedloss" : , "default_bitrate" : , "record" : , @@ -739,7 +737,6 @@ room-: { "token" : "", "muted" : , "codec" : "", - "prebuffer" : , "bitrate" : , "quality" : <0-10, Opus-related complexity to use, the higher the value, the better the quality (but more CPU); optional, default is 4>, "expected_loss" : <0-20, a percentage of the expected loss (capped at 20%), only needed in case FEC is used; optional, default is 0 (FEC disabled even when negotiated) or the room default>, @@ -827,7 +824,6 @@ room-: { "request" : "configure", "muted" : , "display" : "", - "prebuffer" : , "bitrate" : , "quality" : , "expected_loss" : @@ -1034,6 +1030,8 @@ room-: { #ifdef HAVE_LIBOGG #include #endif +#include + #include #include #include @@ -1174,7 +1172,6 @@ static struct janus_json_parameter create_parameters[] = { {"audiolevel_event", JANUS_JSON_BOOL, 0}, {"audio_active_packets", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, {"audio_level_average", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, - {"default_prebuffering", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, {"default_expectedloss", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, {"default_bitrate", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, {"groups", JSON_ARRAY, 0} @@ -1206,7 +1203,6 @@ static struct janus_json_parameter join_parameters[] = { {"group", JSON_STRING, 0}, {"muted", JANUS_JSON_BOOL, 0}, {"codec", JSON_STRING, 0}, - {"prebuffer", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, {"bitrate", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, {"quality", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, {"expected_loss", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, @@ -1238,7 +1234,6 @@ static struct janus_json_parameter rtp_parameters[] = { }; static struct janus_json_parameter configure_parameters[] = { {"muted", JANUS_JSON_BOOL, 0}, - {"prebuffer", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, {"bitrate", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, {"quality", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, {"expected_loss", JSON_INTEGER, JANUS_JSON_PARAM_POSITIVE}, @@ -1329,7 +1324,6 @@ typedef struct janus_audiobridge_room { gboolean spatial_audio; /* Whether the mix will use spatial audio, using stereo */ gboolean audiolevel_ext; /* Whether the ssrc-audio-level extension must be negotiated or not for new joins */ gboolean audiolevel_event; /* Whether to emit event to other users about audiolevel */ - uint default_prebuffering; /* Number of packets to buffer before decoding each participant */ uint default_expectedloss; /* Percent of packets we expect participants may miss, to help with FEC: can be overridden per-participant */ int32_t default_bitrate; /* Default bitrate to use for all Opus streams when encoding */ int audio_active_packets; /* Amount of packets with audio level for checkup */ @@ -1559,8 +1553,6 @@ typedef struct janus_audiobridge_participant { gchar *user_id_str; /* Unique ID in the room (when using strings) */ gchar *display; /* Display name (opaque value, only meaningful to application) */ gboolean admin; /* If the participant is an admin (can't be globally muted) */ - gboolean prebuffering; /* Whether this participant needs pre-buffering of a few packets (just joined) */ - uint prebuffer_count; /* Number of packets to buffer before decoding this participant */ volatile gint active; /* Whether this participant can receive media at all */ volatile gint encoding; /* Whether this participant is currently encoding */ volatile gint decoding; /* Whether this participant is currently decoding */ @@ -1571,9 +1563,9 @@ typedef struct janus_audiobridge_participant { gboolean stereo; /* Whether stereo will be used for spatial audio */ int spatial_position; /* Panning of this participant in the mix */ /* RTP stuff */ - GList *inbuf; /* Incoming audio from this participant, as an ordered list of packets */ - GAsyncQueue *outbuf; /* Mixed audio for this participant */ - gint64 last_drop; /* When we last dropped a packet because the imcoming queue was full */ + JitterBuffer *jitter; /* Jitter buffer of incoming audio packets */ + GList *inbuf; /* Decoded audio from this participant, to feed to the mixer */ + GAsyncQueue *outbuf; /* Mixed audio to send to this participant */ janus_mutex qmutex; /* Incoming queue mutex */ int opus_pt; /* Opus payload type */ int extmap_id; /* Audio level RTP extension id, if any */ @@ -1620,6 +1612,32 @@ typedef struct janus_audiobridge_rtp_relay_packet { gboolean silence; } janus_audiobridge_rtp_relay_packet; +/* Buffered audio/video packet */ +typedef struct janus_audiobridge_buffer_packet { + /* Pointer to the packet data, if RTP */ + char *buffer; + /* Size of the packet */ + int len; + /* Whether the packet contains silence, according to the RTP extension */ + gboolean silence; + /* Monotonic insert time */ + int64_t inserted; +} janus_audiobridge_buffer_packet; +static janus_audiobridge_buffer_packet *janus_audiobridge_buffer_packet_create(char *buffer, int len, gboolean silence) { + janus_audiobridge_buffer_packet *pkt = g_malloc(sizeof(janus_audiobridge_buffer_packet)); + pkt->buffer = g_malloc(len); + pkt->len = len; + pkt->silence = silence; + memcpy(pkt->buffer, buffer, len); + pkt->inserted = janus_get_monotonic_time(); + return pkt; +} +static void janus_audiobridge_buffer_packet_destroy(janus_audiobridge_buffer_packet *pkt) { + if(!pkt) + return; + g_free(pkt->buffer); + g_free(pkt); +} static void janus_audiobridge_participant_destroy(janus_audiobridge_participant *participant) { if(!participant) @@ -1646,6 +1664,8 @@ static void janus_audiobridge_participant_free(const janus_refcount *participant opus_encoder_destroy(participant->encoder); if(participant->decoder) opus_decoder_destroy(participant->decoder); + if(participant->jitter) + jitter_buffer_destroy(participant->jitter); while(participant->inbuf) { GList *first = g_list_first(participant->inbuf); janus_audiobridge_rtp_relay_packet *pkt = (janus_audiobridge_rtp_relay_packet *)first->data; @@ -1783,26 +1803,6 @@ static guint32 janus_audiobridge_rtp_forwarder_add_helper(janus_audiobridge_room return rf->stream_id; } - -/* Helper to sort incoming RTP packets by sequence numbers */ -static gint janus_audiobridge_rtp_sort(gconstpointer a, gconstpointer b) { - janus_audiobridge_rtp_relay_packet *pkt1 = (janus_audiobridge_rtp_relay_packet *)a; - janus_audiobridge_rtp_relay_packet *pkt2 = (janus_audiobridge_rtp_relay_packet *)b; - if(pkt1->seq_number < 100 && pkt2->seq_number > 65000) { - /* Sequence number was probably reset, pkt2 is older */ - return 1; - } else if(pkt2->seq_number < 100 && pkt1->seq_number > 65000) { - /* Sequence number was probably reset, pkt1 is older */ - return -1; - } - /* Simply compare timestamps */ - if(pkt1->seq_number < pkt2->seq_number) - return -1; - else if(pkt1->seq_number > pkt2->seq_number) - return 1; - return 0; -} - /* Helper struct to generate and parse WAVE headers */ typedef struct wav_header { char riff[4]; @@ -1992,11 +1992,6 @@ static int janus_audiobridge_resample(int16_t *input, int input_num, int input_r } -/* Mixer settings */ -#define DEFAULT_PREBUFFERING 6 -#define MAX_PREBUFFERING 50 - - /* Opus settings */ #define OPUS_SAMPLES 960 #define G711_SAMPLES 160 @@ -2384,7 +2379,6 @@ int janus_audiobridge_init(janus_callbacks *callback, const char *config_path) { janus_config_item *audiolevel_event = janus_config_get(config, cat, janus_config_type_item, "audiolevel_event"); janus_config_item *audio_active_packets = janus_config_get(config, cat, janus_config_type_item, "audio_active_packets"); janus_config_item *audio_level_average = janus_config_get(config, cat, janus_config_type_item, "audio_level_average"); - janus_config_item *default_prebuffering = janus_config_get(config, cat, janus_config_type_item, "default_prebuffering"); janus_config_item *default_expectedloss = janus_config_get(config, cat, janus_config_type_item, "default_expectedloss"); janus_config_item *default_bitrate = janus_config_get(config, cat, janus_config_type_item, "default_bitrate"); janus_config_item *secret = janus_config_get(config, cat, janus_config_type_item, "secret"); @@ -2484,15 +2478,6 @@ int janus_audiobridge_init(janus_callbacks *callback, const char *config_path) { } } } - audiobridge->default_prebuffering = DEFAULT_PREBUFFERING; - if(default_prebuffering != NULL && default_prebuffering->value != NULL) { - int prebuffering = atoi(default_prebuffering->value); - if(prebuffering < 0 || prebuffering > MAX_PREBUFFERING) { - JANUS_LOG(LOG_WARN, "Invalid default_prebuffering value provided, using default: %d\n", audiobridge->default_prebuffering); - } else { - audiobridge->default_prebuffering = prebuffering; - } - } audiobridge->default_expectedloss = 0; if(default_expectedloss != NULL && default_expectedloss->value != NULL) { int expectedloss = atoi(default_expectedloss->value); @@ -2825,17 +2810,15 @@ json_t *janus_audiobridge_query_session(janus_plugin_session *handle) { json_object_set_new(info, "admin", json_true()); json_object_set_new(info, "muted", participant->muted ? json_true() : json_false()); json_object_set_new(info, "active", g_atomic_int_get(&participant->active) ? json_true() : json_false()); - json_object_set_new(info, "pre-buffering", participant->prebuffering ? json_true() : json_false()); - json_object_set_new(info, "prebuffer-count", json_integer(participant->prebuffer_count)); - if(participant->inbuf) { - janus_mutex_lock(&participant->qmutex); - json_object_set_new(info, "queue-in", json_integer(g_list_length(participant->inbuf))); - janus_mutex_unlock(&participant->qmutex); - } + janus_mutex_lock(&participant->qmutex); + spx_int32_t count = 0; + if(participant->jitter) + jitter_buffer_ctl(participant->jitter, JITTER_BUFFER_GET_AVALIABLE_COUNT, &count); + json_object_set_new(info, "buffer-in", json_integer(count)); + json_object_set_new(info, "queue-in", json_integer(g_list_length(participant->inbuf))); + janus_mutex_unlock(&participant->qmutex); if(participant->outbuf) json_object_set_new(info, "queue-out", json_integer(g_async_queue_length(participant->outbuf))); - if(participant->last_drop > 0) - json_object_set_new(info, "last-drop", json_integer(participant->last_drop)); if(participant->stereo) json_object_set_new(info, "spatial_position", json_integer(participant->spatial_position)); if(participant->arc && participant->arc->filename) @@ -2972,7 +2955,6 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s json_t *audiolevel_event = json_object_get(root, "audiolevel_event"); json_t *audio_active_packets = json_object_get(root, "audio_active_packets"); json_t *audio_level_average = json_object_get(root, "audio_level_average"); - json_t *default_prebuffering = json_object_get(root, "default_prebuffering"); json_t *default_expectedloss = json_object_get(root, "default_expectedloss"); json_t *default_bitrate = json_object_get(root, "default_bitrate"); json_t *groups = json_object_get(root, "groups"); @@ -3125,13 +3107,6 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s audiobridge->audio_level_average); } } - audiobridge->default_prebuffering = default_prebuffering ? - json_integer_value(default_prebuffering) : DEFAULT_PREBUFFERING; - if(audiobridge->default_prebuffering > MAX_PREBUFFERING) { - audiobridge->default_prebuffering = DEFAULT_PREBUFFERING; - JANUS_LOG(LOG_WARN, "Invalid default_prebuffering value provided (too high), using default: %d\n", - audiobridge->default_prebuffering); - } audiobridge->default_expectedloss = 0; if(default_expectedloss != NULL) { int expectedloss = json_integer_value(default_expectedloss); @@ -3298,10 +3273,6 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s janus_config_add(config, c, janus_config_item_create("audio_level_average", value)); } } - if(audiobridge->default_prebuffering != DEFAULT_PREBUFFERING) { - g_snprintf(value, BUFSIZ, "%d", audiobridge->default_prebuffering); - janus_config_add(config, c, janus_config_item_create("default_prebuffering", value)); - } if(audiobridge->allow_plainrtp) janus_config_add(config, c, janus_config_item_create("allow_rtp_participants", "yes")); if(audiobridge->groups) { @@ -3483,10 +3454,6 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s janus_config_add(config, c, janus_config_item_create("audio_level_average", value)); } } - if(audiobridge->default_prebuffering != DEFAULT_PREBUFFERING) { - g_snprintf(value, BUFSIZ, "%d", audiobridge->default_prebuffering); - janus_config_add(config, c, janus_config_item_create("default_prebuffering", value)); - } if(audiobridge->allow_plainrtp) janus_config_add(config, c, janus_config_item_create("allow_rtp_participants", "yes")); if(audiobridge->groups) { @@ -4087,6 +4054,8 @@ static json_t *janus_audiobridge_process_synchronous_request(janus_audiobridge_s if(participant->muted) { /* Clear the queued packets waiting to be handled */ janus_mutex_lock(&participant->qmutex); + if(participant->jitter) + jitter_buffer_reset(participant->jitter); while(participant->inbuf) { GList *first = g_list_first(participant->inbuf); janus_audiobridge_rtp_relay_packet *pkt = (janus_audiobridge_rtp_relay_packet *)first->data; @@ -5550,7 +5519,7 @@ void janus_audiobridge_incoming_rtp(janus_plugin_session *handle, janus_plugin_r } participant->reset = FALSE; } - /* Decode frame (Opus/G.711 -> slinear) */ + /* We'll need to decode the frame (Opus/G.711 -> slinear), so check the payload type */ janus_rtp_header *rtp = (janus_rtp_header *)buf; if((participant->codec == JANUS_AUDIOCODEC_PCMA && rtp->type != 8) || (participant->codec == JANUS_AUDIOCODEC_PCMU && rtp->type != 0)) { @@ -5558,43 +5527,14 @@ void janus_audiobridge_incoming_rtp(janus_plugin_session *handle, janus_plugin_r rtp->type, participant->codec == JANUS_AUDIOCODEC_PCMA ? 8 : 0); return; } - janus_audiobridge_rtp_relay_packet *pkt = g_malloc(sizeof(janus_audiobridge_rtp_relay_packet)); - pkt->data = g_malloc0(BUFFER_SAMPLES*sizeof(opus_int16)); - pkt->ssrc = 0; - pkt->timestamp = ntohl(rtp->timestamp); - pkt->seq_number = ntohs(rtp->seq_number); - /* We might check the audio level extension to see if this is silence */ - pkt->silence = FALSE; - pkt->length = 0; - - /* First check if probation period */ - if(participant->probation == MIN_SEQUENTIAL) { - participant->probation--; - participant->expected_seq = pkt->seq_number + 1; - JANUS_LOG(LOG_VERB, "Probation started with ssrc = %"SCNu32", seq = %"SCNu16" \n", ntohl(rtp->ssrc), pkt->seq_number); - g_free(pkt->data); - g_free(pkt); - return; - } else if(participant->probation != 0) { - /* Decrease probation */ - participant->probation--; - /* TODO: Reset probation if sequence number is incorrect and DSSRC also; must have a correct sequence */ - if(!participant->probation){ - /* Probation is ended */ - JANUS_LOG(LOG_VERB, "Probation ended with ssrc = %"SCNu32", seq = %"SCNu16" \n", ntohl(rtp->ssrc), pkt->seq_number); - } - participant->expected_seq = pkt->seq_number + 1; - g_free(pkt->data); - g_free(pkt); - return; - } - + /* Check the audio levels, in case we need to notify participants about who's talking */ + gboolean silence = FALSE; if(participant->extmap_id > 0) { /* Check the audio levels, in case we need to notify participants about who's talking */ int level = packet->extensions.audio_level; if(level != -1) { /* Is this silence? */ - pkt->silence = (level == 127); + silence = (level == 127); if(participant->room && participant->room->audiolevel_event) { /* We also need to detect who's talking: update our monitoring stuff */ int audio_active_packets = participant->room ? participant->room->audio_active_packets : 100; @@ -5652,188 +5592,18 @@ void janus_audiobridge_incoming_rtp(janus_plugin_session *handle, janus_plugin_r } } } - if(!g_atomic_int_compare_and_exchange(&participant->decoding, 0, 1)) { - /* This means we're cleaning up, so don't try to decode */ - g_free(pkt->data); - g_free(pkt); - return; - } - int plen = 0; - const unsigned char *payload = (const unsigned char *)janus_rtp_payload(buf, len, &plen); - if(!payload) { - g_atomic_int_set(&participant->decoding, 0); - JANUS_LOG(LOG_ERR, "[%s] Ops! got an error accessing the RTP payload\n", - participant->codec == JANUS_AUDIOCODEC_OPUS ? "Opus" : "G.711"); - g_free(pkt->data); - g_free(pkt); - return; - } - /* Check sequence number received, verify if it's relevant to the expected one */ - if(pkt->seq_number == participant->expected_seq) { - /* Regular decode */ - if(participant->codec == JANUS_AUDIOCODEC_OPUS) { - /* Opus */ - pkt->length = opus_decode(participant->decoder, payload, plen, (opus_int16 *)pkt->data, BUFFER_SAMPLES, 0); - } else if(participant->codec == JANUS_AUDIOCODEC_PCMA || participant->codec == JANUS_AUDIOCODEC_PCMU) { - /* G.711 */ - if(plen != 160) { - g_atomic_int_set(&participant->decoding, 0); - JANUS_LOG(LOG_WARN, "[G.711] Wrong packet size (expected 160, got %d), skipping audio packet\n", plen); - g_free(pkt->data); - g_free(pkt); - return; - } - int i = 0; - uint16_t *samples = (uint16_t *)pkt->data; - if(rtp->type == 0) { - /* mu-law */ - for(i=0; itype == 8) { - /* a-law */ - for(i=0; ilength = 320; - } - /* Update last_timestamp */ - participant->last_timestamp = pkt->timestamp; - /* Increment according to previous seq_number */ - participant->expected_seq = pkt->seq_number + 1; - } else if(pkt->seq_number > participant->expected_seq) { - /* Sequence(s) losts */ - uint16_t gap = pkt->seq_number - participant->expected_seq; - JANUS_LOG(LOG_HUGE, "%"SCNu16" sequence(s) lost, sequence = %"SCNu16", expected seq = %"SCNu16"\n", - gap, pkt->seq_number, participant->expected_seq); - - /* Use FEC if sequence lost < DEFAULT_PREBUFFERING (or any custom value) */ - uint16_t start_lost_seq = participant->expected_seq; - if(participant->codec == JANUS_AUDIOCODEC_OPUS && participant->fec && gap < participant->prebuffer_count) { - uint8_t i=0; - for(i=1; i<=gap ; i++) { - int32_t output_samples; - janus_audiobridge_rtp_relay_packet *lost_pkt = g_malloc(sizeof(janus_audiobridge_rtp_relay_packet)); - lost_pkt->data = g_malloc0(BUFFER_SAMPLES*sizeof(opus_int16)); - lost_pkt->ssrc = 0; - lost_pkt->timestamp = participant->last_timestamp + (i * OPUS_SAMPLES); - lost_pkt->seq_number = start_lost_seq++; - lost_pkt->silence = FALSE; - lost_pkt->length = 0; - if(i == gap) { - /* Attempt to decode with in-band FEC from next packet */ - opus_decoder_ctl(participant->decoder, OPUS_GET_LAST_PACKET_DURATION(&output_samples)); - lost_pkt->length = opus_decode(participant->decoder, payload, plen, (opus_int16 *)lost_pkt->data, output_samples, 1); - } else { - opus_decoder_ctl(participant->decoder, OPUS_GET_LAST_PACKET_DURATION(&output_samples)); - lost_pkt->length = opus_decode(participant->decoder, NULL, plen, (opus_int16 *)lost_pkt->data, output_samples, 1); - } - if(lost_pkt->length < 0) { - g_atomic_int_set(&participant->decoding, 0); - JANUS_LOG(LOG_ERR, "[Opus] Ops! got an error decoding the Opus frame: %d (%s)\n", lost_pkt->length, opus_strerror(lost_pkt->length)); - g_free(lost_pkt->data); - g_free(lost_pkt); - g_free(pkt->data); - g_free(pkt); - return; - } - /* Enqueue the decoded frame */ - janus_mutex_lock(&participant->qmutex); - /* Insert packets sorting by sequence number */ - participant->inbuf = g_list_insert_sorted(participant->inbuf, lost_pkt, &janus_audiobridge_rtp_sort); - janus_mutex_unlock(&participant->qmutex); - } - } - /* Then go with the regular decode (no FEC) */ - if(participant->codec == JANUS_AUDIOCODEC_OPUS) { - /* Opus */ - pkt->length = opus_decode(participant->decoder, payload, plen, (opus_int16 *)pkt->data, BUFFER_SAMPLES, 0); - } else if(participant->codec == JANUS_AUDIOCODEC_PCMA || participant->codec == JANUS_AUDIOCODEC_PCMU) { - /* G.711 */ - if(plen != 160) { - g_atomic_int_set(&participant->decoding, 0); - JANUS_LOG(LOG_WARN, "[G.711] Wrong packet size (expected 160, got %d), skipping audio packet\n", plen); - g_free(pkt->data); - g_free(pkt); - return; - } - int i = 0; - uint16_t *samples = (uint16_t *)pkt->data; - if(rtp->type == 0) { - /* mu-law */ - for(i=0; itype == 8) { - /* a-law */ - for(i=0; ilength = 320; - } - /* Increment according to previous seq_number */ - participant->expected_seq = pkt->seq_number + 1; - } else { - /* In late sequence or sequence wrapped */ - g_atomic_int_set(&participant->decoding, 0); - if((participant->expected_seq - pkt->seq_number) > MAX_MISORDER){ - JANUS_LOG(LOG_HUGE, "SN WRAPPED seq = %"SCNu16", expected_seq = %"SCNu16"\n", pkt->seq_number, participant->expected_seq); - participant->expected_seq = pkt->seq_number + 1; - } else { - JANUS_LOG(LOG_WARN, "IN LATE SN seq = %"SCNu16", expected_seq = %"SCNu16"\n", pkt->seq_number, participant->expected_seq); - } - g_free(pkt->data); - g_free(pkt); - return; - } - g_atomic_int_set(&participant->decoding, 0); - if(pkt->length < 0) { - if(participant->codec == JANUS_AUDIOCODEC_OPUS) { - JANUS_LOG(LOG_ERR, "[Opus] Ops! got an error decoding the Opus frame: %d (%s)\n", pkt->length, opus_strerror(pkt->length)); - } else { - JANUS_LOG(LOG_ERR, "[G.711] Ops! got an error decoding the audio frame\n"); - } - g_free(pkt->data); - g_free(pkt); - return; - } - /* Enqueue the decoded frame */ - janus_mutex_lock(&participant->qmutex); - /* Insert packets sorting by sequence number */ - participant->inbuf = g_list_insert_sorted(participant->inbuf, pkt, &janus_audiobridge_rtp_sort); - if(participant->prebuffering) { - /* Still pre-buffering: do we have enough packets now? */ - if(g_list_length(participant->inbuf) > participant->prebuffer_count) { - participant->prebuffering = FALSE; - JANUS_LOG(LOG_VERB, "Prebuffering done! Finally adding the user to the mix\n"); - } else { - JANUS_LOG(LOG_VERB, "Still prebuffering (got %d packets), not adding the user to the mix yet\n", g_list_length(participant->inbuf)); - } - } else { - /* Make sure we're not queueing too many packets: if so, get rid of the older ones */ - if(g_list_length(participant->inbuf) >= participant->prebuffer_count*2) { - gint64 now = janus_get_monotonic_time(); - if(now - participant->last_drop > 5*G_USEC_PER_SEC) { - JANUS_LOG(LOG_VERB, "Too many packets in queue (%d > %d), removing older ones\n", - g_list_length(participant->inbuf), participant->prebuffer_count*2); - participant->last_drop = now; - } - while(g_list_length(participant->inbuf) > participant->prebuffer_count) { - /* Remove this packet: it's too old */ - GList *first = g_list_first(participant->inbuf); - janus_audiobridge_rtp_relay_packet *pkt = (janus_audiobridge_rtp_relay_packet *)first->data; - JANUS_LOG(LOG_VERB, "List length = %d, Remove sequence = %d\n", - g_list_length(participant->inbuf), pkt->seq_number); - participant->inbuf = g_list_delete_link(participant->inbuf, first); - first = NULL; - if(pkt == NULL) - continue; - g_free(pkt->data); - pkt->data = NULL; - g_free(pkt); - pkt = NULL; - } - } + /* Queue the audio packet in the jitter buffer (we won't decode now, there might be buffering involved) */ + if(participant->jitter) { + janus_audiobridge_buffer_packet *pkt = janus_audiobridge_buffer_packet_create(buf, len, silence); + janus_mutex_lock(&participant->qmutex); + JitterBufferPacket jbp = {0}; + jbp.data = (char *)pkt; + jbp.len = 0; + jbp.timestamp = ntohl(rtp->timestamp); + jbp.span = (participant->codec == JANUS_AUDIOCODEC_OPUS ? 960 : 160); + jitter_buffer_put(participant->jitter, &jbp); + janus_mutex_unlock(&participant->qmutex); } - janus_mutex_unlock(&participant->qmutex); } } @@ -5967,7 +5737,6 @@ static void janus_audiobridge_hangup_media_internal(janus_plugin_session *handle participant->muted = TRUE; g_free(participant->display); participant->display = NULL; - participant->prebuffering = TRUE; /* Make sure we're not using the encoder/decoder right now, we're going to destroy them */ while(!g_atomic_int_compare_and_exchange(&participant->encoding, 0, 1)) g_usleep(5000); @@ -5988,6 +5757,8 @@ static void janus_audiobridge_hangup_media_internal(janus_plugin_session *handle g_free(participant->mjr_base); participant->mjr_base = NULL; /* Get rid of queued packets */ + if(participant->jitter) + jitter_buffer_reset(participant->jitter); while(participant->inbuf) { GList *first = g_list_first(participant->inbuf); janus_audiobridge_rtp_relay_packet *pkt = (janus_audiobridge_rtp_relay_packet *)first->data; @@ -6000,7 +5771,6 @@ static void janus_audiobridge_hangup_media_internal(janus_plugin_session *handle g_free(pkt); pkt = NULL; } - participant->last_drop = 0; janus_mutex_unlock(&participant->qmutex); if(audiobridge != NULL) { janus_mutex_unlock(&audiobridge->mutex); @@ -6216,7 +5986,6 @@ static void *janus_audiobridge_handler(void *data) { json_t *display = json_object_get(root, "display"); const char *display_text = display ? json_string_value(display) : NULL; json_t *muted = json_object_get(root, "muted"); - json_t *prebuffer = json_object_get(root, "prebuffer"); json_t *gain = json_object_get(root, "volume"); json_t *spatial = json_object_get(root, "spatial_position"); json_t *bitrate = json_object_get(root, "bitrate"); @@ -6228,12 +5997,6 @@ static void *janus_audiobridge_handler(void *data) { json_t *record = json_object_get(root, "record"); json_t *recfile = json_object_get(root, "filename"); json_t *gen_offer = json_object_get(root, "generate_offer"); - uint prebuffer_count = prebuffer ? json_integer_value(prebuffer) : audiobridge->default_prebuffering; - if(prebuffer_count > MAX_PREBUFFERING) { - prebuffer_count = audiobridge->default_prebuffering; - JANUS_LOG(LOG_WARN, "Invalid prebuffering value provided (too high), using room default: %d\n", - audiobridge->default_prebuffering); - } int volume = gain ? json_integer_value(gain) : 100; int spatial_position = spatial ? json_integer_value(spatial) : 50; int32_t opus_bitrate = audiobridge->default_bitrate; @@ -6330,11 +6093,11 @@ static void *janus_audiobridge_handler(void *data) { janus_refcount_init(&participant->ref, janus_audiobridge_participant_free); g_atomic_int_set(&participant->active, 0); participant->codec = codec; - participant->prebuffering = TRUE; participant->display = NULL; + participant->jitter = jitter_buffer_init(participant->codec == JANUS_AUDIOCODEC_OPUS ? 960 : 160); + jitter_buffer_ctl(participant->jitter, JITTER_BUFFER_SET_DESTROY_CALLBACK, &janus_audiobridge_buffer_packet_destroy); participant->inbuf = NULL; participant->outbuf = NULL; - participant->last_drop = 0; participant->encoder = NULL; participant->decoder = NULL; participant->reset = FALSE; @@ -6357,7 +6120,6 @@ static void *janus_audiobridge_handler(void *data) { participant->admin = admin; participant->display = display_text ? g_strdup(display_text) : NULL; participant->muted = muted ? json_is_true(muted) : FALSE; /* By default, everyone's unmuted when joining */ - participant->prebuffer_count = prebuffer_count; participant->volume_gain = volume; participant->opus_complexity = complexity; participant->opus_bitrate = opus_bitrate; @@ -6686,7 +6448,6 @@ static void *janus_audiobridge_handler(void *data) { if(error_code != 0) goto error; json_t *muted = json_object_get(root, "muted"); - json_t *prebuffer = json_object_get(root, "prebuffer"); json_t *bitrate = json_object_get(root, "bitrate"); json_t *quality = json_object_get(root, "quality"); json_t *exploss = json_object_get(root, "expected_loss"); @@ -6698,32 +6459,6 @@ static void *janus_audiobridge_handler(void *data) { json_t *group = json_object_get(root, "group"); json_t *gen_offer = json_object_get(root, "generate_offer"); json_t *update = json_object_get(root, "update"); - if(prebuffer) { - uint prebuffer_count = json_integer_value(prebuffer); - if(prebuffer_count > MAX_PREBUFFERING) { - JANUS_LOG(LOG_WARN, "Invalid prebuffering value provided (too high), keeping previous value: %d\n", - participant->prebuffer_count); - } else if(prebuffer_count != participant->prebuffer_count) { - janus_mutex_lock(&participant->qmutex); - if(prebuffer_count < participant->prebuffer_count) { - /* We're switching to a shorter prebuffer, trim the incoming buffer */ - while(g_list_length(participant->inbuf) > prebuffer_count) { - GList *first = g_list_first(participant->inbuf); - janus_audiobridge_rtp_relay_packet *pkt = (janus_audiobridge_rtp_relay_packet *)first->data; - participant->inbuf = g_list_delete_link(participant->inbuf, first); - if(pkt == NULL) - continue; - g_free(pkt->data); - g_free(pkt); - } - } else { - /* Reset the prebuffering state */ - participant->prebuffering = TRUE; - } - participant->prebuffer_count = prebuffer_count; - janus_mutex_unlock(&participant->qmutex); - } - } if(gain) participant->volume_gain = json_integer_value(gain); if(bitrate) { @@ -6779,6 +6514,8 @@ static void *janus_audiobridge_handler(void *data) { if(participant->muted) { /* Clear the queued packets waiting to be handled */ janus_mutex_lock(&participant->qmutex); + if(participant->jitter) + jitter_buffer_reset(participant->jitter); while(participant->inbuf) { GList *first = g_list_first(participant->inbuf); janus_audiobridge_rtp_relay_packet *pkt = (janus_audiobridge_rtp_relay_packet *)first->data; @@ -7114,7 +6851,6 @@ static void *janus_audiobridge_handler(void *data) { } JANUS_LOG(LOG_VERB, " -- Participant ID in new room %s: %s\n", room_id_str, user_id_str); } - participant->prebuffering = TRUE; participant->audio_active_packets = 0; participant->audio_dBov_sum = 0; participant->talking = FALSE; @@ -7398,7 +7134,8 @@ static void *janus_audiobridge_handler(void *data) { /* Get rid of queued packets */ janus_mutex_lock(&participant->qmutex); g_atomic_int_set(&participant->active, 0); - participant->prebuffering = TRUE; + if(participant->jitter) + jitter_buffer_reset(participant->jitter); while(participant->inbuf) { GList *first = g_list_first(participant->inbuf); janus_audiobridge_rtp_relay_packet *pkt = (janus_audiobridge_rtp_relay_packet *)first->data; @@ -7962,7 +7699,7 @@ static void *janus_audiobridge_mixer_thread(void *data) { while(ps) { janus_audiobridge_participant *p = (janus_audiobridge_participant *)ps->data; janus_mutex_lock(&p->qmutex); - if(g_atomic_int_get(&p->destroyed) || !p->session || !g_atomic_int_get(&p->session->started) || !g_atomic_int_get(&p->active) || p->muted || p->prebuffering || !p->inbuf) { + if(g_atomic_int_get(&p->destroyed) || !p->session || !g_atomic_int_get(&p->session->started) || !g_atomic_int_get(&p->active) || p->muted || !p->inbuf) { janus_mutex_unlock(&p->qmutex); ps = ps->next; continue; @@ -8213,7 +7950,7 @@ static void *janus_audiobridge_mixer_thread(void *data) { } janus_audiobridge_rtp_relay_packet *pkt = NULL; janus_mutex_lock(&p->qmutex); - if(g_atomic_int_get(&p->active) && !p->muted && !p->prebuffering && p->inbuf) { + if(g_atomic_int_get(&p->active) && !p->muted && p->inbuf) { GList *first = g_list_first(p->inbuf); pkt = (janus_audiobridge_rtp_relay_packet *)(first ? first->data : NULL); p->inbuf = g_list_delete_link(p->inbuf, first); @@ -8450,11 +8187,135 @@ static void *janus_audiobridge_participant_thread(void *data) { outpkt->silence = FALSE; uint8_t *payload = (uint8_t *)outpkt->data; + JitterBufferPacket jbp = {0}; + janus_audiobridge_buffer_packet *bpkt = NULL; + janus_audiobridge_rtp_relay_packet *pkt = NULL; janus_audiobridge_rtp_relay_packet *mixedpkt = NULL; + janus_rtp_header *rtp = NULL; + gint64 now = janus_get_monotonic_time(), before = now; + gboolean first = TRUE, use_fec = FALSE; + int ret = 0; - /* Start working: check the outgoing queue for packets, then encode and send them */ + /* Start working: check both the incoming queue (to decode and queue) and the outgoing one (to encode and send) */ while(!g_atomic_int_get(&stopping) && g_atomic_int_get(&session->destroyed) == 0) { - mixedpkt = g_async_queue_timeout_pop(participant->outbuf, 100000); + /* Start with packets to decode and queue for the mixer */ + now = janus_get_monotonic_time(); + janus_mutex_lock(&participant->qmutex); + gboolean locked = TRUE; + /* Start by reading packets to decode from the jitter buffer on a clock */ + if(now - before >= 18000) { + before += 20000; + if(participant->jitter) { + ret = jitter_buffer_get(participant->jitter, &jbp, participant->codec == JANUS_AUDIOCODEC_OPUS ? 960 : 160, NULL); + jitter_buffer_tick(participant->jitter); + if(ret == JITTER_BUFFER_OK) { + bpkt = (janus_audiobridge_buffer_packet *)jbp.data; + janus_mutex_unlock(&participant->qmutex); + first = FALSE; + locked = FALSE; + rtp = (janus_rtp_header *)bpkt->buffer; + /* If this is Opus, check if there's a packet gap we should fix with FEC */ + use_fec = FALSE; + if(!first && participant->codec == JANUS_AUDIOCODEC_OPUS && participant->fec) { + if(ntohs(rtp->seq_number) != participant->expected_seq) { + /* Lost a packet here? Use FEC to recover */ + use_fec = TRUE; + } + } + if(!g_atomic_int_compare_and_exchange(&participant->decoding, 0, 1)) { + /* This means we're cleaning up, so don't try to decode */ + janus_audiobridge_buffer_packet_destroy(bpkt); + break; + } + /* Access the payload */ + int plen = 0; + const unsigned char *payload = (const unsigned char *)janus_rtp_payload(bpkt->buffer, bpkt->len, &plen); + if(!payload) { + g_atomic_int_set(&participant->decoding, 0); + JANUS_LOG(LOG_ERR, "[%s] Ops! got an error accessing the RTP payload\n", + participant->codec == JANUS_AUDIOCODEC_OPUS ? "Opus" : "G.711"); + janus_audiobridge_buffer_packet_destroy(bpkt); + break; + } + if(use_fec) { + /* There was a gap, try to get decode from redundant info first */ + pkt = g_malloc(sizeof(janus_audiobridge_rtp_relay_packet)); + pkt->data = g_malloc0(BUFFER_SAMPLES*sizeof(opus_int16)); + pkt->ssrc = 0; + pkt->timestamp = participant->last_timestamp + 960; /* FIXME */ + pkt->seq_number = participant->expected_seq; /* FIXME */ + /* This is a redundant packet, so we can't parse any extension info */ + pkt->silence = FALSE; + /* Decode the lost packet using fec=1 */ + int32_t output_samples; + opus_decoder_ctl(participant->decoder, OPUS_GET_LAST_PACKET_DURATION(&output_samples)); + pkt->length = opus_decode(participant->decoder, payload, plen, (opus_int16 *)pkt->data, output_samples, 1); + /* Queue the decoded redundant packet for the mixer */ + janus_mutex_lock(&participant->qmutex); + participant->inbuf = g_list_append(participant->inbuf, pkt); + janus_mutex_unlock(&participant->qmutex); + /* Now we can process the next packet */ + } + /* Decode the packet */ + pkt = g_malloc(sizeof(janus_audiobridge_rtp_relay_packet)); + pkt->data = g_malloc0(BUFFER_SAMPLES*sizeof(opus_int16)); + pkt->ssrc = 0; + pkt->timestamp = ntohl(rtp->timestamp); + pkt->seq_number = ntohs(rtp->seq_number); + /* We might check the audio level extension to see if this is silence */ + pkt->silence = bpkt->silence; + pkt->length = 0; + if(participant->codec == JANUS_AUDIOCODEC_OPUS) { + /* Opus */ + pkt->length = opus_decode(participant->decoder, payload, plen, (opus_int16 *)pkt->data, BUFFER_SAMPLES, 0); + } else if(participant->codec == JANUS_AUDIOCODEC_PCMA || participant->codec == JANUS_AUDIOCODEC_PCMU) { + /* G.711 */ + if(plen != 160) { + g_atomic_int_set(&participant->decoding, 0); + JANUS_LOG(LOG_WARN, "[G.711] Wrong packet size (expected 160, got %d), skipping audio packet\n", plen); + janus_audiobridge_buffer_packet_destroy(bpkt); + break; + } + int i = 0; + uint16_t *samples = (uint16_t *)pkt->data; + if(rtp->type == 0) { + /* mu-law */ + for(i=0; itype == 8) { + /* a-law */ + for(i=0; ilength = 320; + } + /* Get rid of the buffered packet */ + janus_audiobridge_buffer_packet_destroy(bpkt); + /* Update the details */ + participant->last_timestamp = pkt->timestamp; + participant->expected_seq = pkt->seq_number + 1; + g_atomic_int_set(&participant->decoding, 0); + if(pkt->length < 0) { + if(participant->codec == JANUS_AUDIOCODEC_OPUS) { + JANUS_LOG(LOG_ERR, "[Opus] Ops! got an error decoding the Opus frame: %d (%s)\n", pkt->length, opus_strerror(pkt->length)); + } else { + JANUS_LOG(LOG_ERR, "[G.711] Ops! got an error decoding the audio frame\n"); + } + g_free(pkt->data); + g_free(pkt); + break; + } + /* Queue the decoded packet for the mixer */ + janus_mutex_lock(&participant->qmutex); + locked = TRUE; + participant->inbuf = g_list_append(participant->inbuf, pkt); + } + } + } + if(locked) + janus_mutex_unlock(&participant->qmutex); + /* Now check if there's packets to encode */ + mixedpkt = g_async_queue_try_pop(participant->outbuf); if(mixedpkt != NULL && g_atomic_int_get(&session->destroyed) == 0 && g_atomic_int_get(&session->started)) { if(g_atomic_int_get(&participant->active) && (participant->codec == JANUS_AUDIOCODEC_PCMA || participant->codec == JANUS_AUDIOCODEC_PCMU) && g_atomic_int_compare_and_exchange(&participant->encoding, 0, 1)) { @@ -8515,6 +8376,7 @@ static void *janus_audiobridge_participant_thread(void *data) { g_free(mixedpkt->data); g_free(mixedpkt); } + g_usleep(2500); } /* We're done, get rid of the resources */ g_free(outpkt->data);