Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support zero-knowledge scaling of RTPEngine #892

Open
wants to merge 51 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
8e207e0
Add RTPE owner address to be stored into the call redis storage
guss77 Dec 10, 2019
3adf514
refactor network address formatting to a standalone function so it ca…
guss77 Dec 10, 2019
9328f02
populate rtpe_connection_addr with our address during offer processing
guss77 Dec 10, 2019
5f62bfd
for foreign calls use owner rtpengine connection address for network …
guss77 Dec 11, 2019
1960acd
store rtpe call owner address per media instead of per-call
guss77 Jan 6, 2020
cac1f5f
socket_t now also models "foreign ports"
guss77 Jan 6, 2020
c08a065
update primary with endpoint new details added by alternates
guss77 Jan 7, 2020
a28662a
giving up on using the redis.c JSON parsing logic - so Im rolling my …
guss77 Jan 8, 2020
7d6f278
various fixes and missing features to new JSON model
guss77 Jan 8, 2020
8746e34
use new model for updating stream information and tags of local calls
guss77 Jan 8, 2020
59a7dfd
refactor json support routines and new parsing architecture out of re…
guss77 Jan 9, 2020
0ffccef
finish using macros for reading int fields, for slightly better reada…
guss77 Jan 9, 2020
93a1873
other_tags should be ref managed as well
guss77 Jan 9, 2020
ae6b271
added useful "get node by name" helper to grab an entire object or array
guss77 Jan 9, 2020
cc1b638
added payload updates to local call updates after an alternate added …
guss77 Jan 9, 2020
986549c
redone JSON parsing using the lower-level, but much better, JsonObjec…
guss77 Jan 19, 2020
4cd817c
more error logs
guss77 Jan 20, 2020
95f2174
fixed inverse sanity test
guss77 Jan 20, 2020
3f76b45
Always use remote server payload negotiation results if available
guss77 Jan 21, 2020
48b07ad
try to avoid "node is null" assertions
guss77 Jan 21, 2020
f5bc466
read and update media endpoint maps
guss77 Jan 21, 2020
766ced8
replace prefered codecs with those loaded from the database - assume …
guss77 Jan 21, 2020
121bc40
dont kill the codec data structure, just clear it
guss77 Jan 21, 2020
665503c
Fixed some broken copy&paste
guss77 Jan 21, 2020
7cfd630
fixed reading str from object in case there is no such member
guss77 Jan 21, 2020
f1728d1
dont try to update call if there isnt any new information
guss77 Jan 21, 2020
83af820
dont update codecs if the recieved data is the same length
guss77 Jan 21, 2020
4b25992
update media flags and ptime
guss77 Jan 21, 2020
628220e
access json_object properly to not cause asserts
guss77 Jan 22, 2020
bb7ce60
update codec handlers after we update media payloads
guss77 Jan 26, 2020
03acc60
prevent memory leaks in json_restore_call() and actually do load cryp…
guss77 Jan 28, 2020
b768bc4
add helpers to retrieve URI encoded data (likely binary) from json ob…
guss77 Jan 28, 2020
4c6c1c6
Also update local calls with new crypto params
guss77 Jan 28, 2020
16415ec
when encoding more than 1 sdes param set per direction, even if we do…
guss77 Jan 28, 2020
19a39ec
test if sdes param set exists before attempting to read it
guss77 Jan 29, 2020
e767e59
when reading unsigned from redis, use strtoul because otherwise we lo…
guss77 Jan 29, 2020
b0b0288
only try to parse endpoints if there is something to parse
guss77 Jan 29, 2020
14abc07
fixed payload type encoding/decoding
guss77 Jan 29, 2020
5392f75
use new parser to update dtls information instead of the missing impl…
guss77 Feb 11, 2020
ddf379b
store last_signal as timeval
guss77 Feb 12, 2020
cd432e9
handle foreign updates like local updates (to ignore self-updates)
guss77 Feb 12, 2020
a992290
fixed incorrect copypasta for crypto update
guss77 Feb 13, 2020
d30ff16
read stream last_packet time, dont just fake it
guss77 Feb 13, 2020
642329d
do crypto reinit after updated sdes params
guss77 Feb 13, 2020
7a21daf
cleanup addressing packet streams
guss77 Feb 16, 2020
24b0c80
Merge remote-tracking branch 'upstream/master' into feature/zero-know…
guss77 Feb 16, 2020
48abafe
remove double push that got in in one of the merges
guss77 Feb 16, 2020
4de3403
add missing forward declaration that was lost in the merge
guss77 Feb 16, 2020
5f882f9
correctly handle foreign server update of a re-invite offer on existi…
guss77 Mar 18, 2020
0977a95
Revert "correctly handle foreign server update of a re-invite offer o…
guss77 Mar 19, 2020
0db4257
Dont restart call on second offer
guss77 Mar 19, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion daemon/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ SRCS= main.c kernel.c poller.c aux.c control_tcp.c call.c control_udp.c redis.c
bencode.c cookie_cache.c udp_listener.c control_ng.strhash.c sdp.strhash.c stun.c rtcp.c \
crypto.c rtp.c call_interfaces.strhash.c dtls.c log.c cli.c graphite.c ice.c \
media_socket.c homer.c recording.c statistics.c cdr.c ssrc.c iptables.c tcp_listener.c \
codec.c load.c dtmf.c timerthread.c media_player.c jitter_buffer.c
codec.c load.c dtmf.c timerthread.c media_player.c jitter_buffer.c \
redis-json.c json-helpers.c
LIBSRCS= loglib.c auxlib.c rtplib.c str.c socket.c streambuf.c ssllib.c dtmflib.c
ifeq ($(with_transcoding),yes)
LIBSRCS+= codeclib.c resample.c
Expand Down
18 changes: 16 additions & 2 deletions daemon/call.c
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ static void call_timer_iterator(struct call *c, struct iterator_helper *hlp) {
}

if (c->deleted && rtpe_now.tv_sec >= c->deleted
&& c->last_signal <= c->deleted)
&& c->last_signal.tv_sec <= c->deleted)
goto delete;

if (c->ml_deleted && rtpe_now.tv_sec >= c->ml_deleted) {
Expand Down Expand Up @@ -1800,6 +1800,18 @@ static void __update_media_id(struct call_media *media, struct call_media *other
}
}

static void __update_rtpe_address(struct call_media* media, struct sdp_ng_flags *flags) {
struct packet_stream *ps;

if (media->rtpe_connection_addr.len || !media->streams.head)
return;

ps = media->streams.head->data;
media->rtpe_connection_addr.s = call_malloc(media->call, 64);
format_network_address(&media->rtpe_connection_addr, ps, flags, 0);
rlog(LOG_INFO, "Stored media address %s",media->rtpe_connection_addr.s);
}

/* called with call->master_lock held in W */
int monologue_offer_answer(struct call_monologue *other_ml, GQueue *streams,
struct sdp_ng_flags *flags)
Expand All @@ -1822,7 +1834,7 @@ int monologue_offer_answer(struct call_monologue *other_ml, GQueue *streams,
monologue = other_ml->active_dialogue;
call = monologue->call;

call->last_signal = rtpe_now.tv_sec;
call->last_signal = rtpe_now;
call->deleted = 0;

__C_DBG("this="STR_FORMAT" other="STR_FORMAT, STR_FMT(&monologue->tag), STR_FMT(&other_ml->tag));
Expand Down Expand Up @@ -2022,6 +2034,8 @@ int monologue_offer_answer(struct call_monologue *other_ml, GQueue *streams,
ice_update(media->ice_agent, NULL); /* this is in case rtcp-mux has changed */

recording_setup_media(media);
__update_rtpe_address(media, flags);
__update_rtpe_address(other_media, flags);
}

return 0;
Expand Down
30 changes: 14 additions & 16 deletions daemon/call_interfaces.c
Original file line number Diff line number Diff line change
Expand Up @@ -965,22 +965,8 @@ static const char *call_offer_answer_ng(bencode_item_t *input,
/* OP_ANSWER; OP_OFFER && !IS_FOREIGN_CALL */
call = call_get(&flags.call_id);

/* Failover scenario because of timeout on offer response: siprouter tries
* to establish session with another rtpengine2 even though rtpengine1
* might have persisted part of the session. rtpengine2 deletes previous
* call in memory and recreates an OWN call in redis */
// SDP fragments for trickle ICE must always operate on an existing call
if (opmode == OP_OFFER && !flags.fragment) {
if (call) {
if (IS_FOREIGN_CALL(call)) {
/* destroy call and create new one */
rwlock_unlock_w(&call->master_lock);
call_destroy(call);
obj_put(call);
call = call_get_or_create(&flags.call_id, CT_OWN_CALL);
}
}
else {
if (!call) {
/* call == NULL, should create call */
call = call_get_or_create(&flags.call_id, CT_OWN_CALL);
}
Expand Down Expand Up @@ -1361,7 +1347,7 @@ void ng_call_stats(struct call *call, const str *fromtag, const str *totag, benc

bencode_dictionary_add_integer(output, "created", call->created.tv_sec);
bencode_dictionary_add_integer(output, "created_us", call->created.tv_usec);
bencode_dictionary_add_integer(output, "last signal", call->last_signal);
bencode_dictionary_add_integer(output, "last signal", call->last_signal.tv_sec);
ng_stats_ssrc(bencode_dictionary_add_dictionary(output, "SSRC"), call->ssrc_hash);

tags = bencode_dictionary_add_dictionary(output, "tags");
Expand Down Expand Up @@ -1945,3 +1931,15 @@ int call_interfaces_init() {

return 0;
}


void format_network_address(str* o, struct packet_stream *ps, struct sdp_ng_flags *flags, int keep_unspec) {
if (!is_addr_unspecified(&flags->parsed_media_address))
o->len = sprintf(o->s, "%s %s",
flags->parsed_media_address.family->rfc_name,
sockaddr_print_buf(&flags->parsed_media_address));
else if (IS_FOREIGN_CALL(ps->call) && ps->media && ps->media->rtpe_connection_addr.len)
o->len = sprintf(o->s, "%s", ps->media->rtpe_connection_addr.s);
else
call_stream_address46(o->s, ps, SAF_NG, &o->len, NULL, keep_unspec);
}
2 changes: 1 addition & 1 deletion daemon/cdr.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ void cdr_update_entry(struct call* c) {
ADJUSTLEN(printlen,cdrbufend,cdrbufcur);
printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"created_from=%s, ", c->created_from);
ADJUSTLEN(printlen,cdrbufend,cdrbufcur);
printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"last_signal=%llu, ", (unsigned long long)c->last_signal);
printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"last_signal=%llu, ", (unsigned long long)c->last_signal.tv_sec);
ADJUSTLEN(printlen,cdrbufend,cdrbufcur);
printlen = snprintf(cdrbufcur,CDRBUFREMAINDER,"tos=%u, ", (unsigned int)c->tos);
ADJUSTLEN(printlen,cdrbufend,cdrbufcur);
Expand Down
2 changes: 1 addition & 1 deletion daemon/cli.c
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ static void cli_incoming_list_callid(str *instr, struct streambuf *replybuffer)
"\ncallid: %s\ndeletionmark: %s\ncreated: %i\nproxy: %s\ntos: %u\nlast_signal: %llu\n"
"redis_keyspace: %i\nforeign: %s\n\n",
c->callid.s, c->ml_deleted ? "yes" : "no", (int) c->created.tv_sec, c->created_from,
(unsigned int) c->tos, (unsigned long long) c->last_signal, c->redis_hosted_db,
(unsigned int) c->tos, (unsigned long long) c->last_signal.tv_sec, c->redis_hosted_db,
IS_FOREIGN_CALL(c) ? "yes" : "no");

for (l = c->monologues.head; l; l = l->next) {
Expand Down
12 changes: 11 additions & 1 deletion daemon/codec.c
Original file line number Diff line number Diff line change
Expand Up @@ -1165,7 +1165,17 @@ void codec_packet_free(void *pp) {
g_slice_free1(sizeof(*p), p);
}


str *codec_print_payload_type(const struct rtp_payload_type* pt) {
return str_sprintf(
"%s/" /* encoding */
"%u/" /* clock_rate */
"%i/" /* channels */
"%i/" /* bitrate (opts) */
"%i/" /* ptime (extra_opts) */
"%s/" /* format_parameters(fmt_params) */
/* the last part must end with '/', otherwise codec_make_payload_type won't read it*/
,pt->encoding.s, pt->clock_rate, pt->channels, pt->bitrate, pt->ptime, pt->format_parameters.s);
}

struct rtp_payload_type *codec_make_payload_type(const str *codec_str, struct call_media *media) {
str codec_fmt = *codec_str;
Expand Down
167 changes: 167 additions & 0 deletions daemon/json-helpers.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
#include "json-helpers.h"

/**
* Helper method to create `str` instances from C-strings that glib uses often
* @param c C-style string where `strlen()` reports correct length.
* @return `str` string. Release using `free()`.
*/
static str *str_dup_charptr(const char *c) {
str temp = { .s = (char*)c, .len = strlen(c) };
return str_dup(&temp);
}

/**
* Helper method to test if the current node the reader is pointing to is a string value.
* Similar to `json_reader_is_value()` but will only return non-zero for values that are string values.
* @param reader JSON reader that is not in an error state
* @return non-zero if the current node is a string value and the reader is not in an error state. zero otherwise.
*/
static int json_reader_is_string(JsonReader* reader) {
JsonNode *node;

node = json_reader_get_value(reader);
if (json_node_get_value_type(node) == G_TYPE_STRING)
return 1;
return 0;
}

str *json_reader_get_str(JsonReader *reader, const char *key) {
const gchar *strval;
str *out = NULL;
json_reader_read_member(reader, key);
strval = json_reader_get_string_value(reader);
json_reader_end_member(reader);
if (strval)
out = str_dup_charptr(strval);
return out;
}

str *json_object_get_str(JsonObject* json, const char *key) {
const gchar *strval = NULL;
str *out = NULL;
if (json_object_has_member(json, key))
strval = json_object_get_string_member(json, key);
if (strval)
out = str_dup_charptr(strval);
return out;
}

str *json_object_get_str_uri_enc(JsonObject *json, const char *key) {
str *strval = NULL;
strval = json_object_get_str(json, key);
if (!strval)
return NULL;
return str_uri_decode_len(strval->s, strval->len);
}

str *json_array_get_str(JsonArray *json, unsigned idx) {
const gchar *strval;
str *out = NULL;
strval = json_array_get_string_element(json, idx);
if (strval)
out = str_dup_charptr(strval);
return out;
}

str *json_reader_get_str_element(JsonReader *reader, unsigned idx) {
const gchar *strval = NULL;
str *out = NULL;
json_reader_read_element(reader, idx);
strval = json_reader_get_string_value(reader);
json_reader_end_element(reader);
if (strval)
out = str_dup_charptr(strval);
return out;
}

long long json_reader_get_ll_element(JsonReader *reader, unsigned idx) {
str *strval;
long long out = -1;
if (json_reader_read_element(reader, idx)) {
if (json_reader_is_string(reader)) {
json_reader_end_element(reader);
strval = json_reader_get_str_element(reader, idx);
if (strval) {
out = strtoll(strval->s, NULL, 10);
free(strval);
}
} else {
out = json_reader_get_int_value(reader);
json_reader_end_element(reader);
}
} else
json_reader_end_element(reader);
return out;
}

long long json_array_get_ll(JsonArray *json, unsigned idx) {
long long out = -1;
JsonNode *member;

if (idx >= json_array_get_length(json))
return out;

member = json_array_get_element(json, idx);
if (json_node_get_value_type(member) == G_TYPE_STRING) {
str *strval = json_array_get_str(json, idx);
out = strtoll(strval->s, NULL, 10);
free(strval);
return out;
}

return json_array_get_int_element(json, idx); // returns gint64
}

str *json_reader_get_string_value_uri_enc(JsonReader *reader) {
const char *s = json_reader_get_string_value(reader);
if (!s)
return NULL;
str *out = str_uri_decode_len(s, strlen(s));
return out;
}

long long json_reader_get_ll(JsonReader *reader, const char *key) {
long long r = -1;

if (!json_reader_read_member(reader, key)) {
json_reader_end_member(reader);
return r;
}
if (json_reader_is_string(reader)) {
str *ret = json_reader_get_string_value_uri_enc(reader);
json_reader_end_member(reader);
r = strtoll(ret->s, NULL, 10);
free(ret);
return r;
}
/* not a string, lets assume integer */
r = json_reader_get_int_value(reader);
json_reader_end_member(reader);
return r;
}

long long json_object_get_ll(JsonObject *json, const char *key) {
long long r = -1;
JsonNode *member;

if (!json_object_has_member(json, key))
return r;

member = json_object_get_member(json, key);
if (json_node_get_value_type(member) == G_TYPE_STRING) {
str *ret = json_object_get_str(json, key);
r = strtoll(ret->s, NULL, 10);
free(ret);
return r;
}
/* not a string, lets assume integer */
return json_object_get_int_member(json, key); // returns gint64
}

JsonNode* json_reader_get_node(JsonReader *reader, const char *key) {
JsonNode* nodeval;
json_reader_read_member(reader, key);
nodeval = json_reader_get_value(reader);
json_reader_end_member(reader);
return nodeval;
}
22 changes: 13 additions & 9 deletions daemon/media_socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,9 @@ static void release_port(socket_t *r, struct intf_spec *spec) {

iptables_del_rule(r);

if (close_socket(r) == 0) {
if (r->is_foreign) {
__C_DBG("port %u is foreign so release is not needed");
} else if (close_socket(r) == 0) {
__C_DBG("port %u is released", port);
bit_array_clear(pp->ports_used, port);
g_atomic_int_inc(&pp->free_ports);
Expand Down Expand Up @@ -1634,7 +1636,7 @@ static int media_packet_address_check(struct packet_handler_ctx *phc)

/* wait at least 3 seconds after last signal before committing to a particular
* endpoint address */
if (!phc->mp.call->last_signal || rtpe_now.tv_sec <= phc->mp.call->last_signal + 3)
if (!phc->mp.call->last_signal.tv_sec || rtpe_now.tv_sec <= phc->mp.call->last_signal.tv_sec + 3)
goto update_peerinfo;

confirm_now:
Expand Down Expand Up @@ -2000,14 +2002,16 @@ struct stream_fd *stream_fd_new(socket_t *fd, struct call *call, const struct lo

__C_DBG("stream_fd_new localport=%d", sfd->socket.local.port);

ZERO(pi);
pi.fd = sfd->socket.fd;
pi.obj = &sfd->obj;
pi.readable = stream_fd_readable;
pi.closed = stream_fd_closed;
if (!sfd->socket.is_foreign) {
ZERO(pi);
pi.fd = sfd->socket.fd;
pi.obj = &sfd->obj;
pi.readable = stream_fd_readable;
pi.closed = stream_fd_closed;

if (poller_add_item(rtpe_poller, &pi))
ilog(LOG_ERR, "Failed to add stream_fd to poller");
if (poller_add_item(rtpe_poller, &pi))
ilog(LOG_ERR, "Failed to add stream_fd to poller");
}

return sfd;
}
Expand Down
Loading