Commit 04e11c7a authored by Gijs Peskens's avatar Gijs Peskens Committed by Sergio Ammirata

Make rist_data_block reference counted & add free function.

Unfortunately this is really necessary so late in the release process. The previous design being able to free data from under a calling application.
parent eae429fc
Pipeline #57701 passed with stages
in 1 minute and 4 seconds
......@@ -186,6 +186,7 @@ struct rist_data_block
/* Get's populated by librist with the rtp_seq on output, can be used on input to tell librist which rtp_seq to use */
uint64_t seq;
uint32_t flags;
struct rist_ref *ref;
};
struct rist_oob_block
......
......@@ -43,12 +43,27 @@ RIST_API int rist_receiver_nack_type_set(struct rist_ctx *ctx, enum rist_nack_ty
* Use this API to read data from an internal fifo queue instead of the callback
*
* @param ctx RIST receiver context
* @param[out] data_block a pointer to the rist_data_block structure
* @param[out] reference counted data_blockstructure MUST be freed via rist_receiver_data_block_free
* @param timeout How long to wait for queue data (ms), 0 for no wait
* @return num buffers remaining on queue +1 (0 if no buffer returned), -1 on error
*/
RIST_API int rist_receiver_data_read(struct rist_ctx *ctx, const struct rist_data_block **data_block, int timeout);
/**
* @brief Data callback function
*
* Optional calling application provided function for receiving callbacks upon data reception.
* Can be used to directly process data, or signal the calling application to read within it's own context.
* Stalling in this function will hinder data-reception of the libRIST library.
* This function will be called from a per-flow output thread and must be thread-safe.
*
* @param arg optional user data set via rist_receiver_data_callback_set
* @param data_block reference counted data_block structure MUST be freed via rist_receiver_data_block_free
* @return int, ignored.
*/
typedef int (*receiver_data_callback_t)(void *arg, const struct rist_data_block *data_block);
/**
* @brief Enable data callback channel
*
......@@ -61,20 +76,30 @@ RIST_API int rist_receiver_data_read(struct rist_ctx *ctx, const struct rist_dat
* @return 0 on success, -1 on error
*/
RIST_API int rist_receiver_data_callback_set(struct rist_ctx *ctx,
int (*data_callback)(void *arg, const struct rist_data_block *data_block),
receiver_data_callback_t,
void *arg);
/**
* @brief Free rist data block
*
* Must be called whenever a received data block is no longer needed by the calling application.
*
* @param block double pointer to rist_data_block, containing pointer will be set to NULL
*/
RIST_API void rist_receiver_data_block_free(struct rist_data_block **const block);
/**
* @brief Set data ready signalling fd
*
* Calling applications can provide an fd that will be written to whenever a packet
* is ready for reading via FIFO read function (rist_receiver_data_read).
* This allows calling applications to poll an fd (i.e.: in event loops).
* Whenever a packet is ready for reading, a byte (with undefined value) will
* be written to the FD. Calling application should make no assumptions
* Whenever a packet is ready for reading, a byte (with undefined value) will
* be written to the FD. Calling application should make no assumptions
* whatsoever based on the number of bytes available for reading.
* It is highly recommended that the fd is setup to operate in non blocking mode.
* A call with a 0 value fd disables the notify fd functionality. And must be
* A call with a 0 value fd disables the notify fd functionality. And must be
* made before a calling application closes the fd.
* @param ctx RIST receiver context
* @param file_handle The file descriptor to be written to
......@@ -329,7 +354,7 @@ RIST_API int rist_stats_free(const struct rist_stats *stats_container);
RIST_API int rist_peer_config_free(const struct rist_peer_config **peer_config);
/**
* @brief Populate a preallocated peer_config structure with library default values
* @brief Populate a preallocated peer_config structure with library default values
*
* @return 0 on success or non-zero on error.
*/
......
......@@ -188,6 +188,7 @@ librist = library('librist',
'src/logging.c',
'src/rist.c',
'src/rist-common.c',
'src/rist_ref.c',
'src/mpegts.c',
'src/udp.c',
'src/stats.c',
......
......@@ -135,13 +135,7 @@ void rist_delete_flow(struct rist_receiver *ctx, struct rist_flow *f)
{
if (f->dataout_fifo_queue[i])
{
const uint8_t *payload = f->dataout_fifo_queue[i]->payload;
if (payload) {
free((void*)payload);
payload = NULL;
}
free(f->dataout_fifo_queue[i]);
f->dataout_fifo_queue[i] = NULL;
free_data_block(&f->dataout_fifo_queue[i]);
}
}
......
......@@ -15,6 +15,7 @@
#include "eap.h"
#include "lz4.h"
#include "mpegts.h"
#include "rist_ref.h"
#include <stdbool.h>
#include "stdio-shim.h"
#include <assert.h>
......@@ -703,27 +704,54 @@ static int rist_process_nack(struct rist_flow *f, struct rist_missing_buffer *b)
return 0;
}
void free_data_block(struct rist_data_block **const block)
{
assert(block != NULL);
struct rist_data_block *b = *block;
if (!b)
return;
if (atomic_fetch_sub(&b->ref->refcnt, 1) == 1)
{
assert(b->ref->ptr == b);
uint8_t *payload = ((uint8_t*)b->payload - RIST_MAX_PAYLOAD_OFFSET);//this is extremely ugly, though these offsets will stop existing in next release
free(payload);
free((void *)b->ref);
free(b);
}
*block = NULL;
}
static struct rist_data_block *new_data_block(struct rist_data_block *output_buffer_current, struct rist_buffer *b, uint8_t *payload, uint32_t flow_id, uint32_t flags)
{
struct rist_data_block *output_buffer;
if (output_buffer_current)
output_buffer = output_buffer_current;
if (output_buffer_current) {
if (rist_ref_iswritable(output_buffer_current->ref)) {
output_buffer = output_buffer_current;
uint8_t *p = ((uint8_t*)output_buffer_current->payload - RIST_MAX_PAYLOAD_OFFSET);
free(p);
} else {
free_data_block(&output_buffer_current);
output_buffer = calloc(1, sizeof(*output_buffer));
}
}
else
output_buffer = calloc(1, sizeof(struct rist_data_block));
output_buffer->peer = b->peer;
output_buffer->flow_id = flow_id;
uint8_t *newbuffer;
if (output_buffer->payload && b->size != output_buffer->payload_len) {
newbuffer = realloc((void *)output_buffer->payload, b->size);
} else if (!output_buffer->payload) {
newbuffer = malloc(b->size);
if (!output_buffer) {
rist_log_priv2(get_cctx(b->peer)->logging_settings, RIST_LOG_ERROR, "Error (re)allocating rist_data_block.");
return NULL;
}
else {
newbuffer = (void *)output_buffer->payload;
if (!output_buffer->ref) {
output_buffer->ref = rist_ref_create(output_buffer);
if (!output_buffer->ref) {
rist_log_priv2(get_cctx(b->peer)->logging_settings, RIST_LOG_ERROR, "Error allocating rist_ref.");
free(output_buffer);
return NULL;
}
}
memcpy(newbuffer, payload, b->size);
output_buffer->payload = newbuffer;
output_buffer->peer = b->peer;
output_buffer->flow_id = flow_id;
output_buffer->payload = payload;
output_buffer->payload_len = b->size;
output_buffer->virt_src_port = b->src_port;
output_buffer->virt_dst_port = b->dst_port;
......@@ -842,7 +870,9 @@ static void receiver_output(struct rist_receiver *ctx, struct rist_flow *f)
f->dataout_fifo_queue[dataout_fifo_write_index] = new_data_block(
block, b,
&payload[RIST_MAX_PAYLOAD_OFFSET], f->flow_id, flags);
if (ctx->receiver_data_callback) {
b->data = NULL;
if (ctx->receiver_data_callback && f->dataout_fifo_queue[dataout_fifo_write_index]) {
rist_ref_inc(f->dataout_fifo_queue[dataout_fifo_write_index]->ref);
// send to callback synchronously
ctx->receiver_data_callback(ctx->receiver_data_callback_argument,
f->dataout_fifo_queue[dataout_fifo_write_index]);
......
......@@ -343,7 +343,7 @@ struct rist_receiver {
pthread_mutex_t mutex;
/* Receiver data callback */
int (*receiver_data_callback)(void *arg, const struct rist_data_block *data_block);
receiver_data_callback_t receiver_data_callback;
void *receiver_data_callback_argument;
int receiver_data_ready_notify_fd;
......@@ -591,6 +591,8 @@ RIST_PRIV void rist_shutdown_peer(struct rist_peer *peer);
RIST_PRIV void rist_print_inet_info(char *prefix, struct rist_peer *peer);
RIST_PRIV void rist_peer_rtcp(struct evsocket_ctx *ctx, void *arg);
RIST_PRIV void rist_populate_cname(struct rist_peer *peer);
RIST_PRIV void free_data_block(struct rist_data_block **const block);
/* needed after splitting up */
RIST_PRIV PTHREAD_START_FUNC(sender_pthread_protocol, arg);
RIST_PRIV PTHREAD_START_FUNC(receiver_pthread_protocol, arg);
......
......@@ -164,6 +164,7 @@ int rist_receiver_data_read(struct rist_ctx *rist_ctx, const struct rist_data_bl
if ((size_t)atomic_load_explicit(&f->dataout_fifo_queue_write_index, memory_order_acquire) != dataout_read_index)
{
data_block = f->dataout_fifo_queue[dataout_read_index];
f->dataout_fifo_queue[dataout_read_index] = NULL;
num = atomic_load_explicit(&f->dataout_fifo_queue_counter, memory_order_acquire);
atomic_store_explicit(&f->dataout_fifo_queue_read_index, (dataout_read_index + 1) & (RIST_DATAOUT_QUEUE_BUFFERS - 1), memory_order_release);
if (data_block)
......@@ -186,6 +187,13 @@ int rist_receiver_data_read(struct rist_ctx *rist_ctx, const struct rist_data_bl
return (int)num;
}
void rist_receiver_data_block_free(struct rist_data_block **block)
{
struct rist_data_block *b = *block;
if (b->ref != NULL)
free_data_block(block);
}
uint32_t rist_flow_id_create()
{
uint32_t u32 = prand_u32();
......
#include "rist_ref.h"
#include <stdlib.h>
#include <stdbool.h>
struct rist_ref *rist_ref_create(void *data)
{
struct rist_ref *ref = malloc(sizeof(*ref));
if (!ref)
return NULL;
ref->ptr = data;
atomic_init(&ref->refcnt, 1);
return ref;
}
void rist_ref_inc(struct rist_ref *ref)
{
atomic_fetch_add(&ref->refcnt, 1);
}
bool rist_ref_iswritable(struct rist_ref *ref)
{
return atomic_load(&ref->refcnt) == 1 && ref->ptr;
}
#include "common/attributes.h"
#include <stdint.h>
#include <stdatomic.h>
#include <stdbool.h>
struct rist_ref {
atomic_int refcnt;
const void *ptr;
};
RIST_PRIV bool rist_ref_iswritable(struct rist_ref *ref);
RIST_PRIV struct rist_ref *rist_ref_create(void *data);
RIST_PRIV void rist_ref_inc(struct rist_ref *ref);
......@@ -180,14 +180,13 @@ int main(int argc, char *argv[]) {
goto out;
}
const struct rist_data_block *b;
const struct rist_data_block *b = NULL;
char rcompare[1316];
int receive_count = 1;
bool got_first = false;
while (receive_count < 16000) {
if (atomic_load(&stop))
break;
b = NULL;
int queue_length = rist_receiver_data_read(receiver_ctx, &b, 5);
if (queue_length > 0) {
if (!got_first) {
......@@ -204,6 +203,7 @@ int main(int argc, char *argv[]) {
break;
}
receive_count++;
rist_receiver_data_block_free((struct rist_data_block **const)&b);
}
}
if (!got_first || receive_count < 12500)
......
......@@ -244,7 +244,9 @@ static int cb_recv(void *arg, const struct rist_data_block *b)
//b->virt_src_port = cb_arg->src_port;
//b->virt_dst_port = cb_arg->dst_port;
block->flags = RIST_DATA_FLAGS_USE_SEQ;//We only need this flag set, this way we don't have to null it beforehand.
return rist_sender_data_write(cb_arg->sender_ctx, b);
int ret = rist_sender_data_write(cb_arg->sender_ctx, b);
rist_receiver_data_block_free((struct rist_data_block **const) &b);
return ret;
}
static void intHandler(int signal) {
......
......@@ -168,7 +168,7 @@ static int cb_recv(void *arg, const struct rist_data_block *b)
rist_log(logging_settings, RIST_LOG_ERROR, "Destination port mismatch, no output found for %d\n", b->virt_dst_port);
return -1;
}
rist_receiver_data_block_free((struct rist_data_block **const) &b);
return 0;
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment