Commit d8f49af1 authored by Sergio Ammirata's avatar Sergio Ammirata
Browse files

WIP: output data is corrupt ...

parent 3b000701
......@@ -31,6 +31,7 @@
#include <vlc_block.h>
#include <vlc_rand.h>
#include <sys/time.h>
#include <vlc_queue.h>
#define RIST_CFG_PREFIX "sout-rist-"
#include "../access/rist.h"
......@@ -65,6 +66,11 @@ typedef struct
int gre_dst_port;
uint32_t i_recovery_buffer;
size_t i_max_packet_size;
size_t i_group_size;
uint8_t *p_buffer;
bool dead;
vlc_queue_t queue;
vlc_thread_t thread;
struct rist_logging_settings logging_settings;
} sout_access_out_sys_t;
......@@ -102,32 +108,94 @@ static ssize_t Write( sout_access_out_t *p_access, block_t *p_buffer )
sout_access_out_sys_t *p_sys = p_access->p_sys;
int i_len = 0;
while( p_buffer )
{
block_t *p_next;
vlc_queue_Enqueue(&p_sys->queue, block_Duplicate(p_buffer));
// Estimate and store group size based on initial input data chunk size
if (p_sys->i_group_size == 0) {
p_sys->i_group_size = __MAX(p_sys->i_max_packet_size / p_buffer->i_buffer, 1);
msg_Info(p_access, "output packet aggregation estimated to be %zu", p_sys->i_group_size);
}
i_len += p_buffer->i_buffer;
p_next = p_buffer->p_next;
block_Release( p_buffer );
p_buffer = p_next;
}
return i_len;
}
static ssize_t RistWrite( sout_access_out_t *p_access, uint8_t *p_buffer, size_t i_total_size, struct rist_data_block *rist_buffer)
{
sout_access_out_sys_t *p_sys = p_access->p_sys;
size_t i_bytes_total = i_total_size;
while( i_bytes_total )
{
size_t i_write = __MIN( i_bytes_total, p_sys->i_max_packet_size );
rist_buffer->payload = p_buffer;
rist_buffer->payload_len = i_write;
rist_sender_data_write(p_sys->sender_ctx, rist_buffer);
p_buffer += i_write;
i_bytes_total -= i_write;
}
return i_total_size;
}
static void* ThreadWrite( void *data )
{
sout_access_out_t *p_access = data;
sout_access_out_sys_t *p_sys = p_access->p_sys;
vlc_tick_t i_date_last = -1;
size_t i_total_size = 0;
int i_to_send = 1;
block_t *p_pk;
struct rist_data_block rist_buffer = { 0 };
rist_buffer.virt_src_port = p_sys->gre_src_port;
rist_buffer.virt_dst_port = p_sys->gre_dst_port;
while( p_buffer )
while ((p_pk = vlc_queue_DequeueKillable(&p_sys->queue, &p_sys->dead)) != NULL)
{
block_t *p_next;
vlc_tick_t i_date;
i_len += p_buffer->i_buffer;
i_date = p_pk->i_dts;
if( i_date_last > 0 )
{
if( (i_date - i_date_last > VLC_TICK_FROM_SEC(1) ) || (i_date - i_date_last < VLC_TICK_FROM_MS(-1)) )
{
msg_Dbg( p_access, "Timestamp discontinuity, delta is (%"PRId64") resetting output clock",
i_date - i_date_last );
//p_sys->i_group_size = 0;
i_date_last = i_date;
}
}
while( p_buffer->i_buffer )
i_to_send--;
if( !i_to_send || (p_pk->i_flags & BLOCK_FLAG_CLOCK) )
{
size_t i_write = __MIN( p_buffer->i_buffer, p_sys->i_max_packet_size );
rist_buffer.payload = p_buffer->p_buffer;
rist_buffer.payload_len = p_buffer->i_buffer;
rist_sender_data_write(p_sys->sender_ctx, &rist_buffer);
p_buffer->p_buffer += i_write;
p_buffer->i_buffer -= i_write;
vlc_tick_wait( i_date );
i_to_send = p_sys->i_group_size;
}
p_next = p_buffer->p_next;
block_Release( p_buffer );
p_buffer = p_next;
// Accumulate until we reach zero on i_to_send or max mtu would be exceeded
if (i_to_send && (i_total_size + p_pk->i_buffer) <= p_sys->i_max_packet_size) {
memcpy(p_sys->p_buffer + i_total_size, p_pk->p_buffer, p_pk->i_buffer);
i_total_size += p_pk->i_buffer;
}
else {
RistWrite( p_access, p_sys->p_buffer, i_total_size, &rist_buffer );
i_total_size = 0;
}
i_date_last = i_date;
i_date = vlc_tick_now() - i_date;
}
return i_len;
return NULL;
}
static int Control( sout_access_out_t *p_access, int i_query, va_list args )
......@@ -155,6 +223,11 @@ static void Close( vlc_object_t * p_this )
sout_access_out_t *p_access = (sout_access_out_t*)p_this;
sout_access_out_sys_t *p_sys = p_access->p_sys;
vlc_queue_Kill(&p_sys->queue, &p_sys->dead);
vlc_join( p_sys->thread, NULL );
free(p_sys->p_buffer);
rist_destroy(p_sys->sender_ctx);
p_sys->sender_ctx = NULL;
}
......@@ -190,6 +263,12 @@ static int Open( vlc_object_t *p_this )
}
p_sys->i_max_packet_size = rist_get_max_packet_size((vlc_object_t *)p_access);
p_sys->p_buffer = malloc(p_sys->i_max_packet_size);
if( unlikely( p_sys->p_buffer == NULL ) )
return VLC_ENOMEM;
p_sys->i_group_size = 0;
p_sys->dead = false;
int i_rist_profile = var_InheritInteger(p_access, RIST_CFG_PREFIX RIST_URL_PARAM_PROFILE);
int i_verbose_level = var_InheritInteger(p_access, RIST_CFG_PREFIX RIST_URL_PARAM_VERBOSE_LEVEL);
......@@ -232,6 +311,12 @@ static int Open( vlc_object_t *p_this )
goto failed;
}
vlc_queue_Init(&p_sys->queue, offsetof (block_t, p_next));
if( vlc_clone( &p_sys->thread, ThreadWrite, p_access, VLC_THREAD_PRIORITY_HIGHEST ) ) {
msg_Err( p_access, "cannot spawn rist output thread" );
goto failed;
}
p_access->pf_write = Write;
p_access->pf_control = Control;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment