udp.c 47.7 KB
Newer Older
1
/* librist. Copyright 2019-2020 SipRadius LLC. All right reserved.
Sergio Ammirata's avatar
Sergio Ammirata committed
2
3
4
5
6
7
8
9
10
11
 * Author: Daniele Lacamera <root@danielinux.net>
 * Author: Kuldeep Singh Dhaka <kuldeep@madresistor.com>
 * Author: Sergio Ammirata, Ph.D. <sergio@ammirata.net>
 */

#include "udp-private.h"
#include "rist-private.h"
#include "log-private.h"
#include "socket-shim.h"
#include "endian-shim.h"
Gijs Peskens's avatar
Gijs Peskens committed
12
#include "lz4.h"
13
#include "eap.h"
Gijs Peskens's avatar
Gijs Peskens committed
14
15
#include "crypto/psk.h"
#include "mpegts.h"
Sergio Ammirata's avatar
Sergio Ammirata committed
16
17
18
#include <stdlib.h>
#include <stddef.h>
#include <errno.h>
19
#include <stdint.h>
Gijs Peskens's avatar
Gijs Peskens committed
20
#include <assert.h>
Sergio Ammirata's avatar
Sergio Ammirata committed
21

22
23
24
25
26
27
28
29
uint64_t timestampNTP_u64(void)
{

	// We use clock_gettime instead of gettimeofday even though we only need microseconds
	// because gettimeofday implementation under linux is dependent on the kernel clock
	// and can produce duplicate times (too close to kernel timer)

	// We use the NTP time standard: rfc5905 (https://tools.ietf.org/html/rfc5905#section-6)
30
31
32
33
	// The 64-bit timestamps used by NTP consist of a 32-bit part for seconds
	// and a 32-bit part for fractional second, giving a time scale that rolls
	// over every 232 seconds (136 years) and a theoretical resolution of
	// 2−32 seconds (233 picoseconds). NTP uses an epoch of January 1, 1900.
34
35
	// Therefore, the first rollover occurs on February 7, 2036.
	timespec_t ts;
36
37
#if defined (__APPLE__)
	clock_gettime_osx(CLOCK_MONOTONIC_OSX, &ts);
38
39
40
41
42
43
44
45
46
47
48
49
#else
	clock_gettime(CLOCK_MONOTONIC, &ts);
#endif
	// Convert nanoseconds to 32-bits fraction (232 picosecond units)
	uint64_t t = (uint64_t)(ts.tv_nsec) << 32;
	t /= 1000000000;
	// There is 70 years (incl. 17 leap ones) offset to the Unix Epoch.
	// No leap seconds during that period since they were not invented yet.
	t |= ((70LL * 365 + 17) * 24 * 60 * 60 + ts.tv_sec) << 32;
	return t; // nanoseconds (technically, 232.831 picosecond units)
}

50
51
uint64_t timestampNTP_RTC_u64(void) {
	timespec_t ts;
52
53
#if defined (__APPLE__)
	clock_gettime_osx(CLOCK_REALTIME_OSX, &ts);
54
55
56
57
58
59
60
61
62
63
64
65
#else
	clock_gettime(CLOCK_REALTIME, &ts);
#endif
	// Convert nanoseconds to 32-bits fraction (232 picosecond units)
	uint64_t t = (uint64_t)(ts.tv_nsec) << 32;
	t /= 1000000000;
	// There is 70 years (incl. 17 leap ones) offset to the Unix Epoch.
	// No leap seconds during that period since they were not invented yet.
	t |= (70LL * 365 + 17) * 24 * 60 * 60 + ts.tv_sec;
	return t;
}

66
67
uint32_t timestampRTP_u32( int advanced, uint64_t i_ntp )
{
68
69
70
71
	if (!advanced) {
		i_ntp *= RTP_PTYPE_MPEGTS_CLOCKHZ;
		i_ntp = i_ntp >> 32;
		return (uint32_t)i_ntp;
72
73
74
75
76
77
	}
	else
	{
		// We just need the middle 32 bits, i.e. 65536Hz clock
		i_ntp = i_ntp >> 16;
		return (uint32_t)i_ntp;
78
79
80
	}
}

81
uint64_t convertRTPtoNTP(uint8_t ptype, uint32_t time_extension, uint32_t i_rtp)
82
{
83
84
	uint64_t i_ntp;
	if (ptype == RTP_PTYPE_RIST) {
85
86
87
88
89
90
		// Convert rtp to 64 bit and shift it 16 bits
		uint64_t part2 = (uint64_t)i_rtp;
		part2 = part2 << 16;
		// rebuild source_time (lower and upper 16 bits)
		uint64_t part3 = (uint64_t)(time_extension & 0xffff);
		uint64_t part1 = ((uint64_t)(time_extension & 0xffff0000)) << 32;
91
		i_ntp = part1 | part2 | part3;
92
93
		//fprintf(stderr,"source time %"PRIu64", rtp time %"PRIu32"\n", source_time, rtp_time);
	} else {
94
95
		int32_t clock = get_rtp_ts_clock(ptype);
		if (RIST_UNLIKELY(!clock)){
96
97
98
				clock = RTP_PTYPE_MPEGTS_CLOCKHZ;
				// Insert a new timestamp (not ideal but better than failing)
				i_rtp = htobe32(timestampRTP_u32(0, timestampNTP_u64()));
99
		}
100
101
		i_ntp = (uint64_t)i_rtp << 32;
		i_ntp /= clock;
102
	}
103
	return i_ntp;
104
105
}

106
107
108
109
110
111
112
113
uint64_t calculate_rtt_delay(uint64_t request, uint64_t response, uint32_t delay) {
	/* both request and response are NTP timestamps, delay is in microseconds */
	uint64_t rtt = response - request;
	if (RIST_UNLIKELY(delay))
		rtt -= (((uint64_t)delay) << 32)/1000000;
	return rtt;
}

114
void rist_clean_sender_enqueue(struct rist_sender *ctx)
Sergio Ammirata's avatar
Sergio Ammirata committed
115
116
117
118
119
{
	int delete_count = 1;

	// Delete old packets (max 10 entries per function call)
	while (delete_count++ < 10) {
120
		struct rist_buffer *b = ctx->sender_queue[ctx->sender_queue_delete_index];
Sergio Ammirata's avatar
Sergio Ammirata committed
121

Gijs Peskens's avatar
Gijs Peskens committed
122
123
124
125
		/* our buffer size is zero, it must be just building up */
		if ((size_t)atomic_load_explicit(&ctx->sender_queue_write_index, memory_order_acquire) == ctx->sender_queue_delete_index) {
			break;
		}
126

Sergio Ammirata's avatar
Sergio Ammirata committed
127
		size_t safety_counter = 0;
Gijs Peskens's avatar
Gijs Peskens committed
128
		while (!b && ((ctx->sender_queue_delete_index + 1)& (ctx->sender_queue_max -1)) != atomic_load_explicit(&ctx->sender_queue_write_index, memory_order_acquire)) {
129
			ctx->sender_queue_delete_index = (ctx->sender_queue_delete_index + 1)& (ctx->sender_queue_max -1);
Sergio Ammirata's avatar
Sergio Ammirata committed
130
			// This should never happen!
Gijs Peskens's avatar
Gijs Peskens committed
131
			rist_log_priv(&ctx->common, RIST_LOG_ERROR,
Gijs Peskens's avatar
Gijs Peskens committed
132
				"Moving delete index to %zu\n",
133
134
				ctx->sender_queue_delete_index);
			b = ctx->sender_queue[ctx->sender_queue_delete_index];
Sergio Ammirata's avatar
Sergio Ammirata committed
135
136
137
138
139
140
			if (safety_counter++ > 1000)
				return;
		}

		/* perform the deletion based on the buffer size plus twice the configured/measured avg_rtt */
		uint64_t delay = (timestampNTP_u64() - b->time) / RIST_CLOCK;
141
		if (delay < ctx->sender_recover_min_time) {
Sergio Ammirata's avatar
Sergio Ammirata committed
142
143
144
			break;
		}

Gijs Peskens's avatar
Gijs Peskens committed
145
		//rist_log_priv(&ctx->common, RIST_LOG_WARN,
Sergio Ammirata's avatar
Sergio Ammirata committed
146
		//		"\tDeleting %"PRIu32" (%zu bytes) after %"PRIu64" (%zu) ms\n",
147
		//		b->seq, b->size, delay, ctx->sender_recover_min_time);
Sergio Ammirata's avatar
Sergio Ammirata committed
148
149

		/* now delete it */
150
		ctx->sender_queue_bytesize -= b->size;
151
		free_rist_buffer(&ctx->common, b);
152
		ctx->sender_queue[ctx->sender_queue_delete_index] = NULL;
153
		ctx->sender_queue_delete_index = (ctx->sender_queue_delete_index + 1)& (ctx->sender_queue_max -1);
Sergio Ammirata's avatar
Sergio Ammirata committed
154
155
156
157
158

	}

}

159
size_t rist_send_seq_rtcp(struct rist_peer *p, uint16_t seq_rtp, uint8_t payload_type, uint8_t *payload, size_t payload_len, uint64_t source_time, uint16_t src_port, uint16_t dst_port)
Sergio Ammirata's avatar
Sergio Ammirata committed
160
161
{
	struct rist_common_ctx *ctx = get_cctx(p);
162
	struct rist_key *k = &p->key_tx;
Sergio Ammirata's avatar
Sergio Ammirata committed
163
	uint8_t *data;
164
165
	size_t len, gre_len;
	size_t hdr_len = 0;
166
	ssize_t ret = 0;
167
	uint32_t seq = p->seq++;
Gijs Peskens's avatar
Gijs Peskens committed
168
169
170
171
172
	/* Our encryption and compression operations directly modify the payload buffer we receive as a pointer
	   so we create a local pointer that points to the payload pointer, if we would either encrypt or compress we instead
	   malloc and mempcy, to ensure our source stays clean. We only do this with RAW data as these buffers are the only
	   assumed to be reused by retransmits */
	uint8_t *_payload = NULL;
173
174

	bool modifyingbuffer = (ctx->profile > RIST_PROFILE_SIMPLE
Gijs Peskens's avatar
Gijs Peskens committed
175
							&& (payload_type == RIST_PAYLOAD_TYPE_DATA_RAW || payload_type == RIST_PAYLOAD_TYPE_DATA_RAW_RTP_EXT)
Gijs Peskens's avatar
Gijs Peskens committed
176
							&& (k->key_size || p->compression));
Sergio Ammirata's avatar
Sergio Ammirata committed
177

Gijs Peskens's avatar
Gijs Peskens committed
178
179
	assert(payload != NULL);

Gijs Peskens's avatar
Gijs Peskens committed
180
181
182
183
184
185
186
187
	if (modifyingbuffer) {
		_payload = malloc(payload_len + RIST_MAX_PAYLOAD_OFFSET);
		_payload  = _payload + RIST_MAX_PAYLOAD_OFFSET;
		memcpy(_payload, payload, payload_len);
	} else {
		_payload = payload;
	}

188
	//if (p->receiver_mode)
Gijs Peskens's avatar
Gijs Peskens committed
189
	//	rist_log_priv(&ctx->common, RIST_LOG_ERROR, "Sending seq %"PRIu32" and rtp_seq %"PRIu16" payload is %d\n",
Sergio Ammirata's avatar
Sergio Ammirata committed
190
191
	//		seq, seq_rtp, payload_type);
	//else
Gijs Peskens's avatar
Gijs Peskens committed
192
	//	rist_log_priv(&ctx->common, RIST_LOG_ERROR, "Sending seq %"PRIu32" and idx is %zu/%zu/%zu (read/write/delete) and payload is %d\n",
193
194
	//		seq, p->sender_ctx->sender_queue_read_index,
	//		p->sender_ctx->sender_queue_write_index,
195
	//		p->sender_ctx->sender_queue_delete_index,
Sergio Ammirata's avatar
Sergio Ammirata committed
196
197
198
	//		payload_type);

	// TODO: write directly on the payload to make it faster
Anders Cedronius's avatar
Anders Cedronius committed
199
	uint8_t header_buf[RIST_MAX_HEADER_SIZE] = {0};
Sergio Ammirata's avatar
Sergio Ammirata committed
200
	if (k->key_size) {
201
		gre_len = sizeof(struct rist_gre_key_seq_real);
Sergio Ammirata's avatar
Sergio Ammirata committed
202
	} else {
203
		gre_len = sizeof(struct rist_gre_hdr);
Sergio Ammirata's avatar
Sergio Ammirata committed
204
205
	}

206
207
208
209
210
211
212
213
214
	uint16_t proto_type;
	if (RIST_UNLIKELY(payload_type == RIST_PAYLOAD_TYPE_DATA_OOB)) {
		proto_type = RIST_GRE_PROTOCOL_TYPE_FULL;
	} else {
		proto_type = RIST_GRE_PROTOCOL_TYPE_REDUCED;
		struct rist_protocol_hdr *hdr = (void *) (header_buf + gre_len);
		hdr->src_port = htobe16(src_port);
		hdr->dst_port = htobe16(dst_port);
		if (payload_type == RIST_PAYLOAD_TYPE_RTCP || payload_type == RIST_PAYLOAD_TYPE_RTCP_NACK)
Sergio Ammirata's avatar
Sergio Ammirata committed
215
		{
216
			hdr_len = RIST_GRE_PROTOCOL_REDUCED_SIZE;
Sergio Ammirata's avatar
Sergio Ammirata committed
217
		}
218
219
220
221
222
		else
		{
			hdr_len = sizeof(*hdr);
			// RTP header for data packets
			hdr->rtp.flags = RTP_MPEGTS_FLAGS;
Gijs Peskens's avatar
Gijs Peskens committed
223
224
			if (payload_type == RIST_PAYLOAD_TYPE_DATA_RAW_RTP_EXT)
				SET_BIT(hdr->rtp.flags, 4);
225
			hdr->rtp.ssrc = htobe32(p->adv_flow_id);
226
			hdr->rtp.seq = htobe16(seq_rtp);
227
			if ((seq_rtp + 1) != ctx->seq_rtp)
228
			{
229
				// This is a retransmission
Gijs Peskens's avatar
Gijs Peskens committed
230
				//rist_log_priv(&ctx->common, RIST_LOG_ERROR, "\tResending: %"PRIu32"/%"PRIu16"/%"PRIu32"\n", seq, seq_rtp, ctx->seq);
231
232
				/* Mark SSID for retransmission (change the last bit of the ssrc to 1) */
				//hdr->rtp.ssrc |= (1 << 31);
Gijs Peskens's avatar
Gijs Peskens committed
233
				hdr->rtp.ssrc = htobe32(p->adv_flow_id | 0x01);
234
			}
235
236
			hdr->rtp.payload_type = RTP_PTYPE_MPEGTS;
			hdr->rtp.ts = htobe32(timestampRTP_u32(0, source_time));
237
238
		}
		// copy the rtp header data (needed for encryption)
Gijs Peskens's avatar
Gijs Peskens committed
239
		memcpy(_payload - hdr_len, hdr, hdr_len);
Sergio Ammirata's avatar
Sergio Ammirata committed
240
241
	}

242
	if (ctx->profile > RIST_PROFILE_SIMPLE) {
Sergio Ammirata's avatar
Sergio Ammirata committed
243
244
245
		/* Encrypt everything except GRE */
		if (k->key_size) {
			// Prepare GRE header
246
			struct rist_gre_key_seq_real *gre_key_seq = (void *) header_buf;
Sergio Ammirata's avatar
Sergio Ammirata committed
247
248
			SET_BIT(gre_key_seq->flags1, 5); // set key flag
			SET_BIT(gre_key_seq->flags1, 4); // set seq bit
249

250
			gre_key_seq->prot_type = htobe16(proto_type);
Sergio Ammirata's avatar
Sergio Ammirata committed
251
252
			gre_key_seq->seq = htobe32(seq);

Gijs Peskens's avatar
Gijs Peskens committed
253
254
			_librist_crypto_psk_encrypt(&p->key_tx, gre_key_seq->seq, (unsigned char *)(_payload - hdr_len), (unsigned char *)(_payload - hdr_len), (hdr_len + payload_len));
			gre_key_seq->nonce = k->gre_nonce;
Sergio Ammirata's avatar
Sergio Ammirata committed
255
		} else {
256
			struct rist_gre_hdr *gre_seq = (struct rist_gre_hdr *) header_buf;
257
			gre_seq->prot_type = htobe16(proto_type);
Sergio Ammirata's avatar
Sergio Ammirata committed
258
259
260
261
		}

		// now copy the GRE header data
		len = gre_len + hdr_len + payload_len;
Gijs Peskens's avatar
Gijs Peskens committed
262
		data = _payload - gre_len - hdr_len;
Sergio Ammirata's avatar
Sergio Ammirata committed
263
264
265
266
267
		memcpy(data, header_buf, gre_len);
	}
	else
	{
		len =  hdr_len + payload_len - RIST_GRE_PROTOCOL_REDUCED_SIZE;
Gijs Peskens's avatar
Gijs Peskens committed
268
		data = _payload - hdr_len + RIST_GRE_PROTOCOL_REDUCED_SIZE;
Sergio Ammirata's avatar
Sergio Ammirata committed
269
270
	}

Gijs Peskens's avatar
Gijs Peskens committed
271

272
	// TODO: compare p->sender_ctx->sender_queue_read_index and p->sender_ctx->sender_queue_write_index
Sergio Ammirata's avatar
Sergio Ammirata committed
273
274
275
	// and warn when the difference is a multiple of 10 (slow CPU or overtaxed algortihm)
	// The difference should always stay very low < 10

276
277
278
279
280
281
282
283
	if (RIST_UNLIKELY((p->sender_ctx && p->sender_ctx->simulate_loss) || (p->receiver_ctx && p->receiver_ctx->simulate_loss))) {
		uint16_t loss_percentage = p->sender_ctx? p->sender_ctx->loss_percentage : p->receiver_ctx->loss_percentage;
		/* very crude calculation to see if we "randomly" drop packets, good enough for testing */
		uint16_t compare = rand() % 1001;
		if (compare <= loss_percentage) {
			ret = len;
			goto out;
		}
284
285
	}

286
	ret = sendto(p->sd,(const char*)data, len, 0, &(p->u.address), p->address_len);
287
288
289

out:
	if (RIST_UNLIKELY(ret <= 0)) {
290
		rist_log_priv(ctx, RIST_LOG_ERROR, "\tSend failed: errno=%d, ret=%d, socket=%d\n", errno, ret, p->sd);
Sergio Ammirata's avatar
Sergio Ammirata committed
291
	} else {
292
		p->stats_sender_instant.sent++;
Gijs Peskens's avatar
Gijs Peskens committed
293
		p->stats_receiver_instant.sent_rtcp++;
Sergio Ammirata's avatar
Sergio Ammirata committed
294
295
	}

Gijs Peskens's avatar
Gijs Peskens committed
296
297
298
299
	if (modifyingbuffer) {
		free(_payload - RIST_MAX_PAYLOAD_OFFSET);
	}

Sergio Ammirata's avatar
Sergio Ammirata committed
300
301
302
	return ret;
}

303
/* This function is used by receiver for all and by sender only for rist-data and oob-data */
304
int rist_send_common_rtcp(struct rist_peer *p, uint8_t payload_type, uint8_t *payload, size_t payload_len, uint64_t source_time, uint16_t src_port, uint16_t dst_port, uint32_t seq_rtp)
Sergio Ammirata's avatar
Sergio Ammirata committed
305
{
306
	// This can only and will most likely be zero for data packets. RTCP should always have a value.
Gijs Peskens's avatar
Gijs Peskens committed
307
	assert(payload_type != RIST_PAYLOAD_TYPE_DATA_RAW && payload_type != RIST_PAYLOAD_TYPE_DATA_RAW_RTP_EXT && payload_type != RIST_PAYLOAD_TYPE_DATA_OOB ? dst_port != 0 : 1);
308
309
	if (dst_port == 0)
		dst_port = p->config.virt_dst_port;
310
311
	if (src_port == 0)
		src_port = 32768 + p->adv_peer_id;
312

Sergio Ammirata's avatar
Sergio Ammirata committed
313
	if (p->sd < 0 || !p->address_len) {
Gijs Peskens's avatar
Gijs Peskens committed
314
		rist_log_priv(get_cctx(p), RIST_LOG_ERROR, "rist_send_common_rtcp failed\n");
315
		return -1;
Sergio Ammirata's avatar
Sergio Ammirata committed
316
317
	}

318
319
320
	if (RIST_UNLIKELY(p->config.timing_mode == RIST_TIMING_MODE_ARRIVAL) && !p->receiver_mode)
		source_time = timestampNTP_u64();

321
	size_t ret = rist_send_seq_rtcp(p, (uint16_t)seq_rtp, payload_type, payload, payload_len, source_time, src_port, dst_port);
Sergio Ammirata's avatar
Sergio Ammirata committed
322

323
	if ((!p->compression && ret < payload_len) || ret <= 0)
Sergio Ammirata's avatar
Sergio Ammirata committed
324
325
326
	{
		if (p->address_family == AF_INET6) {
			// TODO: print IP and port (and error number?)
Gijs Peskens's avatar
Gijs Peskens committed
327
			rist_log_priv(get_cctx(p), RIST_LOG_ERROR,
328
				"\tError on transmission sendto for seq #%"PRIu32"\n", seq_rtp);
Sergio Ammirata's avatar
Sergio Ammirata committed
329
330
331
		} else {
			struct sockaddr_in *sin4 = (struct sockaddr_in *)&p->u.address;
			unsigned char *ip = (unsigned char *)&sin4->sin_addr.s_addr;
Gijs Peskens's avatar
Gijs Peskens committed
332
			rist_log_priv(get_cctx(p), RIST_LOG_ERROR,
Sergio Ammirata's avatar
Sergio Ammirata committed
333
334
				"\tError on transmission sendto, ret=%d to %d.%d.%d.%d:%d/%d, seq #%"PRIu32", %d bytes\n",
					ret, ip[0], ip[1], ip[2], ip[3], htons(sin4->sin_port),
335
					p->local_port, seq_rtp, payload_len);
Sergio Ammirata's avatar
Sergio Ammirata committed
336
337
		}
	}
338
339
340
341
342
	else
	{
		// update bandwidth value
		rist_calculate_bitrate(ret, &p->bw);
	}
Sergio Ammirata's avatar
Sergio Ammirata committed
343

344
345
346
347
348
349
350
351
	// TODO:
	// This should return something meaningful, however ret is always >= 0 by virtue of being unsigned.
	/*if (ret >= 0)
	 *	return 0;
	 * else
	 *	return -1;
	 */
	return 0;
Sergio Ammirata's avatar
Sergio Ammirata committed
352
353
354
355
}

int rist_set_url(struct rist_peer *peer)
{
356
357
358
	char host[512];
	uint16_t port;
	int local;
Sergio Ammirata's avatar
Sergio Ammirata committed
359
360
	if (!peer->url) {
		if (peer->local_port > 0) {
361
			/* Put sender in IPv4 learning mode */
Sergio Ammirata's avatar
Sergio Ammirata committed
362
363
364
			peer->address_family = AF_INET;
			peer->address_len = sizeof(struct sockaddr_in);
			memset(&peer->u.address, 0, sizeof(struct sockaddr_in));
Gijs Peskens's avatar
Gijs Peskens committed
365
			rist_log_priv(get_cctx(peer), RIST_LOG_INFO,
Gijs Peskens's avatar
Gijs Peskens committed
366
					"Sender: in learning mode\n");
Sergio Ammirata's avatar
Sergio Ammirata committed
367
368
369
		}
		return 1;
	}
370
	if (udpsocket_parse_url(peer->url, host, 512, &port, &local) != 0) {
Gijs Peskens's avatar
Gijs Peskens committed
371
		rist_log_priv(get_cctx(peer), RIST_LOG_ERROR, "%s / %s\n", strerror(errno), peer->url);
Sergio Ammirata's avatar
Sergio Ammirata committed
372
373
		return -1;
	} else {
Gijs Peskens's avatar
Gijs Peskens committed
374
		rist_log_priv(get_cctx(peer), RIST_LOG_INFO, "URL parsed successfully: Host %s, Port %hu\n",
375
				(char *) host, port);
Sergio Ammirata's avatar
Sergio Ammirata committed
376
	}
377
	if (udpsocket_resolve_host(host, port, &peer->u.address) < 0) {
Gijs Peskens's avatar
Gijs Peskens committed
378
		rist_log_priv(get_cctx(peer), RIST_LOG_ERROR, "Host %s cannot be resolved\n",
379
380
				(char *) host);
		return -1;
381
	}
382
383
	if (peer->u.inaddr6.sin6_family == AF_INET6) {
		peer->address_family = AF_INET6;
Sergio Ammirata's avatar
Sergio Ammirata committed
384
		peer->address_len = sizeof(struct sockaddr_in6);
385
386
387
	} else {
		peer->address_family = AF_INET;
		peer->address_len = sizeof(struct sockaddr_in);
Sergio Ammirata's avatar
Sergio Ammirata committed
388
	}
389
390
391
392
393
394
	if (local) {
		peer->listening = 1;
		peer->local_port = port;
	} else {
		peer->listening = 0;
		peer->remote_port = port;
395
	}
Sergio Ammirata's avatar
Sergio Ammirata committed
396
	if (peer->address_family == AF_INET) {
397
398
399
		peer->u.inaddr.sin_port = htons(port);
	} else {
		peer->u.inaddr6.sin6_port = htons(port);
Sergio Ammirata's avatar
Sergio Ammirata committed
400
401
402
403
	}
	return 0;
}

404
void rist_populate_cname(struct rist_peer *peer)
Sergio Ammirata's avatar
Sergio Ammirata committed
405
{
406
407
408
409
410
411
412
413
	int fd = peer->sd;
	char *identifier = peer->cname;
	struct rist_common_ctx *ctx = get_cctx(peer);
	if (strlen((char *)ctx->cname) != 0)
	{
		strncpy(identifier, (char * )ctx->cname, RIST_MAX_HOSTNAME);
		return;
	}
Sergio Ammirata's avatar
Sergio Ammirata committed
414
415
	/* Set the CNAME Identifier as host@ip:port and fallback to hostname if needed */
	char hostname[RIST_MAX_HOSTNAME];
416
	struct sockaddr_storage peer_sockaddr;
417
	peer_sockaddr.ss_family = AF_UNSPEC;
418
	int name_length = 0;
419
	socklen_t peer_socklen = sizeof(peer_sockaddr);
Sergio Ammirata's avatar
Sergio Ammirata committed
420
421
422
423
	int ret_hostname = gethostname(hostname, RIST_MAX_HOSTNAME);
	if (ret_hostname == -1) {
		snprintf(hostname, RIST_MAX_HOSTNAME, "UnknownHost");
	}
424
425
426

	int ret_sockname = getsockname(fd, (struct sockaddr *)&peer_sockaddr, &peer_socklen);
	if (ret_sockname == 0)
Sergio Ammirata's avatar
Sergio Ammirata committed
427
	{
428
		struct sockaddr *xsa = (struct sockaddr *)&peer_sockaddr;
Sergio Ammirata's avatar
Sergio Ammirata committed
429
		// TODO: why is this returning non-sense?
430
		if (xsa->sa_family == AF_INET) {
Sergio Ammirata's avatar
Sergio Ammirata committed
431
			struct sockaddr_in *xin = (struct sockaddr_in*)&peer_sockaddr;
432
433
434
435
436
437
438
			char *addr = inet_ntoa(xin->sin_addr);
			if (strcmp(addr, "0.0.0.0") != 0) {
				name_length = snprintf(identifier, RIST_MAX_HOSTNAME, "%s@%s:%u", hostname,
										addr, ntohs(xin->sin_port));
				if (name_length >= RIST_MAX_HOSTNAME)
					identifier[RIST_MAX_HOSTNAME-1] = 0;
			}
439
		}/* else if (xsa->sa_family == AF_INET6) {
Sergio Ammirata's avatar
Sergio Ammirata committed
440
441
442
443
444
445
446
			struct sockaddr_in6 *xin6 = (void*)peer;
			char str[INET6_ADDRSTRLEN];
			inet_ntop(xin6->sin6_family, &xin6->sin6_addr, str, sizeof(struct in6_addr));
			name_length = snprintf(identifier, RIST_MAX_HOSTNAME, "%s@%s:%u", hostname,
							str, ntohs(xin6->sin6_port));
			if (name_length >= RIST_MAX_HOSTNAME)
				identifier[RIST_MAX_HOSTNAME-1] = 0;
447
		}*/
Sergio Ammirata's avatar
Sergio Ammirata committed
448
449
450
451
452
453
454
455
456
457
458
459
	}

	if (name_length == 0)
	{
		name_length = snprintf(identifier, RIST_MAX_HOSTNAME, "%s", hostname);
		if (name_length >= RIST_MAX_HOSTNAME)
			identifier[RIST_MAX_HOSTNAME-1] = 0;
	}
}

void rist_create_socket(struct rist_peer *peer)
{
460
	if(!peer->address_family && rist_set_url(peer)) {
Sergio Ammirata's avatar
Sergio Ammirata committed
461
462
463
464
465
		return;
	}

	if (peer->local_port) {
		const char* host;
466
		uint16_t port;
Sergio Ammirata's avatar
Sergio Ammirata committed
467
468
469
470
471
472
473
474
475
476
477
478

		char buffer[256];
		if (peer->u.address.sa_family == AF_INET) {
			struct sockaddr_in *addrv4 = (struct sockaddr_in *)&(peer->u);
			host = inet_ntop(AF_INET, &(addrv4->sin_addr), buffer, sizeof(buffer));
			port = htons(addrv4->sin_port);
		} else {
			struct sockaddr_in6 *addrv6 = (struct sockaddr_in6 *)&(peer->u);
			host = inet_ntop(AF_INET6, &(addrv6->sin6_addr), buffer, sizeof(buffer));
			port = htons(addrv6->sin6_port);
		}
		if (!host) {
Gijs Peskens's avatar
Gijs Peskens committed
479
			rist_log_priv(get_cctx(peer), RIST_LOG_INFO, "failed to convert address to string (errno=%d)", errno);
Sergio Ammirata's avatar
Sergio Ammirata committed
480
481
482
			return;
		}

483
		peer->sd = udpsocket_open_bind(host, port, &peer->miface[0]);
Sergio Ammirata's avatar
Sergio Ammirata committed
484
		if (peer->sd > 0) {
Gijs Peskens's avatar
Gijs Peskens committed
485
			rist_log_priv(get_cctx(peer), RIST_LOG_INFO, "Starting in URL listening mode (socket# %d)\n", peer->sd);
Sergio Ammirata's avatar
Sergio Ammirata committed
486
		} else {
Gijs Peskens's avatar
Gijs Peskens committed
487
			rist_log_priv(get_cctx(peer), RIST_LOG_ERROR, "Could not start in URL listening mode. %s\n", strerror(errno));
Sergio Ammirata's avatar
Sergio Ammirata committed
488
		}
489
490
491

		// Set non-blocking only for receive sockets
		udpsocket_set_nonblocking(peer->sd);
Sergio Ammirata's avatar
Sergio Ammirata committed
492
493
	}
	else {
494
495
496
497
498
499
500
501
502
503
504
505
506
		if (peer->u.address.sa_family == AF_INET)
		{
			struct sockaddr_in *addrv4 = (struct sockaddr_in *)&(peer->u);
			peer->multicast = IN_MULTICAST(ntohl(addrv4->sin_addr.s_addr));
		}
		else
		{
			struct sockaddr_in6 *addrv6 = (struct sockaddr_in6 *)&(peer->u);
			peer->multicast = IN6_IS_ADDR_MULTICAST(&addrv6->sin6_addr);
		}
		if (peer->multicast) {
			rist_log_priv(get_cctx(peer), RIST_LOG_INFO, "Peer configured for multicast");
		}
Sergio Ammirata's avatar
Sergio Ammirata committed
507
		// We use sendto ... so, no need to connect directly here
508
		peer->sd = udpsocket_open(peer->address_family);
509
		// TODO : set max hops
Sergio Ammirata's avatar
Sergio Ammirata committed
510
		if (peer->sd > 0)
Gijs Peskens's avatar
Gijs Peskens committed
511
			rist_log_priv(get_cctx(peer), RIST_LOG_INFO, "Starting in URL connect mode (%d)\n", peer->sd);
Sergio Ammirata's avatar
Sergio Ammirata committed
512
		else {
Gijs Peskens's avatar
Gijs Peskens committed
513
			rist_log_priv(get_cctx(peer), RIST_LOG_ERROR, "Could not start in URL connect mode. %s\n", strerror(errno));
Sergio Ammirata's avatar
Sergio Ammirata committed
514
		}
515
		peer->local_port = 32768 + (get_cctx(peer)->peer_counter % 28232);
Sergio Ammirata's avatar
Sergio Ammirata committed
516
517
	}

518
519
520
	// Increase default OS udp receive buffer size
	if (udpsocket_set_optimal_buffer_size(peer->sd)) {
		rist_log_priv(get_cctx(peer), RIST_LOG_WARN, "Unable to set the socket receive buffer size to %d Bytes. %s\n",
521
522
			UDPSOCKET_SOCK_BUFSIZE, strerror(errno));
	} else {
523
524
525
526
527
528
529
530
531
532
533
534
		uint32_t current_recvbuf = udpsocket_get_buffer_size(peer->sd);
		rist_log_priv(get_cctx(peer), RIST_LOG_INFO, "Configured the starting socket receive buffer size to %d Bytes.\n",
			current_recvbuf);
	}
	// Increase default OS udp send buffer size
	if (udpsocket_set_optimal_buffer_send_size(peer->sd)) {
		rist_log_priv(get_cctx(peer), RIST_LOG_WARN, "Unable to set the socket send buffer size to %d Bytes. %s\n",
			UDPSOCKET_SOCK_BUFSIZE, strerror(errno));
	} else {
		uint32_t current_sendbuf = udpsocket_get_buffer_send_size(peer->sd);
		rist_log_priv(get_cctx(peer), RIST_LOG_INFO, "Configured the starting socket send buffer size to %d Bytes.\n",
			current_sendbuf);
535
536
	}

Sergio Ammirata's avatar
Sergio Ammirata committed
537
538
	if (peer->cname[0] == 0)
		rist_populate_cname(peer);
Gijs Peskens's avatar
Gijs Peskens committed
539
	rist_log_priv(get_cctx(peer), RIST_LOG_INFO, "Peer cname is %s\n", peer->cname);
Sergio Ammirata's avatar
Sergio Ammirata committed
540
541
542

}

543
544
545
static inline void rist_rtcp_write_empty_rr(uint8_t *buf, int *offset, const uint32_t flow_id) {
	struct rist_rtcp_rr_empty_pkt *rr = (struct rist_rtcp_rr_empty_pkt *)(buf + RIST_MAX_PAYLOAD_OFFSET + *offset);
	*offset += sizeof(struct rist_rtcp_rr_empty_pkt);
Sergio Ammirata's avatar
Sergio Ammirata committed
546
547
	rr->rtcp.flags = RTCP_SR_FLAGS;
	rr->rtcp.ptype = PTYPE_RR;
548
	rr->rtcp.ssrc = htobe32(flow_id);
Sergio Ammirata's avatar
Sergio Ammirata committed
549
	rr->rtcp.len = htons(1);
550
}
Sergio Ammirata's avatar
Sergio Ammirata committed
551

552
553
554
555
556
557
558
559
560
561
562
563
564
565
static inline void rist_rtcp_write_rr(uint8_t *buf, int *offset, const struct rist_peer *peer)
{
	struct rist_rtcp_rr_pkt *rr = (struct rist_rtcp_rr_pkt *)(buf + RIST_MAX_PAYLOAD_OFFSET + *offset);
	*offset += sizeof(struct rist_rtcp_rr_pkt);
	rr->rtcp.flags = RTCP_RR_FULL_FLAGS;
	rr->rtcp.ptype = PTYPE_RR;
	rr->rtcp.ssrc = htobe32(peer->adv_flow_id);
	rr->rtcp.len = htons(7);
	/* TODO fix these variables */
	rr->fraction_lost = 0;
	rr->cumulative_pkt_loss_msb = 0;
	rr->cumulative_pkt_loss_lshw = 0;
	rr->highest_seq = 0;
	rr->jitter = 0;
566
	rr->lsr = htobe32((uint32_t)(peer->last_sender_report_time >> 16));
567
	/*  expressed in units of 1/65536  == middle 16 bits?!? */
568
	rr->dlsr = htobe32((uint32_t)((timestampNTP_u64() - peer->last_sender_report_ts) >> 16));
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
}

static inline void rist_rtcp_write_sr(uint8_t *buf, int *offset, struct rist_peer *peer) {
	struct rist_rtcp_sr_pkt *sr = (struct rist_rtcp_sr_pkt *)(buf + RIST_MAX_PAYLOAD_OFFSET + *offset);
	*offset += sizeof(struct rist_rtcp_sr_pkt);
	/* Populate SR for sender */
	sr->rtcp.flags = RTCP_SR_FLAGS;
	sr->rtcp.ptype = PTYPE_SR;
	sr->rtcp.ssrc = htobe32(peer->adv_flow_id);
	sr->rtcp.len = htons(6);
	uint64_t now = timestampNTP_u64();
	uint64_t now_rtc = timestampNTP_RTC_u64();
	peer->last_sender_report_time = now_rtc;
	peer->last_sender_report_ts = now;
	uint32_t ntp_lsw = (uint32_t)now_rtc;
	// There is 70 years (incl. 17 leap ones) offset to the Unix Epoch.
	// No leap seconds during that period since they were not invented yet.
	uint32_t ntp_msw = now_rtc >> 32;
	sr->ntp_msw = htobe32(ntp_msw);
	sr->ntp_lsw = htobe32(ntp_lsw);
589
	sr->rtp_ts = htobe32(timestampRTP_u32(0, now));
590
591
592
593
594
595
	sr->sender_pkts = 0;  //htonl(f->packets_count);
	sr->sender_bytes = 0; //htonl(f->bytes_count);
}

static inline void rist_rtcp_write_sdes(uint8_t *buf, int *offset, const char *name, const uint32_t flow_id)
{
596
597
598
	size_t namelen = strlen(name);
	size_t sdes_size = ((10 + namelen + 1) + 3) & ~3;
	size_t padding = sdes_size - namelen - 10;
599
600
	struct rist_rtcp_sdes_pkt *sdes = (struct rist_rtcp_sdes_pkt *)(buf + RIST_MAX_PAYLOAD_OFFSET + *offset);
	*offset += sdes_size;
Sergio Ammirata's avatar
Sergio Ammirata committed
601
602
603
	/* Populate SDES for sender description */
	sdes->rtcp.flags = RTCP_SDES_FLAGS;
	sdes->rtcp.ptype = PTYPE_SDES;
604
	sdes->rtcp.len = htons((uint16_t)((sdes_size - 1) >> 2));
605
	sdes->rtcp.ssrc = htobe32(flow_id);
Sergio Ammirata's avatar
Sergio Ammirata committed
606
	sdes->cname = 1;
607
	sdes->name_len = (uint8_t)namelen;
608
	// We copy the extra padding bytes from the source because it is a preallocated buffer
609
	// of size 128 with all zeroes
610
611
	memcpy(sdes->udn, name, namelen + padding);
}
Sergio Ammirata's avatar
Sergio Ammirata committed
612

613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
static inline void rist_rtcp_write_echoreq(uint8_t *buf, int *offset, const uint32_t flow_id)
{
	struct rist_rtcp_echoext *echo = (struct rist_rtcp_echoext *)(buf + RIST_MAX_PAYLOAD_OFFSET + *offset);
	*offset += sizeof(struct rist_rtcp_echoext);
	echo->flags = RTCP_ECHOEXT_REQ_FLAGS;
	echo->ptype = PTYPE_NACK_CUSTOM;
	echo->ssrc = htobe32(flow_id);
	echo->len = htons(5);
	memcpy(echo->name, "RIST", 4);
	uint64_t now = timestampNTP_u64();
	echo->ntp_msw = htobe32((uint32_t)(now >> 32));
	echo->ntp_lsw = htobe32((uint32_t)(now & 0x000000000FFFFFFFF));
}

static inline void rist_rtcp_write_echoresp(uint8_t *buf,int *offset, const uint64_t request_time, const uint32_t flow_id) {
	struct rist_rtcp_echoext *echo = (struct rist_rtcp_echoext *)(buf + RIST_MAX_PAYLOAD_OFFSET + *offset);
	*offset += sizeof(struct rist_rtcp_echoext);
	echo->flags = RTCP_ECHOEXT_RESP_FLAGS;
	echo->ptype = PTYPE_NACK_CUSTOM;
	echo->len = htons(5);
	echo->ssrc = htobe32(flow_id);
	memcpy(echo->name, "RIST", 4);
	echo->ntp_msw = htobe32((uint32_t)(request_time >> 32));
	echo->ntp_lsw = htobe32((uint32_t)(request_time & 0x000000000FFFFFFFF));
	echo->delay = 0;
}

Gijs Peskens's avatar
Gijs Peskens committed
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
static inline void rist_rtcp_write_xr_echoreq(uint8_t *buf, int *offset,struct rist_peer *peer, const uint32_t flow_id)
{
	struct rist_rtcp_hdr *xr_hdr = (struct rist_rtcp_hdr *)(buf + RIST_MAX_PAYLOAD_OFFSET + *offset);
	*offset += sizeof(*xr_hdr);
	xr_hdr->flags =  0x80;//v=2;p=0;
	xr_hdr->ptype = PTYPE_XR;
	xr_hdr->ssrc = htobe32(flow_id);
	struct rist_rtcp_xr_rrtrb *block = (struct rist_rtcp_xr_rrtrb *)(buf + RIST_MAX_PAYLOAD_OFFSET + *offset);
	*offset += sizeof(*block);
	block->block_type = 4;
	block->length = htobe16(2);
	block->reserved = 0;
	uint64_t now = timestampNTP_u64();
	peer->last_sender_report_ts = now;
	block->ntp_msw = htobe32((uint32_t)(now >> 32));
	block->ntp_lsw = htobe32((uint32_t)(now & 0x000000000FFFFFFFF));
	xr_hdr->len = htobe16(1 + sizeof(*block)/4);
}

659
660
661
662
663
664
665
int rist_receiver_periodic_rtcp(struct rist_peer *peer) {
	uint8_t payload_type = RIST_PAYLOAD_TYPE_RTCP;
	uint8_t *rtcp_buf = get_cctx(peer)->buf.rtcp;

	int payload_len = 0;
	rist_rtcp_write_rr(rtcp_buf, &payload_len, peer);
	rist_rtcp_write_sdes(rtcp_buf, &payload_len, peer->cname, peer->adv_flow_id);
Gijs Peskens's avatar
Gijs Peskens committed
666
667
	if (peer->echo_enabled == false)
		rist_rtcp_write_xr_echoreq(rtcp_buf, &payload_len, peer, peer->adv_flow_id);
668
	rist_rtcp_write_echoreq(rtcp_buf, &payload_len, peer->adv_flow_id);
669
	return rist_send_common_rtcp(peer, payload_type, &rtcp_buf[RIST_MAX_PAYLOAD_OFFSET], payload_len, 0, peer->local_port, peer->remote_port, 0);
670
671
}

Gijs Peskens's avatar
Gijs Peskens committed
672
int rist_receiver_send_nacks(struct rist_peer *peer, uint32_t seq_array[], size_t array_len)
673
{
674
675
676
	if (get_cctx(peer)->debug)
		rist_log_priv(get_cctx(peer), RIST_LOG_DEBUG, "Sending %d nacks starting with %"PRIu32"\n",
		array_len, seq_array[0]);
677
678
679
680
681
682
683
	uint8_t payload_type = RIST_PAYLOAD_TYPE_RTCP;
	uint8_t *rtcp_buf = get_cctx(peer)->buf.rtcp;

	int payload_len = 0;
	rist_rtcp_write_empty_rr(rtcp_buf, &payload_len, peer->adv_flow_id);
	rist_rtcp_write_sdes(rtcp_buf, &payload_len, peer->cname, peer->adv_flow_id);
	if (RIST_LIKELY(array_len > 0)) {
Sergio Ammirata's avatar
Sergio Ammirata committed
684
685
		// Add nack requests (if any)
		struct rist_rtp_nack_record *rec;
686
		uint32_t fci_count = 1;
Sergio Ammirata's avatar
Sergio Ammirata committed
687
688

		// Now the NACK message
689
		if (peer->receiver_ctx->nack_type == RIST_NACK_BITMASK)
Sergio Ammirata's avatar
Sergio Ammirata committed
690
		{
Gijs Peskens's avatar
Gijs Peskens committed
691
			struct rist_rtcp_nack_bitmask *rtcp = (struct rist_rtcp_nack_bitmask *)(rtcp_buf + RIST_MAX_PAYLOAD_OFFSET + payload_len);
Sergio Ammirata's avatar
Sergio Ammirata committed
692
693
694
695
			rtcp->flags = RTCP_NACK_BITMASK_FLAGS;
			rtcp->ptype = PTYPE_NACK_BITMASK;
			rtcp->ssrc_source = 0; // TODO
			rtcp->ssrc = htobe32(peer->adv_flow_id);
Gijs Peskens's avatar
Gijs Peskens committed
696
			rec = (struct rist_rtp_nack_record *)(rtcp_buf + RIST_MAX_PAYLOAD_OFFSET + payload_len + RTCP_FB_HEADER_SIZE);
697
698
699
			uint32_t last_seq, tmp_seq;
			tmp_seq = last_seq = seq_array[0];
			uint32_t boundary = tmp_seq +16;
700
			rec->start = htons((uint16_t)tmp_seq);
701
			uint16_t extra = 0;
Gijs Peskens's avatar
Gijs Peskens committed
702
			for (size_t i = 1; i < array_len; i++)
703
704
705
			{
				tmp_seq = seq_array[i];
				if (last_seq < tmp_seq && tmp_seq <= boundary) {
706
					uint32_t bitnum = tmp_seq - last_seq;
707
708
709
710
711
712
					SET_BIT(extra, (bitnum -1));
				} else {
					rec->extra = htons(extra);
					rec++;
					fci_count++;
					extra = 0;
713
					rec->start = htons((uint16_t)tmp_seq);
714
715
716
					last_seq = tmp_seq;
					boundary = tmp_seq + 16;
				}
Sergio Ammirata's avatar
Sergio Ammirata committed
717
			}
718
			rec->extra = htons(extra);
719
			rtcp->len = htons((uint16_t)(2 + fci_count));
Sergio Ammirata's avatar
Sergio Ammirata committed
720
721
722
		}
		else // PTYPE_NACK_CUSTOM
		{
Gijs Peskens's avatar
Gijs Peskens committed
723
			struct rist_rtcp_nack_range *rtcp = (struct rist_rtcp_nack_range *)(rtcp_buf + RIST_MAX_PAYLOAD_OFFSET + payload_len);
Sergio Ammirata's avatar
Sergio Ammirata committed
724
725
726
727
			rtcp->flags = RTCP_NACK_RANGE_FLAGS;
			rtcp->ptype = PTYPE_NACK_CUSTOM;
			rtcp->ssrc_source = htobe32(peer->adv_flow_id);
			memcpy(rtcp->name, "RIST", 4);
Gijs Peskens's avatar
Gijs Peskens committed
728
			rec = (struct rist_rtp_nack_record *)(rtcp_buf + RIST_MAX_PAYLOAD_OFFSET + payload_len + RTCP_FB_HEADER_SIZE);
729
730
731
732
			uint16_t tmp_seq = (uint16_t)seq_array[0];
			uint16_t last_seq = tmp_seq;
			rec->start = htons(tmp_seq);
			uint16_t extra = 0;
Gijs Peskens's avatar
Gijs Peskens committed
733
			for (size_t i = 1; i < array_len; i++)
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
			{
				tmp_seq = (uint16_t)seq_array[i];
				if (RIST_UNLIKELY(extra == UINT16_MAX)) {
					rec->extra = htons(extra);
					rec++;
					fci_count++;
					rec->start = htons(tmp_seq);
					extra = 0;
				} else if (tmp_seq == last_seq +1) {
					extra++;
				} else {
					rec->extra = htons(extra);
					rec++;
					fci_count++;
					rec->start = htons(tmp_seq);
					extra = 0;
				}
				last_seq = tmp_seq;
Sergio Ammirata's avatar
Sergio Ammirata committed
752
			}
753
			rec->extra = htons(extra);
754
			rtcp->len = htons((uint16_t)(2 + fci_count));
Sergio Ammirata's avatar
Sergio Ammirata committed
755
		}
Gijs Peskens's avatar
Gijs Peskens committed
756
		int nack_bufsize = RTCP_FB_HEADER_SIZE + RTCP_FB_FCI_GENERIC_NACK_SIZE * fci_count;
Sergio Ammirata's avatar
Sergio Ammirata committed
757
758
759
760
		payload_len += nack_bufsize;
		payload_type = RIST_PAYLOAD_TYPE_RTCP_NACK;
	}

761
	// We use direct send from receiver to sender (no fifo to keep track of seq/idx)
762
	return rist_send_common_rtcp(peer, payload_type, &rtcp_buf[RIST_MAX_PAYLOAD_OFFSET], payload_len, 0, peer->local_port, peer->remote_port, 0);
Sergio Ammirata's avatar
Sergio Ammirata committed
763
764
}

765
static void rist_sender_send_rtcp(uint8_t *rtcp_buf, int payload_len, struct rist_peer *peer) {
766
	rist_send_common_rtcp(peer, RIST_PAYLOAD_TYPE_RTCP, rtcp_buf, payload_len, 0, peer->local_port, peer->remote_port, 0);
767
768
}

769
void rist_sender_periodic_rtcp(struct rist_peer *peer) {
Sergio Ammirata's avatar
Sergio Ammirata committed
770
	uint8_t *rtcp_buf = get_cctx(peer)->buf.rtcp;
771
	int payload_len = 0;
Sergio Ammirata's avatar
Sergio Ammirata committed
772

773
774
	rist_rtcp_write_sr(rtcp_buf, &payload_len, peer);
	rist_rtcp_write_sdes(rtcp_buf, &payload_len, peer->cname, peer->adv_flow_id);
Gijs Peskens's avatar
Gijs Peskens committed
775
776
	if (peer->echo_enabled)
		rist_rtcp_write_echoreq(rtcp_buf, &payload_len, peer->adv_flow_id);
Sergio Ammirata's avatar
Sergio Ammirata committed
777
	// Push it to the FIFO buffer to be sent ASAP (even in the simple profile case)
778
	rist_sender_send_rtcp(&rtcp_buf[RIST_MAX_PAYLOAD_OFFSET], payload_len, peer);
779
	return;
Sergio Ammirata's avatar
Sergio Ammirata committed
780
781
}

782
783
784
785
786
787
788
789
int rist_respond_echoreq(struct rist_peer *peer, const uint64_t echo_request_time) {
	uint8_t *rtcp_buf = get_cctx(peer)->buf.rtcp;
	int payload_len = 0;
	rist_rtcp_write_empty_rr(rtcp_buf, &payload_len, peer->adv_flow_id);
	rist_rtcp_write_sdes(rtcp_buf, &payload_len, peer->cname, peer->adv_flow_id);
	rist_rtcp_write_echoresp(rtcp_buf, &payload_len, echo_request_time, peer->adv_flow_id);
	if (peer->receiver_mode) {
		uint8_t payload_type = RIST_PAYLOAD_TYPE_RTCP;
790
		return rist_send_common_rtcp(peer, payload_type, &rtcp_buf[RIST_MAX_PAYLOAD_OFFSET], payload_len, 0, peer->local_port, peer->remote_port, 0);
791
792
	} else {
		/* I do this to not break advanced mode, however echo responses should really NOT be resend when lost ymmv */
793
		rist_sender_send_rtcp(&rtcp_buf[RIST_MAX_PAYLOAD_OFFSET], payload_len, peer);
794
795
796
		return 0;
	}
}
Sergio Ammirata's avatar
Sergio Ammirata committed
797

798
799
800
801
802
803
804
805
806
int rist_request_echo(struct rist_peer *peer) {
	uint8_t *rtcp_buf = get_cctx(peer)->buf.rtcp;
	int payload_len = 0;
	rist_rtcp_write_empty_rr(rtcp_buf, &payload_len, peer->adv_flow_id);
	rist_rtcp_write_sdes(rtcp_buf, &payload_len, peer->cname, peer->adv_flow_id);
	rist_rtcp_write_echoreq(rtcp_buf, &payload_len, peer->adv_flow_id);
	if (peer->receiver_mode)
	{
		uint8_t payload_type = RIST_PAYLOAD_TYPE_RTCP;
807
		return rist_send_common_rtcp(peer, payload_type, &rtcp_buf[RIST_MAX_PAYLOAD_OFFSET], payload_len, 0, peer->local_port, peer->remote_port, 0);
808
809
810
	}
	else
	{
811
		rist_sender_send_rtcp(&rtcp_buf[RIST_MAX_PAYLOAD_OFFSET], payload_len, peer);
812
813
814
		return 0;
	}
}
Sergio Ammirata's avatar
Sergio Ammirata committed
815

Gijs Peskens's avatar
Gijs Peskens committed
816
int rist_sender_enqueue(struct rist_sender *ctx, const void *data, size_t len, uint64_t datagram_time, uint16_t src_port, uint16_t dst_port, uint32_t seq_rtp)
Sergio Ammirata's avatar
Sergio Ammirata committed
817
818
{
	uint8_t payload_type = RIST_PAYLOAD_TYPE_DATA_RAW;
Gijs Peskens's avatar
Gijs Peskens committed
819
	const void * payload = data;
Sergio Ammirata's avatar
Sergio Ammirata committed
820
821
822
823
824
825
	if (ctx->common.PEERS == NULL) {
		// Do not cache data if the lib user has not added peers
		return -1;
	}

	ctx->last_datagram_time = datagram_time;
Gijs Peskens's avatar
Gijs Peskens committed
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
	uint8_t tmp_buf[6 * 204 + 4];//Max size needed with at least 1 pkt suppressed
	if (ctx->null_packet_suppression && len <= 7 * 204)
	{

		struct rist_rtp_hdr_ext *hdr_ext = (struct rist_rtp_hdr_ext *)&tmp_buf;
		memset(tmp_buf, 0, sizeof(*hdr_ext));//hdr_ext
		int ret = 0;
		if ((ret = suppress_null_packets(data, &tmp_buf[sizeof(*hdr_ext)], &len, hdr_ext)) > 0)
		{
			memcpy(&hdr_ext->identifier, "RI", 2);
			hdr_ext->length = htobe16(1);
			len += sizeof(*hdr_ext);
			payload = tmp_buf;
			payload_type = RIST_PAYLOAD_TYPE_DATA_RAW_RTP_EXT;
		}
	}
Sergio Ammirata's avatar
Sergio Ammirata committed
842

843
	/* insert into sender fifo queue */
Gijs Peskens's avatar
Gijs Peskens committed
844
	pthread_mutex_lock(&ctx->queue_lock);
Gijs Peskens's avatar
Gijs Peskens committed
845
	size_t sender_write_index = atomic_load_explicit(&ctx->sender_queue_write_index, memory_order_acquire);
Gijs Peskens's avatar
Gijs Peskens committed
846
	ctx->sender_queue[sender_write_index] = rist_new_buffer(&ctx->common, payload, len, payload_type, 0, datagram_time, src_port, dst_port);
Gijs Peskens's avatar
Gijs Peskens committed
847
	if (RIST_UNLIKELY(!ctx->sender_queue[sender_write_index])) {
Gijs Peskens's avatar
Gijs Peskens committed
848
		rist_log_priv(&ctx->common, RIST_LOG_ERROR, "\t Could not create packet buffer inside sender buffer, OOM, decrease max bitrate or buffer time length\n");
Gijs Peskens's avatar
Gijs Peskens committed
849
		pthread_mutex_unlock(&ctx->queue_lock);
Sergio Ammirata's avatar
Sergio Ammirata committed
850
851
		return -1;
	}
852
	ctx->sender_queue[sender_write_index]->seq_rtp = (uint16_t)seq_rtp;
853
	ctx->sender_queue_bytesize += len;
Gijs Peskens's avatar
Gijs Peskens committed
854
	atomic_store_explicit(&ctx->sender_queue_write_index, (sender_write_index + 1) & (ctx->sender_queue_max - 1), memory_order_release);
Gijs Peskens's avatar
Gijs Peskens committed
855
	pthread_mutex_unlock(&ctx->queue_lock);
Sergio Ammirata's avatar
Sergio Ammirata committed
856
857
858
859

	return 0;
}

860
void rist_sender_send_data_balanced(struct rist_sender *ctx, struct rist_buffer *buffer)
Sergio Ammirata's avatar
Sergio Ammirata committed
861
862
863
864
{
	struct rist_peer *peer;
	struct rist_peer *selected_peer_by_weight = NULL;
	uint32_t max_remainder = 0;
865
	int peercnt;
866
	bool looped = false;
Gijs Peskens's avatar
Gijs Peskens committed
867
868
869

	//We can do it safely here, since this function is only to be called once per packet
	buffer->seq = ctx->common.seq++;
870

871
872
873
peer_select:

	peercnt = 0;
Sergio Ammirata's avatar
Sergio Ammirata committed
874
875
	for (peer = ctx->common.PEERS; peer; peer = peer->next) {

876
		if (!peer->is_data || peer->parent)
Sergio Ammirata's avatar
Sergio Ammirata committed
877
			continue;
878
879
880
881
#ifdef USE_MBEDTLS
		if (!peer->listening && !eap_is_authenticated(peer->eap_ctx))
			continue;
#endif
882
		if ((!peer->listening && !peer->authenticated) || peer->dead
883
			|| (peer->listening && !peer->child_alive_count)) {
Sergio Ammirata's avatar
Sergio Ammirata committed
884
			ctx->weight_counter -= peer->config.weight;
Sergio Ammirata's avatar
Sergio Ammirata committed
885
886
887
			if (ctx->weight_counter <= 0) {
				ctx->weight_counter = ctx->total_weight;
			}
Sergio Ammirata's avatar
Sergio Ammirata committed
888
			peer->w_count = peer->config.weight;
Sergio Ammirata's avatar
Sergio Ammirata committed
889
890
			continue;
		}
891
		peercnt++;
Sergio Ammirata's avatar
Sergio Ammirata committed
892

Sergio Ammirata's avatar
Sergio Ammirata committed
893
894
		/*************************************/
		/* * * * * * * * * * * * * * * * * * */
Sergio Ammirata's avatar
Sergio Ammirata committed
895
		/** Heuristics for sender goes here **/
Sergio Ammirata's avatar
Sergio Ammirata committed
896
897
		/* * * * * * * * * * * * * * * * * * */
		/*************************************/
Sergio Ammirata's avatar
Sergio Ammirata committed
898

899
		if (peer->config.weight == 0 && !looped) {
900
901
902
			if (peer->listening) {
				struct rist_peer *child = peer->child;
				while (child) {
903
904
905
906
907
908
#ifdef USE_MBEDTLS
					if (!eap_is_authenticated(child->eap_ctx))
					{
						//do nothing
					} else
#endif
Gijs Peskens's avatar
Gijs Peskens committed
909
					if (child->is_data && !child->dead) {
910
					uint8_t *payload = buffer->data;
911
					rist_send_common_rtcp(child, buffer->type, &payload[RIST_MAX_PAYLOAD_OFFSET], buffer->size, buffer->source_time, buffer->src_port, buffer->dst_port, buffer->seq_rtp);
Gijs Peskens's avatar
Gijs Peskens committed
912
					}
913
914
915
916
					child = child->sibling_next;
				}
			} else {
				uint8_t *payload = buffer->data;
917
				rist_send_common_rtcp(peer, buffer->type, &payload[RIST_MAX_PAYLOAD_OFFSET], buffer->size, buffer->source_time, buffer->src_port, buffer->dst_port, buffer->seq_rtp);
918
			}
Sergio Ammirata's avatar
Sergio Ammirata committed
919
920
921
922
923
924
925
926
927
928
		} else {
			/* Election of next peer */
			// printf("peer election: considering %p, count=%d (wc: %d)\n",
			// peer, peer->w_count, ctx->weight_counter);
			if (peer->w_count > max_remainder) {
				max_remainder = peer->w_count;
				selected_peer_by_weight = peer;
			}
		}
	}
929
	looped = true;
Sergio Ammirata's avatar
Sergio Ammirata committed
930
931
932

	if (selected_peer_by_weight) {
		peer = selected_peer_by_weight;
933
934
935
		if (peer->listening) {
			struct rist_peer *child = peer->child;
			while (child) {
936
937
938
939
940
941
#ifdef USE_MBEDTLS
					if (!eap_is_authenticated(child->eap_ctx))
					{
						//do nothing
					} else
#endif
Gijs Peskens's avatar
Gijs Peskens committed
942
943
				if (child->is_data && !child->dead) {
					uint8_t *payload = buffer->data;
944
					rist_send_common_rtcp(child, buffer->type, &payload[RIST_MAX_PAYLOAD_OFFSET], buffer->size, buffer->source_time, buffer->src_port, buffer->dst_port,  buffer->seq_rtp);
Gijs Peskens's avatar
Gijs Peskens committed
945
				}
946
947
948
949
				child = child->sibling_next;
			}
		} else {
			uint8_t *payload = buffer->data;
950
			rist_send_common_rtcp(peer, buffer->type, &payload[RIST_MAX_PAYLOAD_OFFSET], buffer->size, buffer->source_time, buffer->src_port, buffer->dst_port, buffer->seq_rtp);
951
952
953
			ctx->weight_counter--;
			peer->w_count--;
		}
Sergio Ammirata's avatar
Sergio Ammirata committed
954
955
	}

956
	if (ctx->total_weight > 0 && (ctx->weight_counter == 0 || !selected_peer_by_weight)) {
Sergio Ammirata's avatar
Sergio Ammirata committed
957
958
959
960
961
		peer = ctx->common.PEERS;
		ctx->weight_counter = ctx->total_weight;
		for (; peer; peer = peer->next) {
			if (peer->listening || !peer->is_data)
				continue;
Sergio Ammirata's avatar
Sergio Ammirata committed
962
			peer->w_count = peer->config.weight;
Sergio Ammirata's avatar
Sergio Ammirata committed
963
		}
964
965
		if (!selected_peer_by_weight && peercnt > 0)
			goto peer_select;
Sergio Ammirata's avatar
Sergio Ammirata committed
966
967
968
	}
}

969
static size_t rist_sender_index_get(struct rist_sender *ctx, uint32_t seq)
Sergio Ammirata's avatar
Sergio Ammirata committed
970
{
971
	size_t idx = ctx->seq_index[(uint16_t)seq];
Sergio Ammirata's avatar
Sergio Ammirata committed
972
973
974
	return idx;
}

975
976
size_t rist_get_sender_retry_queue_size(struct rist_sender *ctx)
{
977
978
979
	size_t retry_queue_size = (ctx->sender_retry_queue_write_index - ctx->sender_retry_queue_read_index)
							& (ctx->sender_retry_queue_size - 1);
	return retry_queue_size;
980
981
982
}

/* This function must return, 0 when there is nothing to send, < 0 on error and > 0 for bytes sent */
Gijs Peskens's avatar
Gijs Peskens committed
983
ssize_t rist_retry_dequeue(struct rist_sender *ctx)
Sergio Ammirata's avatar
Sergio Ammirata committed
984
{
Gijs Peskens's avatar
Gijs Peskens committed
985
//	rist_log_priv(&ctx->common, RIST_LOG_ERROR,
986
987
//			"\tCurrent read/write index are %zu/%zu \n", ctx->sender_retry_queue_read_index,
//			ctx->sender_retry_queue_write_index);
Sergio Ammirata's avatar
Sergio Ammirata committed
988

Sergio Ammirata's avatar
Sergio Ammirata committed
989
	// TODO: Is this logic flawed and we are always one unit behind (look at oob_dequee)
Gijs Peskens's avatar
Gijs Peskens committed
990
	size_t sender_retry_queue_read_index = (ctx->sender_retry_queue_read_index + 1)& (ctx->sender_retry_queue_size -1);
Sergio Ammirata's avatar
Sergio Ammirata committed
991

992
	if (sender_retry_queue_read_index == ctx->sender_retry_queue_write_index) {
Gijs Peskens's avatar
Gijs Peskens committed
993
		//rist_log_priv(&ctx->common, RIST_LOG_ERROR,
Sergio Ammirata's avatar
Sergio Ammirata committed
994
		//	"\t[GOOD] We are all up to date, index is %" PRIu64 "\n",
995
		//	ctx->sender_retry_queue_read_index);
Sergio Ammirata's avatar
Sergio Ammirata committed
996
997
998
		return 0;
	}

999
1000
	ctx->sender_retry_queue_read_index = sender_retry_queue_read_index;
	struct rist_retry *retry = &ctx->sender_retry_queue[ctx->sender_retry_queue_read_index];
For faster browsing, not all history is shown. View entire blame