diff --git a/include/librist/headers.h b/include/librist/headers.h index d7fca60daebac0f46da92eca54ccfb6d3c27f7c1..3a6aef833871754756356da37fe7c345fdaf0b6f 100644 --- a/include/librist/headers.h +++ b/include/librist/headers.h @@ -186,6 +186,7 @@ struct rist_data_block /* Get's populated by librist with the rtp_seq on output, can be used on input to tell librist which rtp_seq to use */ uint64_t seq; uint32_t flags; + struct rist_ref *ref; }; struct rist_oob_block diff --git a/include/librist/librist.h b/include/librist/librist.h index 0f950c9fd34348337c5fab58265c9cb61ee96b88..63232acce58f3c881aebcdb480210e820f42c699 100755 --- a/include/librist/librist.h +++ b/include/librist/librist.h @@ -43,12 +43,27 @@ RIST_API int rist_receiver_nack_type_set(struct rist_ctx *ctx, enum rist_nack_ty * Use this API to read data from an internal fifo queue instead of the callback * * @param ctx RIST receiver context - * @param[out] data_block a pointer to the rist_data_block structure + * @param[out] reference counted data_blockstructure MUST be freed via rist_receiver_data_block_free * @param timeout How long to wait for queue data (ms), 0 for no wait * @return num buffers remaining on queue +1 (0 if no buffer returned), -1 on error */ RIST_API int rist_receiver_data_read(struct rist_ctx *ctx, const struct rist_data_block **data_block, int timeout); + +/** + * @brief Data callback function + * + * Optional calling application provided function for receiving callbacks upon data reception. + * Can be used to directly process data, or signal the calling application to read within it's own context. + * Stalling in this function will hinder data-reception of the libRIST library. + * This function will be called from a per-flow output thread and must be thread-safe. + * + * @param arg optional user data set via rist_receiver_data_callback_set + * @param data_block reference counted data_block structure MUST be freed via rist_receiver_data_block_free + * @return int, ignored. + */ +typedef int (*receiver_data_callback_t)(void *arg, const struct rist_data_block *data_block); + /** * @brief Enable data callback channel * @@ -61,20 +76,30 @@ RIST_API int rist_receiver_data_read(struct rist_ctx *ctx, const struct rist_dat * @return 0 on success, -1 on error */ RIST_API int rist_receiver_data_callback_set(struct rist_ctx *ctx, - int (*data_callback)(void *arg, const struct rist_data_block *data_block), + receiver_data_callback_t, void *arg); +/** + * @brief Free rist data block + * + * Must be called whenever a received data block is no longer needed by the calling application. + * + * @param block double pointer to rist_data_block, containing pointer will be set to NULL + */ + +RIST_API void rist_receiver_data_block_free(struct rist_data_block **const block); + /** * @brief Set data ready signalling fd * * Calling applications can provide an fd that will be written to whenever a packet * is ready for reading via FIFO read function (rist_receiver_data_read). * This allows calling applications to poll an fd (i.e.: in event loops). - * Whenever a packet is ready for reading, a byte (with undefined value) will - * be written to the FD. Calling application should make no assumptions + * Whenever a packet is ready for reading, a byte (with undefined value) will + * be written to the FD. Calling application should make no assumptions * whatsoever based on the number of bytes available for reading. * It is highly recommended that the fd is setup to operate in non blocking mode. - * A call with a 0 value fd disables the notify fd functionality. And must be + * A call with a 0 value fd disables the notify fd functionality. And must be * made before a calling application closes the fd. * @param ctx RIST receiver context * @param file_handle The file descriptor to be written to @@ -329,7 +354,7 @@ RIST_API int rist_stats_free(const struct rist_stats *stats_container); RIST_API int rist_peer_config_free(const struct rist_peer_config **peer_config); /** - * @brief Populate a preallocated peer_config structure with library default values + * @brief Populate a preallocated peer_config structure with library default values * * @return 0 on success or non-zero on error. */ diff --git a/meson.build b/meson.build index 40d72242fb3745884470290424debbb230684b11..03b50b732951e654129880cd032f053ba79caf16 100755 --- a/meson.build +++ b/meson.build @@ -188,6 +188,7 @@ librist = library('librist', 'src/logging.c', 'src/rist.c', 'src/rist-common.c', + 'src/rist_ref.c', 'src/mpegts.c', 'src/udp.c', 'src/stats.c', diff --git a/src/flow.c b/src/flow.c index 78ff881078e99c1fe15f5faed803665203c1f270..4a0523bc43d482be228c6906443684cbb3a9bdce 100755 --- a/src/flow.c +++ b/src/flow.c @@ -135,13 +135,7 @@ void rist_delete_flow(struct rist_receiver *ctx, struct rist_flow *f) { if (f->dataout_fifo_queue[i]) { - const uint8_t *payload = f->dataout_fifo_queue[i]->payload; - if (payload) { - free((void*)payload); - payload = NULL; - } - free(f->dataout_fifo_queue[i]); - f->dataout_fifo_queue[i] = NULL; + free_data_block(&f->dataout_fifo_queue[i]); } } diff --git a/src/rist-common.c b/src/rist-common.c index 613d8d13c7e584e06f17cba35c0276c647463acb..92487802ad0e929ad878b5357d4e0afd052a0323 100755 --- a/src/rist-common.c +++ b/src/rist-common.c @@ -15,6 +15,7 @@ #include "eap.h" #include "lz4.h" #include "mpegts.h" +#include "rist_ref.h" #include #include "stdio-shim.h" #include @@ -703,27 +704,54 @@ static int rist_process_nack(struct rist_flow *f, struct rist_missing_buffer *b) return 0; } +void free_data_block(struct rist_data_block **const block) +{ + assert(block != NULL); + struct rist_data_block *b = *block; + if (!b) + return; + + if (atomic_fetch_sub(&b->ref->refcnt, 1) == 1) + { + assert(b->ref->ptr == b); + uint8_t *payload = ((uint8_t*)b->payload - RIST_MAX_PAYLOAD_OFFSET);//this is extremely ugly, though these offsets will stop existing in next release + free(payload); + free((void *)b->ref); + free(b); + } + *block = NULL; +} + static struct rist_data_block *new_data_block(struct rist_data_block *output_buffer_current, struct rist_buffer *b, uint8_t *payload, uint32_t flow_id, uint32_t flags) { struct rist_data_block *output_buffer; - if (output_buffer_current) - output_buffer = output_buffer_current; + if (output_buffer_current) { + if (rist_ref_iswritable(output_buffer_current->ref)) { + output_buffer = output_buffer_current; + uint8_t *p = ((uint8_t*)output_buffer_current->payload - RIST_MAX_PAYLOAD_OFFSET); + free(p); + } else { + free_data_block(&output_buffer_current); + output_buffer = calloc(1, sizeof(*output_buffer)); + } + } else output_buffer = calloc(1, sizeof(struct rist_data_block)); - output_buffer->peer = b->peer; - output_buffer->flow_id = flow_id; - uint8_t *newbuffer; - if (output_buffer->payload && b->size != output_buffer->payload_len) { - newbuffer = realloc((void *)output_buffer->payload, b->size); - } else if (!output_buffer->payload) { - newbuffer = malloc(b->size); + if (!output_buffer) { + rist_log_priv2(get_cctx(b->peer)->logging_settings, RIST_LOG_ERROR, "Error (re)allocating rist_data_block."); + return NULL; } - else { - newbuffer = (void *)output_buffer->payload; + if (!output_buffer->ref) { + output_buffer->ref = rist_ref_create(output_buffer); + if (!output_buffer->ref) { + rist_log_priv2(get_cctx(b->peer)->logging_settings, RIST_LOG_ERROR, "Error allocating rist_ref."); + free(output_buffer); + return NULL; + } } - - memcpy(newbuffer, payload, b->size); - output_buffer->payload = newbuffer; + output_buffer->peer = b->peer; + output_buffer->flow_id = flow_id; + output_buffer->payload = payload; output_buffer->payload_len = b->size; output_buffer->virt_src_port = b->src_port; output_buffer->virt_dst_port = b->dst_port; @@ -842,7 +870,9 @@ static void receiver_output(struct rist_receiver *ctx, struct rist_flow *f) f->dataout_fifo_queue[dataout_fifo_write_index] = new_data_block( block, b, &payload[RIST_MAX_PAYLOAD_OFFSET], f->flow_id, flags); - if (ctx->receiver_data_callback) { + b->data = NULL; + if (ctx->receiver_data_callback && f->dataout_fifo_queue[dataout_fifo_write_index]) { + rist_ref_inc(f->dataout_fifo_queue[dataout_fifo_write_index]->ref); // send to callback synchronously ctx->receiver_data_callback(ctx->receiver_data_callback_argument, f->dataout_fifo_queue[dataout_fifo_write_index]); diff --git a/src/rist-private.h b/src/rist-private.h index e8a6c979e82b4b4f2ebb62dd9a2348cbe8a34bb8..6ff4c7cb2458b814867c61747f3c56107d5eb26a 100755 --- a/src/rist-private.h +++ b/src/rist-private.h @@ -343,7 +343,7 @@ struct rist_receiver { pthread_mutex_t mutex; /* Receiver data callback */ - int (*receiver_data_callback)(void *arg, const struct rist_data_block *data_block); + receiver_data_callback_t receiver_data_callback; void *receiver_data_callback_argument; int receiver_data_ready_notify_fd; @@ -591,6 +591,8 @@ RIST_PRIV void rist_shutdown_peer(struct rist_peer *peer); RIST_PRIV void rist_print_inet_info(char *prefix, struct rist_peer *peer); RIST_PRIV void rist_peer_rtcp(struct evsocket_ctx *ctx, void *arg); RIST_PRIV void rist_populate_cname(struct rist_peer *peer); +RIST_PRIV void free_data_block(struct rist_data_block **const block); + /* needed after splitting up */ RIST_PRIV PTHREAD_START_FUNC(sender_pthread_protocol, arg); RIST_PRIV PTHREAD_START_FUNC(receiver_pthread_protocol, arg); diff --git a/src/rist.c b/src/rist.c index 3ff04f5b005c99fdf05b6217a15e5c6e2ef77069..3a5ac8988918ce07b21278024fc9a6572ac76197 100644 --- a/src/rist.c +++ b/src/rist.c @@ -164,6 +164,7 @@ int rist_receiver_data_read(struct rist_ctx *rist_ctx, const struct rist_data_bl if ((size_t)atomic_load_explicit(&f->dataout_fifo_queue_write_index, memory_order_acquire) != dataout_read_index) { data_block = f->dataout_fifo_queue[dataout_read_index]; + f->dataout_fifo_queue[dataout_read_index] = NULL; num = atomic_load_explicit(&f->dataout_fifo_queue_counter, memory_order_acquire); atomic_store_explicit(&f->dataout_fifo_queue_read_index, (dataout_read_index + 1) & (RIST_DATAOUT_QUEUE_BUFFERS - 1), memory_order_release); if (data_block) @@ -186,6 +187,13 @@ int rist_receiver_data_read(struct rist_ctx *rist_ctx, const struct rist_data_bl return (int)num; } +void rist_receiver_data_block_free(struct rist_data_block **block) +{ + struct rist_data_block *b = *block; + if (b->ref != NULL) + free_data_block(block); +} + uint32_t rist_flow_id_create() { uint32_t u32 = prand_u32(); diff --git a/src/rist_ref.c b/src/rist_ref.c new file mode 100644 index 0000000000000000000000000000000000000000..59e623dc59c2b7ce32d1090136c7c3752db4b48f --- /dev/null +++ b/src/rist_ref.c @@ -0,0 +1,23 @@ +#include "rist_ref.h" +#include +#include + +struct rist_ref *rist_ref_create(void *data) +{ + struct rist_ref *ref = malloc(sizeof(*ref)); + if (!ref) + return NULL; + ref->ptr = data; + atomic_init(&ref->refcnt, 1); + return ref; +} + +void rist_ref_inc(struct rist_ref *ref) +{ + atomic_fetch_add(&ref->refcnt, 1); +} + +bool rist_ref_iswritable(struct rist_ref *ref) +{ + return atomic_load(&ref->refcnt) == 1 && ref->ptr; +} diff --git a/src/rist_ref.h b/src/rist_ref.h new file mode 100644 index 0000000000000000000000000000000000000000..3df3deec4b5dd0f4996f23a25bfcfc10c211e4e5 --- /dev/null +++ b/src/rist_ref.h @@ -0,0 +1,13 @@ +#include "common/attributes.h" +#include +#include +#include + +struct rist_ref { + atomic_int refcnt; + const void *ptr; +}; + +RIST_PRIV bool rist_ref_iswritable(struct rist_ref *ref); +RIST_PRIV struct rist_ref *rist_ref_create(void *data); +RIST_PRIV void rist_ref_inc(struct rist_ref *ref); diff --git a/test/rist/test_send_receive.c b/test/rist/test_send_receive.c index 0ada2b20164c008f110b63dcc1b31a33932e5730..36741f216c258b6e48845c1219772f949ed5121b 100644 --- a/test/rist/test_send_receive.c +++ b/test/rist/test_send_receive.c @@ -180,14 +180,13 @@ int main(int argc, char *argv[]) { goto out; } - const struct rist_data_block *b; + const struct rist_data_block *b = NULL; char rcompare[1316]; int receive_count = 1; bool got_first = false; while (receive_count < 16000) { if (atomic_load(&stop)) break; - b = NULL; int queue_length = rist_receiver_data_read(receiver_ctx, &b, 5); if (queue_length > 0) { if (!got_first) { @@ -204,6 +203,7 @@ int main(int argc, char *argv[]) { break; } receive_count++; + rist_receiver_data_block_free((struct rist_data_block **const)&b); } } if (!got_first || receive_count < 12500) diff --git a/tools/rist2rist.c b/tools/rist2rist.c index df54e2219b63b4f3f7b69b09c8c05de6c6834f17..b5eab17b5203fb7a0cf23e785862dae6fc7c38f9 100644 --- a/tools/rist2rist.c +++ b/tools/rist2rist.c @@ -244,7 +244,9 @@ static int cb_recv(void *arg, const struct rist_data_block *b) //b->virt_src_port = cb_arg->src_port; //b->virt_dst_port = cb_arg->dst_port; block->flags = RIST_DATA_FLAGS_USE_SEQ;//We only need this flag set, this way we don't have to null it beforehand. - return rist_sender_data_write(cb_arg->sender_ctx, b); + int ret = rist_sender_data_write(cb_arg->sender_ctx, b); + rist_receiver_data_block_free((struct rist_data_block **const) &b); + return ret; } static void intHandler(int signal) { diff --git a/tools/ristreceiver.c b/tools/ristreceiver.c index 88649252f90c6164d968b5c6a35de19b9527ebe7..7a3b9e34210eb5094400ababda955a4e810bac7e 100644 --- a/tools/ristreceiver.c +++ b/tools/ristreceiver.c @@ -168,7 +168,7 @@ static int cb_recv(void *arg, const struct rist_data_block *b) rist_log(logging_settings, RIST_LOG_ERROR, "Destination port mismatch, no output found for %d\n", b->virt_dst_port); return -1; } - + rist_receiver_data_block_free((struct rist_data_block **const) &b); return 0; }