Commit 547e8044 authored by Sergio Ammirata's avatar Sergio Ammirata

Add support for new connection_status callback

parent e7c75cf9
Pipeline #64653 passed with stages
in 1 minute and 4 seconds
......@@ -169,6 +169,14 @@ enum rist_stats_type
RIST_STATS_RECEIVER_FLOW
};
enum rist_connection_status
{
RIST_CONNECTION_ESTABLISHED = 0,
RIST_CONNECTION_TIMED_OUT = 1,
RIST_CLIENT_CONNECTED = 2,
RIST_CLIENT_TIMED_OUT = 3
};
struct rist_ctx;
struct rist_peer;
......
......@@ -374,6 +374,27 @@ RIST_API int rist_logging_settings_free(const struct rist_logging_settings **log
*/
RIST_API int rist_udp_config_free(const struct rist_udp_config **udp_config);
/**
* @brief Connection status callback function
*
* Optional calling application provided function for receiving connection status changes for peers.
*
* @param arg optional user data set via rist_connection_status_callback_set
* @param peer peer associated with the event
* @param rist_peer_connection_status status value
* @return void.
*/
typedef void (*connection_status_callback_t)(void *arg, struct rist_peer *peer, enum rist_connection_status peer_connection_status);
/**
* @brief Set callback for receiving connection status change events
*
* @param ctx RIST context
* @param connection_status_callback_t Callback function that will be called.
* @param arg extra arguments for callback function
*/
RIST_API int rist_connection_status_callback_set(struct rist_ctx *ctx, connection_status_callback_t, void *arg);
/**
* @brief Get the version of libRIST
*
......
......@@ -2016,40 +2016,28 @@ static void rist_recv_rtcp(struct rist_peer *peer, uint32_t seq,
peer->adv_peer_id, peer->receiver_name);
new_peer = true;
}
bool peer_authenticated = peer->authenticated;
int connection_message = 0;
if (peer->receiver_mode) {
rist_receiver_rtcp_authenticate(peer, seq, flow_id);
connection_message = RIST_CLIENT_CONNECTED;
} else if (peer->sender_ctx && peer->listening) {
// TODO: create rist_sender_recv_rtcp
if (!peer->authenticated) {
rist_peer_authenticate(peer);
}
connection_message = RIST_CLIENT_CONNECTED;
}
else if (new_peer && peer->authenticated) {
if (ctx->auth.conn_cb) {
char ip_string_buffer[INET6_ADDRSTRLEN];
uint16_t dummyport;
uint16_t port = 0;
char *ip_string =
get_ip_str(&peer->u.address, &ip_string_buffer[0], &dummyport, INET6_ADDRSTRLEN);
if (!ip_string){
ip_string = "";
}
// Real source port vs virtual source port
if (ctx->profile == RIST_PROFILE_SIMPLE)
port = peer->remote_port;
char incoming_ip_string_buffer[INET6_ADDRSTRLEN];
char *incoming_ip_string = get_ip_str(&peer->u.address, &incoming_ip_string_buffer[0], &port, INET6_ADDRSTRLEN);
if (incoming_ip_string) {
if (ctx->auth.conn_cb(ctx->auth.arg,
incoming_ip_string,
port,
ip_string,
peer->local_port,
peer)) {
return;
}
}
}
else {
connection_message = RIST_CONNECTION_ESTABLISHED;
}
if (peer->timed_out || new_peer || (!peer_authenticated && peer->authenticated)) {
if (!new_peer)
rist_log_priv(ctx, RIST_LOG_INFO, "Peer %"PRIu32" receiver with name %s reconnected\n",
peer->adv_peer_id, peer->receiver_name);
peer->timed_out = 0;
if (ctx->connection_status_callback)
ctx->connection_status_callback(ctx->connection_status_callback_argument, peer, connection_message);
}
break;
}
......@@ -2925,32 +2913,42 @@ protocol_bypass:
}
void rist_timeout_check(struct rist_common_ctx *cctx, uint64_t now)
{
struct rist_peer *peer = cctx->PEERS;
while (peer)
void rist_timeout_check(struct rist_common_ctx *cctx, uint64_t now)
{
struct rist_peer *next = peer->next;
if (!peer->dead && peer->parent && now > peer->last_rtcp_received && peer->last_rtcp_received > 0)
struct rist_peer *peer = cctx->PEERS;
while (peer)
{
if (peer->parent && (now - peer->last_rtcp_received) > peer->session_timeout &&
peer->last_rtcp_received > 0)
struct rist_peer *next = peer->next;
if (!peer->dead && now > peer->last_rtcp_received && peer->last_rtcp_received > 0)
{
rist_log_priv2(cctx->logging_settings, RIST_LOG_WARN,
"Peer %u timed out\n", peer->adv_peer_id);
kill_peer(peer);
}
} else if (peer->dead && peer->parent)
{
if ( peer->dead_since < now && (now - peer->dead_since) > 5000 * RIST_CLOCK)
if ((now - peer->last_rtcp_received) > peer->session_timeout)
{
if (peer->parent)
{
rist_log_priv2(cctx->logging_settings, RIST_LOG_WARN, "Listening peer %u timed out\n", peer->adv_peer_id);
if (cctx->connection_status_callback)
cctx->connection_status_callback(cctx->connection_status_callback_argument, peer, RIST_CLIENT_TIMED_OUT);
kill_peer(peer);
}
else if (!peer->timed_out)
{
rist_log_priv2(cctx->logging_settings, RIST_LOG_WARN, "Peer %u timed out\n", peer->adv_peer_id);
if (cctx->connection_status_callback)
cctx->connection_status_callback(cctx->connection_status_callback_argument, peer, RIST_CONNECTION_TIMED_OUT);
peer->timed_out = 1;
}
}
} else if (peer->dead && peer->parent)
{
rist_log_priv2(cctx->logging_settings, RIST_LOG_INFO, "Removing timed-out peer %u\n", peer->adv_peer_id);
rist_peer_remove(cctx, peer, NULL);
if ( peer->dead_since < now && (now - peer->dead_since) > 5000 * RIST_CLOCK)
{
rist_log_priv2(cctx->logging_settings, RIST_LOG_INFO, "Removing timed-out peer %u\n", peer->adv_peer_id);
rist_peer_remove(cctx, peer, NULL);
}
}
peer = next;
}
peer = next;
}
}
PTHREAD_START_FUNC(sender_pthread_protocol, arg)
{
......
......@@ -335,6 +335,10 @@ struct rist_common_ctx {
bool debug;
uint32_t birthtime_rtp_offset;
/* Connection status callback */
connection_status_callback_t connection_status_callback;
void *connection_status_callback_argument;
};
struct rist_receiver {
......@@ -538,6 +542,7 @@ struct rist_peer {
struct rist_peer_receiver_stats stats_receiver_total;
int dead;
int timed_out;
uint64_t dead_since;
uint64_t birthtime_peer;
uint64_t birthtime_local;
......
......@@ -219,6 +219,30 @@ int rist_receiver_data_notify_fd_set(struct rist_ctx *rist_ctx, int fd)
return 0;
}
int rist_connection_status_callback_set(struct rist_ctx *ctx, connection_status_callback_t connection_status_callback,
void *arg)
{
if (RIST_UNLIKELY(!ctx))
{
rist_log_priv3(RIST_LOG_ERROR, "ctx is null on rist_connection_status_callback_set call!\n");
return -1;
}
struct rist_common_ctx *cctx = NULL;
if (ctx->mode == RIST_RECEIVER_MODE && ctx->receiver_ctx) {
cctx = &ctx->receiver_ctx->common;
}
else if (ctx->mode == RIST_SENDER_MODE && ctx->sender_ctx) {
cctx = &ctx->sender_ctx->common;
}
else {
rist_log_priv3(RIST_LOG_ERROR, "Unknown error in rist_connection_status_callback_set call!\n");
return -1;
}
cctx->connection_status_callback = connection_status_callback;
cctx->connection_status_callback_argument = arg;
return 0;
}
int rist_receiver_data_callback_set(struct rist_ctx *rist_ctx,
int (*data_callback)(void *arg, const struct rist_data_block *data_block),
void *arg)
......
......@@ -41,6 +41,7 @@
static int signalReceived = 0;
static struct rist_logging_settings *logging_settings;
enum rist_profile profile = RIST_PROFILE_MAIN;
static int peer_connected_count = 0;
static struct option long_options[] = {
{ "inputurl", required_argument, NULL, 'i' },
......@@ -119,6 +120,17 @@ static uint32_t risttools_convertNTPtoRTP(uint64_t i_ntp)
return (uint32_t)i_ntp;
}
static void connection_status_callback(void *arg, struct rist_peer *peer, enum rist_connection_status peer_connection_status)
{
(void)arg;
if (peer_connection_status == RIST_CONNECTION_ESTABLISHED || peer_connection_status == RIST_CLIENT_CONNECTED)
peer_connected_count++;
else
peer_connected_count--;
rist_log(logging_settings, RIST_LOG_INFO,"Connection Status changed for Peer %"PRIu64", new status is %d, peer connected count is %d\n",
peer, peer_connection_status, peer_connected_count);
}
static int cb_recv(void *arg, const struct rist_data_block *b)
{
struct rist_callback_object *callback_object = (void *) arg;
......@@ -384,6 +396,11 @@ int main(int argc, char *argv[])
exit(1);
}
if (rist_connection_status_callback_set(ctx, connection_status_callback, NULL) == -1) {
rist_log(logging_settings, RIST_LOG_ERROR, "Could not initialize rist connection status callback\n");
exit(1);
}
if (profile != RIST_PROFILE_SIMPLE) {
if (rist_oob_callback_set(ctx, cb_recv_oob, ctx) == -1) {
rist_log(logging_settings, RIST_LOG_ERROR, "Could not add enable out-of-band data\n");
......
......@@ -37,7 +37,7 @@
#define MAX_OUTPUT_COUNT 10
static int signalReceived = 0;
static bool authenticated = false;
static int peer_connected_count = 0;
static struct rist_logging_settings *logging_settings;
struct rist_callback_object {
......@@ -179,7 +179,7 @@ static void input_udp_recv(struct evsocket_ctx *evctx, int fd, short revents, vo
offset = 12; // TODO: check for header extensions and remove them as well
data_block.payload = recv_buf + offset;
data_block.payload_len = recv_bufsize - offset;
if (authenticated) {
if (peer_connected_count) {
int w = rist_sender_data_write(callback_object->sender_ctx, &data_block);
// TODO: report error?
(void) w;
......@@ -208,16 +208,26 @@ static void usage(char *cmd)
exit(1);
}
static void connection_status_callback(void *arg, struct rist_peer *peer, enum rist_connection_status peer_connection_status)
{
(void)arg;
if (peer_connection_status == RIST_CONNECTION_ESTABLISHED || peer_connection_status == RIST_CLIENT_CONNECTED)
peer_connected_count++;
else
peer_connected_count--;
rist_log(logging_settings, RIST_LOG_INFO,"Connection Status changed for Peer %"PRIu64", new status is %d, peer connected count is %d\n",
peer, peer_connection_status, peer_connected_count);
}
static int cb_auth_connect(void *arg, const char* connecting_ip, uint16_t connecting_port, const char* local_ip, uint16_t local_port, struct rist_peer *peer)
{
struct rist_ctx *ctx = (struct rist_ctx *)arg;
char buffer[500];
char message[200];
authenticated = true;
int message_len = snprintf(message, 200, "auth,%s:%d,%s:%d", connecting_ip, connecting_port, local_ip, local_port);
// To be compliant with the spec, the message must have an ipv4 header
int ret = oob_build_api_payload(buffer, (char *)connecting_ip, (char *)local_ip, message, message_len);
rist_log(logging_settings, RIST_LOG_INFO,"Peer has been authenticated, sending oob/api message: %s\n", message);
rist_log(logging_settings, RIST_LOG_INFO,"Peer has been peer_connected_count, sending oob/api message: %s\n", message);
struct rist_oob_block oob_block;
oob_block.peer = peer;
oob_block.payload = buffer;
......@@ -272,6 +282,11 @@ static struct rist_peer* setup_rist_peer(struct rist_sender_args *setup)
return NULL;
}
if (rist_connection_status_callback_set(setup->ctx, connection_status_callback, NULL) == -1) {
rist_log(logging_settings, RIST_LOG_ERROR, "Could not initialize rist connection status callback\n");
return NULL;
}
if (setup->profile != RIST_PROFILE_SIMPLE) {
if (rist_oob_callback_set(setup->ctx, cb_recv_oob, setup->ctx) == -1) {
rist_log(logging_settings, RIST_LOG_ERROR, "Could not enable out-of-band data\n");
......@@ -367,7 +382,7 @@ static PTHREAD_START_FUNC(input_loop, arg)
if (queue_size % 10 == 0 || queue_size > 50)
rist_log(logging_settings, RIST_LOG_WARN, "Falling behind on rist_receiver_data_read: %d\n", queue_size);
if (b && b->payload) {
if (authenticated) {
if (peer_connected_count) {
int w = rist_sender_data_write(callback_object->sender_ctx, b);
// TODO: report error?
(void) w;
......@@ -495,7 +510,7 @@ int main(int argc, char *argv[])
}
if (profile == RIST_PROFILE_SIMPLE || faststart)
authenticated = true;
peer_connected_count = 1;
// Update log settings with custom loglevel and remote address if necessary
if (rist_logging_set(&logging_settings, loglevel, NULL, NULL, remote_log_address, stderr) != 0) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment