Commit 9604b753 authored by Gijs Peskens's avatar Gijs Peskens Committed by Gijs Peskens
Browse files

Add API call to set size of data out fifo queue

parent 9cdd44c2
......@@ -37,6 +37,20 @@ RIST_API int rist_receiver_create(struct rist_ctx **ctx, enum rist_profile profi
*/
RIST_API int rist_receiver_nack_type_set(struct rist_ctx *ctx, enum rist_nack_type nacks_type);
/**
* @brief Set output fifo size
*
* Set the output fifo size to the desired maximum, can be set to 0 to disable
* desired size must be a power of 2. When enabled libRIST will output packets
* into the fifo queue for reading by the calling application.
* The fifo buffer size can only be set before starting, and defaults to 1024
*
* @param ctx RIST receiver context
* @param desired_size max number of packets to keep in fifo buffer, 0 to disable
* @return 0 for success
*/
RIST_API int rist_receiver_set_output_fifo_size(struct rist_ctx *ctx, uint32_t desired_size);
/**
* @brief Reads rist data
*
......
......@@ -139,7 +139,7 @@ void rist_delete_flow(struct rist_receiver *ctx, struct rist_flow *f)
empty_receiver_queue(f, &ctx->common);
rist_log_priv(&ctx->common, RIST_LOG_INFO, "Freeing data fifo queue\n");
for (int i = 0; i < RIST_DATAOUT_QUEUE_BUFFERS; i++)
for (size_t i = 0; i < ctx->fifo_queue_size; i++)
{
if (f->dataout_fifo_queue[i])
{
......@@ -195,6 +195,7 @@ static struct rist_flow *create_flow(struct rist_receiver *ctx, uint32_t flow_id
f->receiver_id = ctx->id;
f->stats_next_time = timestampNTP_u64();
f->max_output_jitter = ctx->common.rist_max_jitter;
f->dataout_fifo_queue = calloc(ctx->fifo_queue_size, sizeof(*f->dataout_fifo_queue));
int ret = pthread_cond_init(&f->condition, NULL);
if (ret) {
free(f);
......
......@@ -914,15 +914,15 @@ static void receiver_output(struct rist_receiver *ctx, struct rist_flow *f)
size_t dataout_fifo_write_index = atomic_load_explicit(&f->dataout_fifo_queue_write_index, memory_order_relaxed);
size_t dataout_fifo_read_index = atomic_load_explicit(&f->dataout_fifo_queue_read_index, memory_order_acquire);
uint32_t fifo_count = (dataout_fifo_write_index - dataout_fifo_read_index)&(RIST_DATAOUT_QUEUE_BUFFERS -1);
if (fifo_count +1 == RIST_DATAOUT_QUEUE_BUFFERS) {
uint32_t fifo_count = (dataout_fifo_write_index - dataout_fifo_read_index)&(ctx->fifo_queue_size -1);
if (fifo_count +1 == ctx->fifo_queue_size || !ctx->fifo_queue_size) {
if (!ctx->receiver_data_callback)
rist_log_priv(&ctx->common, RIST_LOG_ERROR, "Rist data out fifo queue overflow\n");
rist_receiver_data_block_free(&block);
} else
{
f->dataout_fifo_queue[dataout_fifo_write_index] = block;
atomic_store_explicit(&f->dataout_fifo_queue_write_index, (dataout_fifo_write_index + 1)& (RIST_DATAOUT_QUEUE_BUFFERS-1), memory_order_relaxed);
atomic_store_explicit(&f->dataout_fifo_queue_write_index, (dataout_fifo_write_index + 1)& (ctx->fifo_queue_size-1), memory_order_relaxed);
// Wake up the fifo read thread (poll)
if (ctx->receiver_data_ready_notify_fd) {
// send a data ready signal by writing a single byte of value 0
......
......@@ -246,7 +246,7 @@ struct rist_flow {
uint64_t last_recv_ts;
/* Receiver timed async data output */
struct rist_data_block *dataout_fifo_queue[RIST_DATAOUT_QUEUE_BUFFERS];
struct rist_data_block **dataout_fifo_queue;
size_t dataout_fifo_queue_bytesize;
atomic_ulong dataout_fifo_queue_read_index;
atomic_ulong dataout_fifo_queue_write_index;
......@@ -363,6 +363,7 @@ struct rist_receiver {
bool simulate_loss;
uint16_t loss_percentage;
uint32_t fifo_queue_size;
};
struct rist_sender {
......
......@@ -42,7 +42,7 @@ int rist_receiver_create(struct rist_ctx **_ctx, enum rist_profile profile,
ctx->common.logging_settings = logging_settings;
ctx->common.stats_report_time = (uint64_t)1000 * (uint64_t)RIST_CLOCK;
ctx->fifo_queue_size = RIST_DATAOUT_QUEUE_BUFFERS;
rist_log_priv(&ctx->common, RIST_LOG_INFO, "RIST Receiver Library version:%s \n", LIBRIST_VERSION);
if (logging_settings && logging_settings->log_level == RIST_LOG_SIMULATE)
......@@ -113,7 +113,7 @@ static struct rist_flow *rist_get_longest_flow(struct rist_receiver *ctx, ssize_
unsigned long reader_index = atomic_load_explicit(&f_loop->dataout_fifo_queue_read_index, memory_order_relaxed);
unsigned long write_index = atomic_load_explicit(&f_loop->dataout_fifo_queue_write_index, memory_order_acquire);
num_loop = (write_index - reader_index)&(RIST_DATAOUT_QUEUE_BUFFERS -1);
num_loop = (write_index - reader_index)&(ctx->fifo_queue_size -1);
if (num_loop > *num)
{
f = f_loop;
......@@ -168,8 +168,8 @@ int rist_receiver_data_read(struct rist_ctx *rist_ctx, const struct rist_data_bl
if (write_index != dataout_read_index)
{
do {
num = (atomic_load_explicit(&f->dataout_fifo_queue_write_index, memory_order_acquire) - dataout_read_index) &(RIST_DATAOUT_QUEUE_BUFFERS -1);
if (atomic_compare_exchange_weak(&f->dataout_fifo_queue_read_index, &dataout_read_index, (dataout_read_index +1)&(RIST_DATAOUT_QUEUE_BUFFERS -1)))
num = (atomic_load_explicit(&f->dataout_fifo_queue_write_index, memory_order_acquire) - dataout_read_index) &(ctx->fifo_queue_size -1);
if (atomic_compare_exchange_weak(&f->dataout_fifo_queue_read_index, &dataout_read_index, (dataout_read_index +1)&(ctx->fifo_queue_size -1)))
{
data_block = f->dataout_fifo_queue[dataout_read_index];
f->dataout_fifo_queue[dataout_read_index] = NULL;
......@@ -1019,3 +1019,28 @@ int rist_destroy(struct rist_ctx *ctx) {
return 0;
}
int rist_receiver_set_output_fifo_size(struct rist_ctx *ctx, uint32_t desired_size)
{
if (!ctx)
{
rist_log_priv3(RIST_LOG_ERROR, "rist_receiver_set_fifo_size called with null ctx\n");
return -1;
}
if (ctx->mode == RIST_RECEIVER_MODE || !ctx->receiver_ctx)
{
rist_log_priv3(RIST_LOG_ERROR, "rist_receiver_set_fifo_size can only be called on receiver\n");
return -2;
}
if (ctx->receiver_ctx->receiver_thread)
{
rist_log_priv2(ctx->receiver_ctx->common.logging_settings, RIST_LOG_ERROR, "rist_receiver_set_fifo_size must be called before starting\n");
return -3;
}
if ((desired_size & (desired_size -1)) != 0)
{
rist_log_priv2(ctx->receiver_ctx->common.logging_settings, RIST_LOG_ERROR, "Desired fifo size must be a power of 2\n");
return -4;
}
ctx->receiver_ctx->fifo_queue_size = desired_size;
return 0;
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment