-
Notifications
You must be signed in to change notification settings - Fork 24
/
rti_remote.c
1758 lines (1565 loc) · 76.3 KB
/
rti_remote.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#if defined STANDALONE_RTI
/**
* @file
* @author Edward A. Lee
* @author Soroush Bateni
* @author Erling Jellum
* @author Chadlia Jerad
* @copyright (c) 2020-2023, The University of California at Berkeley
* License in [BSD 2-clause](https://github.com/lf-lang/reactor-c/blob/main/LICENSE.md)
* @brief Runtime infrastructure (RTI) for distributed Lingua Franca programs.
*
* This implementation creates one thread per federate so as to be able
* to take advantage of multiple cores. It may be more efficient, however,
* to use select() instead to read from the multiple socket connections
* to each federate.
*
* This implementation sends messages in little endian order
* because Intel, RISC V, and Arm processors are little endian.
* This is not what is normally considered "network order",
* but we control both ends, and hence, for commonly used
* processors, this will be more efficient since it won't have
* to swap bytes.
*
* This implementation of the RTI should be considered a reference
* implementation. In the future it might be re-implemented in Java or Kotlin.
* Or we could bootstrap and implement it using Lingua Franca.
*/
#include "rti_remote.h"
#include "net_util.h"
#include <string.h>
// Global variables defined in tag.c:
// `start_time` is subtracted from tag times in the log messages below so that
// the printed times are relative to it.
extern instant_t start_time;
/**
* Local reference to the rti_remote object.
* Functions in this file reach the shared RTI state (scheduling nodes,
* maximum stop tag, tracing flag, final port numbers) through this pointer.
*/
static rti_remote_t* rti_remote;
// NOTE(review): not referenced in the visible part of this file; presumably
// read by shared error-reporting code -- confirm before changing or removing.
bool _lf_federate_reports_error = false;
// A convenient macro for getting the `federate_info_t *` at index `_idx`
// and casting it.
// The whole expansion is parenthesized so that a postfix use such as
// `GET_FED_INFO(i)->socket` applies `->` to the cast result rather than to
// the uncast scheduling node.
#define GET_FED_INFO(_idx) ((federate_info_t*)rti_remote->base.scheduling_nodes[(_idx)])
// Mutex protecting the RTI state that the handlers in this file share.
lf_mutex_t rti_mutex;
// NOTE(review): not used in the visible part of this file; presumably
// signalled once all federates have reported their start times -- confirm.
lf_cond_t received_start_times;
// Waited on by handlers that must not send to a federate while that
// federate's state is still PENDING.
lf_cond_t sent_start_time;

/**
 * Enter a critical section by locking the RTI mutex.
 * @param env Not used by this implementation.
 * @return The value returned by lf_mutex_lock().
 */
extern int lf_critical_section_enter(environment_t* env) {
  int lock_status = lf_mutex_lock(&rti_mutex);
  return lock_status;
}

/**
 * Leave a critical section by unlocking the RTI mutex.
 * @param env Not used by this implementation.
 * @return The value returned by lf_mutex_unlock().
 */
extern int lf_critical_section_exit(environment_t* env) {
  int unlock_status = lf_mutex_unlock(&rti_mutex);
  return unlock_status;
}
/**
 * Create a server socket for the RTI, bind it to a port and, for TCP,
 * start listening for incoming connections.
 *
 * If `port` is non-zero, only that port is tried: a failed bind() is retried
 * after sleeping PORT_BIND_RETRY_INTERVAL, for at most PORT_BIND_RETRY_LIMIT
 * bind attempts in total. If `port` is zero, binding starts at DEFAULT_PORT
 * and each failed attempt moves on to the next port number without any delay,
 * wrapping back to DEFAULT_PORT once DEFAULT_PORT + MAX_NUM_PORT_ADDRESSES is
 * reached, again for at most PORT_BIND_RETRY_LIMIT attempts in total.
 *
 * The port finally obtained is stored in rti_remote->final_port_TCP or
 * rti_remote->final_port_UDP. The process exits if no port can be bound or,
 * for TCP, if the socket cannot be put into the listening state.
 *
 * @param port The port number to use or 0 to start trying at DEFAULT_PORT.
 * @param socket_type The type of the socket for the server (TCP or UDP).
 * @return The socket descriptor on which to accept connections.
 */
static int create_rti_server(uint16_t port, socket_type_t socket_type) {
  // Timeout time for the communications of the server (TCP unless overridden below).
  struct timeval timeout_time = {.tv_sec = TCP_TIMEOUT_TIME / BILLION, .tv_usec = (TCP_TIMEOUT_TIME % BILLION) / 1000};

  // Create an IPv4 socket of the requested type.
  int socket_descriptor = -1;
  if (socket_type == TCP) {
    socket_descriptor = create_real_time_tcp_socket_errexit();
  } else if (socket_type == UDP) {
    socket_descriptor = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
    // Set the appropriate timeout time
    timeout_time =
        (struct timeval){.tv_sec = UDP_TIMEOUT_TIME / BILLION, .tv_usec = (UDP_TIMEOUT_TIME % BILLION) / 1000};
  }
  if (socket_descriptor < 0) {
    lf_print_error_system_failure("Failed to create RTI socket.");
  }

  // Set the option for this socket to reuse the same address.
  // setsockopt() requires a reference to the value assigned to an option, and
  // the length passed must be the size of that very object.
  int true_variable = 1;
  if (setsockopt(socket_descriptor, SOL_SOCKET, SO_REUSEADDR, &true_variable, sizeof(true_variable)) < 0) {
    lf_print_error("RTI failed to set SO_REUSEADDR option on the socket: %s.", strerror(errno));
  }
  // Set the timeout on the socket so that read and write operations don't block for too long
  if (setsockopt(socket_descriptor, SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout_time, sizeof(timeout_time)) < 0) {
    lf_print_error("RTI failed to set SO_RCVTIMEO option on the socket: %s.", strerror(errno));
  }
  if (setsockopt(socket_descriptor, SOL_SOCKET, SO_SNDTIMEO, (const char*)&timeout_time, sizeof(timeout_time)) < 0) {
    lf_print_error("RTI failed to set SO_SNDTIMEO option on the socket: %s.", strerror(errno));
  }

  // SO_REUSEPORT is deliberately not set. It used to permit reuse of a port
  // that a previous RTI had not yet released; instead, binding is retried
  // some number of times below.

  // Server address, zeroed before the relevant fields are filled in.
  struct sockaddr_in server_fd;
  memset(&server_fd, 0, sizeof(server_fd));

  uint16_t specified_port = port;
  if (specified_port == 0) {
    port = DEFAULT_PORT;
  }
  server_fd.sin_family = AF_INET;         // IPv4
  server_fd.sin_addr.s_addr = INADDR_ANY; // All interfaces, 0.0.0.0.
  // Convert the port number from host byte order to network byte order.
  server_fd.sin_port = htons(port);

  int result = bind(socket_descriptor, (struct sockaddr*)&server_fd, sizeof(server_fd));

  // Try repeatedly to bind to a port. If no specific port is specified, then
  // increment the port number each time.
  int count = 1;
  while (result != 0 && count++ < PORT_BIND_RETRY_LIMIT) {
    if (specified_port == 0) {
      lf_print_warning("RTI failed to get port %d.", port);
      port++;
      if (port >= DEFAULT_PORT + MAX_NUM_PORT_ADDRESSES) {
        port = DEFAULT_PORT;
      }
      lf_print_warning("RTI will try again with port %d.", port);
      server_fd.sin_port = htons(port);
      // Do not sleep.
    } else {
      lf_print("RTI failed to get port %d. Will try again.", port);
      lf_sleep(PORT_BIND_RETRY_INTERVAL);
    }
    result = bind(socket_descriptor, (struct sockaddr*)&server_fd, sizeof(server_fd));
  }
  if (result != 0) {
    lf_print_error_and_exit("Failed to bind the RTI socket. Port %d is not available. ", port);
  }

  const char* type = "TCP";
  if (socket_type == UDP) {
    type = "UDP";
  }
  lf_print("RTI using %s port %d for federation %s.", type, port, rti_remote->federation_id);

  if (socket_type == TCP) {
    rti_remote->final_port_TCP = port;
    // Enable listening for socket connections.
    // The second argument is the maximum number of queued socket requests,
    // which according to the Mac man page is limited to 128.
    // A socket that is not listening can never accept a federate, so a
    // failure here is fatal rather than silently ignored.
    if (listen(socket_descriptor, 128) != 0) {
      lf_print_error_and_exit("RTI failed to listen on port %d: %s.", port, strerror(errno));
    }
  } else if (socket_type == UDP) {
    rti_remote->final_port_UDP = port;
    // No need to listen on the UDP socket
  }
  return socket_descriptor;
}
/**
 * Send a tag advance grant (TAG) to the federate behind scheduling node `e`.
 *
 * Nothing is sent if the node is not connected, if `tag` is not strictly
 * greater than the last TAG granted to it, or if `tag` is less than the last
 * provisional grant it received. While the node is still PENDING, this
 * function waits on the `sent_start_time` condition variable.
 *
 * A failed write is a soft failure: the node is marked NOT_CONNECTED and
 * its `last_granted` tag is left untouched.
 *
 * @param e The scheduling node (a federate) to notify.
 * @param tag The tag to grant.
 */
void notify_tag_advance_grant(scheduling_node_t* e, tag_t tag) {
  if (e->state == NOT_CONNECTED) {
    return;
  }
  if (lf_tag_compare(tag, e->last_granted) <= 0) {
    return;
  }
  if (lf_tag_compare(tag, e->last_provisionally_granted) < 0) {
    return;
  }

  // The destination federate's thread must already have sent the starting
  // MSG_TYPE_TIMESTAMP message before a grant may go out.
  while (e->state == PENDING) {
    lf_cond_wait(&sent_start_time);
  }

  // Message layout: one type byte, the time as int64, the microstep as int32.
  unsigned char message[1 + sizeof(int64_t) + sizeof(uint32_t)];
  unsigned char* time_field = message + 1;
  unsigned char* microstep_field = time_field + sizeof(int64_t);
  message[0] = MSG_TYPE_TAG_ADVANCE_GRANT;
  encode_int64(tag.time, time_field);
  encode_int32((int32_t)tag.microstep, microstep_field);

  if (rti_remote->base.tracing_enabled) {
    tracepoint_rti_to_federate(send_TAG, e->id, &tag);
  }

  // This function is reached from notify_advance_grant_if_safe(), which runs
  // for a long time; the socket may have closed in the meantime. Treat a
  // failed write as a soft failure and record the federate as disconnected.
  federate_info_t* destination = (federate_info_t*)e;
  if (write_to_socket(destination->socket, sizeof(message), message)) {
    lf_print_error("RTI failed to send tag advance grant to federate %d.", e->id);
    e->state = NOT_CONNECTED;
    return;
  }

  e->last_granted = tag;
  LF_PRINT_LOG("RTI sent to federate %d the tag advance grant (TAG) " PRINTF_TAG ".", e->id, tag.time - start_time,
               tag.microstep);
}
/**
 * Send a provisional tag advance grant (PTAG) to the federate behind
 * scheduling node `e`, then propagate grants to its upstream federates.
 *
 * Nothing is sent if the node is not connected or if `tag` is not strictly
 * greater than both the last TAG and the last PTAG sent to it. While the
 * node is still PENDING, this function waits on `sent_start_time`.
 *
 * A failed write is a soft failure: the node is marked NOT_CONNECTED, its
 * `last_provisionally_granted` tag is left untouched and nothing is
 * propagated upstream.
 *
 * @param e The scheduling node (a federate) to notify.
 * @param tag The tag to grant provisionally.
 */
void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) {
  if (e->state == NOT_CONNECTED || lf_tag_compare(tag, e->last_granted) <= 0 ||
      lf_tag_compare(tag, e->last_provisionally_granted) <= 0) {
    return;
  }
  // Need to make sure that the destination federate's thread has already
  // sent the starting MSG_TYPE_TIMESTAMP message.
  while (e->state == PENDING) {
    // Need to wait here.
    lf_cond_wait(&sent_start_time);
  }

  // Message layout: one type byte, the time as int64, the microstep as int32.
  size_t message_length = 1 + sizeof(int64_t) + sizeof(uint32_t);
  unsigned char buffer[message_length];
  buffer[0] = MSG_TYPE_PROVISIONAL_TAG_ADVANCE_GRANT;
  encode_int64(tag.time, &(buffer[1]));
  encode_int32((int32_t)tag.microstep, &(buffer[1 + sizeof(int64_t)]));

  if (rti_remote->base.tracing_enabled) {
    tracepoint_rti_to_federate(send_PTAG, e->id, &tag);
  }

  // This function is called in notify_advance_grant_if_safe(), which is a long
  // function. During this call, the socket might close, causing the following write_to_socket
  // to fail. Consider a failure here a soft failure and update the federate's status.
  if (write_to_socket(((federate_info_t*)e)->socket, message_length, buffer)) {
    // Name the message that actually failed so that the log can be told
    // apart from a failed (non-provisional) TAG.
    lf_print_error("RTI failed to send provisional tag advance grant to federate %d.", e->id);
    e->state = NOT_CONNECTED;
    return;
  }

  e->last_provisionally_granted = tag;
  LF_PRINT_LOG("RTI sent to federate %d the Provisional Tag Advance Grant (PTAG) " PRINTF_TAG ".", e->id,
               tag.time - start_time, tag.microstep);

  // Send PTAG to all upstream federates, if they have not had
  // a later or equal PTAG or TAG sent previously and if their transitive
  // NET is greater than or equal to the tag.
  // This is needed to stimulate absent messages from upstream and break deadlocks.
  // The scenario this deals with is illustrated in `test/C/src/federated/FeedbackDelay2.lf`
  // and `test/C/src/federated/FeedbackDelay4.lf`.
  // Note that this is transitive.
  // NOTE: This is not needed for enclaves because zero-delay loops are prohibited.
  // It's only needed for federates, which is why this is implemented here.
  for (int j = 0; j < e->num_upstream; j++) {
    scheduling_node_t* upstream = rti_remote->base.scheduling_nodes[e->upstream[j]];

    // Ignore this federate if it has resigned.
    if (upstream->state == NOT_CONNECTED) {
      continue;
    }

    tag_t earliest = earliest_future_incoming_message_tag(upstream);
    tag_t strict_earliest = eimt_strict(upstream); // Non-ZDC version.

    // If these tags are equal, then a TAG or PTAG should have already been granted,
    // in which case, another will not be sent. But it may not have been already granted.
    if (lf_tag_compare(earliest, tag) > 0) {
      notify_tag_advance_grant(upstream, tag);
    } else if (lf_tag_compare(earliest, tag) == 0 && lf_tag_compare(strict_earliest, tag) > 0) {
      notify_provisional_tag_advance_grant(upstream, tag);
    }
  }
}
/**
 * Update the next event tag of a federate, capped by the smallest tag among
 * the messages recorded as in transit to that federate.
 *
 * The `_locked` suffix suggests the caller must hold `rti_mutex`; the
 * callers visible in this file do.
 *
 * @param federate_id Index of the federate among the scheduling nodes.
 * @param next_event_tag The next event tag reported for the federate.
 */
void update_federate_next_event_tag_locked(uint16_t federate_id, tag_t next_event_tag) {
  federate_info_t* federate = GET_FED_INFO(federate_id);

  // Head of the in-transit queue.
  // NOTE(review): presumably a "forever" tag when the queue is empty -- confirm.
  tag_t earliest_in_transit = pqueue_tag_peek_tag(federate->in_transit_message_tags);

  // An in-transit message with a smaller tag takes precedence over the
  // reported tag.
  tag_t effective_tag =
      (lf_tag_compare(earliest_in_transit, next_event_tag) < 0) ? earliest_in_transit : next_event_tag;

  update_scheduling_node_next_event_tag_locked(&federate->enclave, effective_tag);
}
/**
 * Read a port absent message from `sending_federate` and forward it,
 * unchanged, to the destination federate named in the message.
 *
 * The message is dropped if the destination ID does not name an existing
 * federate or if the destination is no longer connected. If the destination
 * is still PENDING, this function waits on `sent_start_time` before
 * forwarding.
 *
 * @param sending_federate The federate the message is read from.
 * @param buffer Buffer whose first byte already holds the message type; the
 *  rest of the message is read into the bytes that follow it.
 */
void handle_port_absent_message(federate_info_t* sending_federate, unsigned char* buffer) {
  // Payload: port ID, destination federate ID, tag time, tag microstep.
  size_t message_size = sizeof(uint16_t) + sizeof(uint16_t) + sizeof(int64_t) + sizeof(uint32_t);

  read_from_socket_fail_on_error(&sending_federate->socket, message_size, &(buffer[1]), NULL,
                                 " RTI failed to read port absent message from federate %u.",
                                 sending_federate->enclave.id);

  uint16_t reactor_port_id = extract_uint16(&(buffer[1]));
  uint16_t federate_id = extract_uint16(&(buffer[1 + sizeof(uint16_t)]));
  tag_t tag = extract_tag(&(buffer[1 + 2 * sizeof(uint16_t)]));

  if (rti_remote->base.tracing_enabled) {
    tracepoint_rti_from_federate(receive_PORT_ABS, sending_federate->enclave.id, &tag);
  }

  // Need to acquire the mutex lock to ensure that the thread handling
  // messages coming from the socket connected to the destination does not
  // issue a TAG before this message has been forwarded.
  LF_MUTEX_LOCK(&rti_mutex);

  // The destination ID comes straight off the socket. Reject an ID that does
  // not index an existing scheduling node instead of reading out of bounds.
  if (federate_id >= rti_remote->base.number_of_scheduling_nodes) {
    lf_print_error("RTI: Federate %d sent a port absent message for nonexistent federate %d. Dropping message.",
                   sending_federate->enclave.id, federate_id);
    LF_MUTEX_UNLOCK(&rti_mutex);
    return;
  }

  // If the destination federate is no longer connected, issue a warning
  // and return.
  federate_info_t* fed = GET_FED_INFO(federate_id);
  if (fed->enclave.state == NOT_CONNECTED) {
    lf_print_warning("RTI: Destination federate %d is no longer connected. Dropping message.", federate_id);
    // The federate's tags are shared state, so they are logged before the
    // mutex is released.
    LF_PRINT_LOG("Fed status: next_event " PRINTF_TAG ", "
                 "completed " PRINTF_TAG ", "
                 "last_granted " PRINTF_TAG ", "
                 "last_provisionally_granted " PRINTF_TAG ".",
                 fed->enclave.next_event.time - start_time, fed->enclave.next_event.microstep,
                 fed->enclave.completed.time - start_time, fed->enclave.completed.microstep,
                 fed->enclave.last_granted.time - start_time, fed->enclave.last_granted.microstep,
                 fed->enclave.last_provisionally_granted.time - start_time,
                 fed->enclave.last_provisionally_granted.microstep);
    LF_MUTEX_UNLOCK(&rti_mutex);
    return;
  }

  LF_PRINT_LOG("RTI forwarding port absent message for port %u to federate %u.", reactor_port_id, federate_id);

  // Need to make sure that the destination federate's thread has already
  // sent the starting MSG_TYPE_TIMESTAMP message.
  while (fed->enclave.state == PENDING) {
    // Need to wait here.
    lf_cond_wait(&sent_start_time);
  }

  if (rti_remote->base.tracing_enabled) {
    tracepoint_rti_to_federate(send_PORT_ABS, federate_id, &tag);
  }

  // Forward the message, including the type byte at buffer[0].
  write_to_socket_fail_on_error(&fed->socket, message_size + 1, buffer, &rti_mutex,
                                "RTI failed to forward message to federate %d.", federate_id);

  LF_MUTEX_UNLOCK(&rti_mutex);
}
/**
 * Read a timed (tagged) message from `sending_federate` and forward it to
 * the destination federate named in its header.
 *
 * The header and as much of the payload as fits are read into `buffer`; a
 * payload that does not fit is read and forwarded in chunks of at most
 * FED_COM_BUFFER_SIZE bytes. If the destination ID does not name an existing
 * federate, or the destination is no longer connected, the whole message is
 * still read off the socket but then discarded.
 *
 * After forwarding, the message tag is recorded in the destination's
 * in-transit queue (unless the destination has already completed that tag)
 * and, if it is earlier than the destination's next event tag, that tag is
 * updated.
 *
 * @param sending_federate The federate the message is read from.
 * @param buffer Buffer whose first byte already holds the message type.
 *  NOTE(review): presumably FED_COM_BUFFER_SIZE bytes long -- confirm at the
 *  call site.
 */
void handle_timed_message(federate_info_t* sending_federate, unsigned char* buffer) {
  size_t header_size = 1 + sizeof(uint16_t) + sizeof(uint16_t) + sizeof(uint32_t) + sizeof(int64_t) + sizeof(uint32_t);

  // Check the capacity before anything is written into the buffer.
  if (FED_COM_BUFFER_SIZE < header_size + 1) {
    lf_print_error_and_exit("Buffer size (%d) is not large enough to "
                            "read the header plus one byte.",
                            FED_COM_BUFFER_SIZE);
  }

  // Read the header, minus the first byte which has already been read.
  read_from_socket_fail_on_error(&sending_federate->socket, header_size - 1, &(buffer[1]), NULL,
                                 "RTI failed to read the timed message header from remote federate.");

  // Extract information from the header.
  uint16_t reactor_port_id;
  uint16_t federate_id;
  size_t length;
  tag_t intended_tag;
  extract_timed_header(&(buffer[1]), &reactor_port_id, &federate_id, &length, &intended_tag);

  size_t total_bytes_to_read = length + header_size;

  // Cut up the payload in chunks: the first chunk shares the buffer with the header.
  size_t bytes_to_read = length;
  if (bytes_to_read > FED_COM_BUFFER_SIZE - header_size) {
    bytes_to_read = FED_COM_BUFFER_SIZE - header_size;
  }

  LF_PRINT_LOG("RTI received message from federate %d for federate %u port %u with intended tag " PRINTF_TAG
               ". Forwarding.",
               sending_federate->enclave.id, federate_id, reactor_port_id, intended_tag.time - lf_time_start(),
               intended_tag.microstep);

  // The payload is read from the sender, so a failure is reported against the
  // sender's ID, not the destination's.
  read_from_socket_fail_on_error(&sending_federate->socket, bytes_to_read, &(buffer[header_size]), NULL,
                                 "RTI failed to read timed message from federate %d.", sending_federate->enclave.id);
  size_t bytes_read = bytes_to_read + header_size;
  // Following only works for string messages.
  // LF_PRINT_DEBUG("Message received by RTI: %s.", buffer + header_size);

  if (rti_remote->base.tracing_enabled) {
    tracepoint_rti_from_federate(receive_TAGGED_MSG, sending_federate->enclave.id, &intended_tag);
  }

  // Need to acquire the mutex lock to ensure that the thread handling
  // messages coming from the socket connected to the destination does not
  // issue a TAG before this message has been forwarded.
  LF_MUTEX_LOCK(&rti_mutex);

  // The destination ID comes straight off the socket. Only look the federate
  // up if the ID indexes an existing scheduling node.
  federate_info_t* fed = NULL;
  if (federate_id < rti_remote->base.number_of_scheduling_nodes) {
    fed = GET_FED_INFO(federate_id);
  }

  // If there is no such destination, or it is no longer connected, report it,
  // remove the rest of the message from the socket and return.
  if (fed == NULL || fed->enclave.state == NOT_CONNECTED) {
    if (fed == NULL) {
      lf_print_error("RTI: Federate %d sent a message for nonexistent federate %d. Dropping message.",
                     sending_federate->enclave.id, federate_id);
    } else {
      lf_print_warning("RTI: Destination federate %d is no longer connected. Dropping message.", federate_id);
      LF_PRINT_LOG("Fed status: next_event " PRINTF_TAG ", "
                   "completed " PRINTF_TAG ", "
                   "last_granted " PRINTF_TAG ", "
                   "last_provisionally_granted " PRINTF_TAG ".",
                   fed->enclave.next_event.time - start_time, fed->enclave.next_event.microstep,
                   fed->enclave.completed.time - start_time, fed->enclave.completed.microstep,
                   fed->enclave.last_granted.time - start_time, fed->enclave.last_granted.microstep,
                   fed->enclave.last_provisionally_granted.time - start_time,
                   fed->enclave.last_provisionally_granted.microstep);
    }
    // If the message was larger than the buffer, we must empty out the remainder also.
    size_t total_bytes_read = bytes_read;
    while (total_bytes_read < total_bytes_to_read) {
      bytes_to_read = total_bytes_to_read - total_bytes_read;
      if (bytes_to_read > FED_COM_BUFFER_SIZE) {
        bytes_to_read = FED_COM_BUFFER_SIZE;
      }
      read_from_socket_fail_on_error(&sending_federate->socket, bytes_to_read, buffer, NULL,
                                     "RTI failed to clear message chunks.");
      total_bytes_read += bytes_to_read;
    }
    LF_MUTEX_UNLOCK(&rti_mutex);
    return;
  }

  LF_PRINT_DEBUG("RTI forwarding message to port %d of federate %hu of length %zu.", reactor_port_id, federate_id,
                 length);

  // Need to make sure that the destination federate's thread has already
  // sent the starting MSG_TYPE_TIMESTAMP message.
  while (fed->enclave.state == PENDING) {
    // Need to wait here.
    lf_cond_wait(&sent_start_time);
  }

  if (rti_remote->base.tracing_enabled) {
    tracepoint_rti_to_federate(send_TAGGED_MSG, federate_id, &intended_tag);
  }

  // Forward the header together with the first chunk of the payload.
  write_to_socket_fail_on_error(&fed->socket, bytes_read, buffer, &rti_mutex,
                                "RTI failed to forward message to federate %d.", federate_id);

  // The message length may be longer than the buffer,
  // in which case we have to handle it in chunks.
  size_t total_bytes_read = bytes_read;
  while (total_bytes_read < total_bytes_to_read) {
    LF_PRINT_DEBUG("Forwarding message in chunks.");
    bytes_to_read = total_bytes_to_read - total_bytes_read;
    if (bytes_to_read > FED_COM_BUFFER_SIZE) {
      bytes_to_read = FED_COM_BUFFER_SIZE;
    }
    read_from_socket_fail_on_error(&sending_federate->socket, bytes_to_read, buffer, NULL,
                                   "RTI failed to read message chunks.");
    total_bytes_read += bytes_to_read;

    // FIXME: a mutex needs to be held for this so that other threads
    // do not write to destination_socket and cause interleaving. However,
    // holding the rti_mutex might be very expensive. Instead, each outgoing
    // socket should probably have its own mutex.
    write_to_socket_fail_on_error(&fed->socket, bytes_to_read, buffer, &rti_mutex,
                                  "RTI failed to send message chunks.");
  }

  // Record this in-transit message in federate's in-transit message queue.
  if (lf_tag_compare(fed->enclave.completed, intended_tag) < 0) {
    // Add a record of this message to the list of in-transit messages to this federate.
    pqueue_tag_insert_if_no_match(fed->in_transit_message_tags, intended_tag);
    LF_PRINT_DEBUG("RTI: Adding a message with tag " PRINTF_TAG " to the list of in-transit messages for federate %d.",
                   intended_tag.time - lf_time_start(), intended_tag.microstep, federate_id);
  } else {
    lf_print_error("RTI: Federate %d has already completed tag " PRINTF_TAG
                   ", but there is an in-transit message with tag " PRINTF_TAG " from federate %hu. "
                   "This is going to cause an STP violation under centralized coordination.",
                   federate_id, fed->enclave.completed.time - lf_time_start(), fed->enclave.completed.microstep,
                   intended_tag.time - lf_time_start(), intended_tag.microstep, sending_federate->enclave.id);
    // FIXME: Drop the federate?
  }

  // If the message tag is less than the most recently received NET from the federate,
  // then update the federate's next event tag to match the message tag.
  if (lf_tag_compare(intended_tag, fed->enclave.next_event) < 0) {
    update_federate_next_event_tag_locked(federate_id, intended_tag);
  }

  LF_MUTEX_UNLOCK(&rti_mutex);
}
/**
 * Handle a latest tag complete (LTC) message from a federate.
 *
 * Reads the completed tag from the federate's socket, passes it to
 * _logical_tag_complete(), and then, holding `rti_mutex`, removes the
 * in-transit message records with tags up to the completed tag.
 *
 * @param fed The federate that reported the completed tag.
 */
void handle_latest_tag_complete(federate_info_t* fed) {
  // Payload: the tag time (int64) followed by the microstep (uint32).
  unsigned char payload[sizeof(int64_t) + sizeof(uint32_t)];
  read_from_socket_fail_on_error(&fed->socket, sizeof(payload), payload, NULL,
                                 "RTI failed to read the content of the logical tag complete from federate %d.",
                                 fed->enclave.id);

  tag_t completed_tag = extract_tag(payload);
  if (rti_remote->base.tracing_enabled) {
    tracepoint_rti_from_federate(receive_LTC, fed->enclave.id, &completed_tag);
  }

  // Called without rti_mutex held.
  _logical_tag_complete(&fed->enclave, completed_tag);

  // FIXME: Should this function be in the enclave version?
  // See if we can remove any of the recorded in-transit messages for this.
  LF_MUTEX_LOCK(&rti_mutex);
  pqueue_tag_remove_up_to(fed->in_transit_message_tags, completed_tag);
  LF_MUTEX_UNLOCK(&rti_mutex);
}
/**
 * Handle a next event tag (NET) message from a federate.
 *
 * Reads the tag from the federate's socket and, holding `rti_mutex`, records
 * it through update_federate_next_event_tag_locked().
 *
 * @param fed The federate that sent the NET.
 */
void handle_next_event_tag(federate_info_t* fed) {
  // Payload: the tag time (int64) followed by the microstep (uint32).
  unsigned char payload[sizeof(int64_t) + sizeof(uint32_t)];
  read_from_socket_fail_on_error(&fed->socket, sizeof(payload), payload, NULL,
                                 "RTI failed to read the content of the next event tag from federate %d.",
                                 fed->enclave.id);

  // Hold the mutex so that this state does not change while a message is in
  // transport or being used to determine a TAG.
  // FIXME: Instead of using a mutex, it might be more efficient to use a
  // select() mechanism to read and process federates' buffers in an orderly fashion.
  LF_MUTEX_LOCK(&rti_mutex);

  tag_t reported_tag = extract_tag(payload);
  if (rti_remote->base.tracing_enabled) {
    tracepoint_rti_from_federate(receive_NET, fed->enclave.id, &reported_tag);
  }
  LF_PRINT_LOG("RTI received from federate %d the Next Event Tag (NET) " PRINTF_TAG, fed->enclave.id,
               reported_tag.time - start_time, reported_tag.microstep);

  update_federate_next_event_tag_locked(fed->enclave.id, reported_tag);

  LF_MUTEX_UNLOCK(&rti_mutex);
}
/////////////////// STOP functions ////////////////////
/**
 * Boolean used to prevent the RTI from sending the
 * MSG_TYPE_STOP_GRANTED message multiple times.
 *
 * It is set to true by broadcast_stop_time_to_federates_locked() before that
 * function sends the message, and it is polled by
 * wait_for_stop_request_reply() to decide whether the stop-request timeout
 * can be abandoned.
 */
bool stop_granted_already_sent_to_federates = false;
/**
 * Broadcast MSG_TYPE_STOP_GRANTED, carrying rti_remote->base.max_stop_tag,
 * to every connected federate. This is done at most once: later calls return
 * immediately.
 *
 * Before sending, the most recently recorded next event tag of each
 * connected federate is capped so that it is no greater than the stop tag.
 *
 * This function assumes the caller holds the rti_mutex lock.
 */
static void broadcast_stop_time_to_federates_locked(void) {
  if (stop_granted_already_sent_to_federates) {
    return;
  }
  stop_granted_already_sent_to_federates = true;

  // Reply with a stop granted to all federates
  unsigned char outgoing_buffer[MSG_TYPE_STOP_GRANTED_LENGTH];
  ENCODE_STOP_GRANTED(outgoing_buffer, rti_remote->base.max_stop_tag.time, rti_remote->base.max_stop_tag.microstep);

  // Iterate over federates and send each the message.
  for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) {
    federate_info_t* fed = GET_FED_INFO(i);
    if (fed->enclave.state == NOT_CONNECTED) {
      continue;
    }
    if (lf_tag_compare(fed->enclave.next_event, rti_remote->base.max_stop_tag) >= 0) {
      // Need the next_event to be no greater than the stop tag.
      fed->enclave.next_event = rti_remote->base.max_stop_tag;
    }
    if (rti_remote->base.tracing_enabled) {
      tracepoint_rti_to_federate(send_STOP_GRN, fed->enclave.id, &rti_remote->base.max_stop_tag);
    }
    write_to_socket_fail_on_error(&fed->socket, MSG_TYPE_STOP_GRANTED_LENGTH, outgoing_buffer, &rti_mutex,
                                  "RTI failed to send MSG_TYPE_STOP_GRANTED message to federate %d.", fed->enclave.id);
  }

  LF_PRINT_LOG("RTI sent to federates MSG_TYPE_STOP_GRANTED with tag " PRINTF_TAG,
               rti_remote->base.max_stop_tag.time - start_time, rti_remote->base.max_stop_tag.microstep);
}
/**
 * Record that a federate is taking part in the stop handshake. A federate is
 * counted only the first time it is marked. Once the number of marked
 * federates equals the number of scheduling nodes, MSG_TYPE_STOP_GRANTED is
 * broadcast through broadcast_stop_time_to_federates_locked().
 *
 * This function assumes the caller holds the rti_mutex lock.
 *
 * @param fed The federate that has requested a stop.
 * @return 1 if every scheduling node is now marked (so the stop-granted
 *  broadcast has been requested) and 0 otherwise.
 */
static int mark_federate_requesting_stop(federate_info_t* fed) {
  if (fed->requested_stop == false) {
    fed->requested_stop = true;
    rti_remote->base.num_scheduling_nodes_handling_stop += 1;
  }

  if (rti_remote->base.num_scheduling_nodes_handling_stop != rti_remote->base.number_of_scheduling_nodes) {
    // Still waiting for at least one federate.
    return 0;
  }

  // The stop time of every federate is now known.
  broadcast_stop_time_to_federates_locked();
  return 1;
}
/**
 * Thread to time out if federates do not reply to stop request.
 *
 * MAX_TIME_FOR_REPLY_TO_STOP_REQUEST is split into 30 equal chunks. The
 * thread returns as soon as it observes that MSG_TYPE_STOP_GRANTED has been
 * sent; the flag is checked before each chunk and once more after the last
 * one, so that a grant sent during the final chunk is not reported as a
 * timeout. If the flag is still unset after the full interval, the RTI exits
 * with an error.
 *
 * @param args Not used.
 * @return NULL.
 */
static void* wait_for_stop_request_reply(void* args) {
  (void)args;
  initialize_lf_thread_id();

  // Divide the time into small chunks and check periodically.
  interval_t chunk = MAX_TIME_FOR_REPLY_TO_STOP_REQUEST / 30;
  for (int count = 0; count < 30; count++) {
    if (stop_granted_already_sent_to_federates) {
      return NULL;
    }
    lf_sleep(chunk);
  }

  // The replies may have arrived while this thread was in its final sleep.
  if (stop_granted_already_sent_to_federates) {
    return NULL;
  }

  // If we reach here, then error out.
  lf_print_error_and_exit("Received only %d stop request replies within timeout " PRINTF_TIME "ns. RTI is exiting.",
                          rti_remote->base.num_scheduling_nodes_handling_stop, MAX_TIME_FOR_REPLY_TO_STOP_REQUEST);
  return NULL;
}
/**
 * Handle a MSG_TYPE_STOP_REQUEST message from a federate.
 *
 * Reads the proposed stop tag, folds it into rti_remote->base.max_stop_tag,
 * and marks the federate as having requested a stop. If that completes the
 * set of federates, the stop-granted broadcast has been made and nothing
 * more is done. Otherwise, the first time this happens, a timeout thread is
 * started and the stop request is forwarded to every other federate that has
 * not yet requested a stop; federates that are not connected are marked
 * instead of being sent the request.
 *
 * @param fed The federate that sent the stop request.
 */
void handle_stop_request_message(federate_info_t* fed) {
  LF_PRINT_DEBUG("RTI handling stop_request from federate %d.", fed->enclave.id);

  // The message type byte has already been consumed; read the rest.
  unsigned char payload[MSG_TYPE_STOP_REQUEST_LENGTH - 1];
  read_from_socket_fail_on_error(&fed->socket, sizeof(payload), payload, NULL,
                                 "RTI failed to read the MSG_TYPE_STOP_REQUEST payload from federate %d.",
                                 fed->enclave.id);

  // Extract the proposed stop tag for the federate
  tag_t proposed_stop_tag = extract_tag(payload);
  if (rti_remote->base.tracing_enabled) {
    tracepoint_rti_from_federate(receive_STOP_REQ, fed->enclave.id, &proposed_stop_tag);
  }
  LF_PRINT_LOG("RTI received from federate %d a MSG_TYPE_STOP_REQUEST message with tag " PRINTF_TAG ".",
               fed->enclave.id, proposed_stop_tag.time - start_time, proposed_stop_tag.microstep);

  // Hold the mutex so that this state does not change while a message is in
  // transport or being used to determine a TAG.
  LF_MUTEX_LOCK(&rti_mutex);

  // A stop tag has already been received from this federate.
  if (fed->requested_stop) {
    // If stop request messages have already been broadcast, treat this as if it were a reply.
    if (rti_remote->stop_in_progress) {
      mark_federate_requesting_stop(fed);
    }
    LF_MUTEX_UNLOCK(&rti_mutex);
    return;
  }

  // Update the maximum stop tag received from federates
  if (lf_tag_compare(proposed_stop_tag, rti_remote->base.max_stop_tag) > 0) {
    rti_remote->base.max_stop_tag = proposed_stop_tag;
  }

  // If all federates have replied, stop granted has been sent to all of them.
  // Nothing more to do.
  if (mark_federate_requesting_stop(fed)) {
    LF_MUTEX_UNLOCK(&rti_mutex);
    return;
  }

  // Prepare the stop request to forward to all other federates that have not
  // also issued a stop request.
  unsigned char stop_request_buffer[MSG_TYPE_STOP_REQUEST_LENGTH];
  ENCODE_STOP_REQUEST(stop_request_buffer, rti_remote->base.max_stop_tag.time, rti_remote->base.max_stop_tag.microstep);

  // Do not forward the request more than once.
  if (rti_remote->stop_in_progress) {
    LF_MUTEX_UNLOCK(&rti_mutex);
    return;
  }
  rti_remote->stop_in_progress = true;

  // Need a timeout here in case a federate never replies.
  lf_thread_t timeout_thread;
  lf_thread_create(&timeout_thread, wait_for_stop_request_reply, NULL);

  // Send each remaining federate the MSG_TYPE_STOP_REQUEST message.
  for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) {
    federate_info_t* other = GET_FED_INFO(i);

    // Skip the requester itself and anyone whose stop tag is already known.
    if (other->enclave.id == fed->enclave.id || other->requested_stop) {
      continue;
    }

    // A disconnected federate cannot reply, so it is marked right away.
    if (other->enclave.state == NOT_CONNECTED) {
      mark_federate_requesting_stop(other);
      continue;
    }

    if (rti_remote->base.tracing_enabled) {
      tracepoint_rti_to_federate(send_STOP_REQ, other->enclave.id, &rti_remote->base.max_stop_tag);
    }
    write_to_socket_fail_on_error(&other->socket, MSG_TYPE_STOP_REQUEST_LENGTH, stop_request_buffer, &rti_mutex,
                                  "RTI failed to forward MSG_TYPE_STOP_REQUEST message to federate %d.",
                                  other->enclave.id);
  }

  LF_PRINT_LOG("RTI forwarded to federates MSG_TYPE_STOP_REQUEST with tag (" PRINTF_TIME ", %u).",
               rti_remote->base.max_stop_tag.time - start_time, rti_remote->base.max_stop_tag.microstep);
  LF_MUTEX_UNLOCK(&rti_mutex);
}
/**
 * Handle a reply to a MSG_TYPE_STOP_REQUEST message from a federate.
 *
 * Reads the remainder of the reply (MSG_TYPE_STOP_REQUEST_REPLY_LENGTH - 1
 * bytes) from the federate's socket, extracts the stop tag it carries, raises
 * the RTI-wide maximum stop tag if the reported tag is later, and marks the
 * federate as requesting stop.
 *
 * The mutex is acquired inside this function.
 *
 * @param fed The federate that sent the reply.
 */
void handle_stop_request_reply(federate_info_t* fed) {
  // Only the payload is read here (the full message length minus one byte).
  unsigned char payload[MSG_TYPE_STOP_REQUEST_REPLY_LENGTH - 1];
  read_from_socket_fail_on_error(&fed->socket, sizeof(payload), payload, NULL,
                                 "RTI failed to read the reply to MSG_TYPE_STOP_REQUEST message from federate %d.",
                                 fed->enclave.id);

  tag_t reply_tag = extract_tag(payload);

  if (rti_remote->base.tracing_enabled) {
    tracepoint_rti_from_federate(receive_STOP_REQ_REP, fed->enclave.id, &reply_tag);
  }

  LF_PRINT_LOG("RTI received from federate %d STOP reply tag " PRINTF_TAG ".", fed->enclave.id,
               reply_tag.time - start_time, reply_tag.microstep);

  // The shared RTI state below may only be changed while holding the mutex.
  LF_MUTEX_LOCK(&rti_mutex);

  // Keep track of the latest stop tag reported by any federate.
  tag_t* max_stop_tag = &rti_remote->base.max_stop_tag;
  if (lf_tag_compare(reply_tag, *max_stop_tag) > 0) {
    *max_stop_tag = reply_tag;
  }

  // Record that this federate has answered the stop request.
  mark_federate_requesting_stop(fed);

  LF_MUTEX_UNLOCK(&rti_mutex);
}
//////////////////////////////////////////////////
/**
 * Handle an address query from a federate.
 *
 * Reads the 16-bit ID of the federate being asked about from the socket of
 * the querying federate and replies with a MSG_TYPE_ADDRESS_QUERY_REPLY byte
 * followed by a 32-bit port number, and then, in a second write, the bytes of
 * the remote federate's server_ip_addr.
 *
 * A port of -1 means that the RTI does not know the port, because it has not
 * yet received a MSG_TYPE_ADDRESS_ADVERTISEMENT message from the remote
 * federate. The same reply (port -1, all-zero address) is sent if the queried
 * ID does not refer to one of the RTI's scheduling nodes, so that an invalid
 * ID received from the network is never used as an index.
 *
 * The mutex is acquired inside this function.
 *
 * @param fed_id The ID of the federate that sent the query.
 */
void handle_address_query(uint16_t fed_id) {
  federate_info_t* fed = GET_FED_INFO(fed_id);
  // Use buffer both for reading and constructing the reply.
  // The length is what is needed for the reply.
  unsigned char buffer[1 + sizeof(int32_t)];
  read_from_socket_fail_on_error(&fed->socket, sizeof(uint16_t), (unsigned char*)buffer, NULL,
                                 "Failed to read address query.");
  uint16_t remote_fed_id = extract_uint16(buffer);

  if (rti_remote->base.tracing_enabled) {
    tracepoint_rti_from_federate(receive_ADR_QR, fed_id, NULL);
  }

  LF_PRINT_DEBUG("RTI received address query from %d for %d.", fed_id, remote_fed_id);

  // The ID comes from the network, so validate it before using it as an index.
  federate_info_t* remote_fed = NULL;
  if (remote_fed_id < rti_remote->base.number_of_scheduling_nodes) {
    remote_fed = GET_FED_INFO(remote_fed_id);
  } else {
    LF_PRINT_DEBUG("RTI received address query from %d for unknown federate %d. Replying with port -1.", fed_id,
                   remote_fed_id);
  }

  // Defaults used when the remote federate is unknown.
  int32_t reply_port = -1;
  unsigned char reply_ip_addr[sizeof(fed->server_ip_addr)] = {0};

  // Response message is MSG_TYPE_ADDRESS_QUERY_REPLY.
  buffer[0] = MSG_TYPE_ADDRESS_QUERY_REPLY;

  LF_MUTEX_LOCK(&rti_mutex);
  if (remote_fed != NULL) {
    // Take a snapshot of the address information while holding the mutex so
    // that what is sent and what is logged below are the same values.
    reply_port = remote_fed->server_port;
    memcpy(reply_ip_addr, &remote_fed->server_ip_addr, sizeof(reply_ip_addr));
  }
  // Encode and send the port number (which could be -1).
  encode_int32(reply_port, (unsigned char*)&buffer[1]);
  write_to_socket_fail_on_error(&fed->socket, sizeof(int32_t) + 1, (unsigned char*)buffer, &rti_mutex,
                                "Failed to write port number to socket of federate %d.", fed_id);

  // Send the server IP address to federate.
  write_to_socket_fail_on_error(&fed->socket, sizeof(reply_ip_addr), reply_ip_addr, &rti_mutex,
                                "Failed to write ip address to socket of federate %d.", fed_id);
  LF_MUTEX_UNLOCK(&rti_mutex);

  LF_PRINT_DEBUG("Replied to address query from federate %d with address %s:%d.", fed_id,
                 remote_fed != NULL ? remote_fed->server_hostname : "unknown", reply_port);
}
/**
 * Handle an address advertisement from a federate.
 *
 * Reads a 32-bit port number from the federate's socket and stores it as the
 * federate's server_port, which is the port reported to other federates that
 * send an address query for this federate.
 *
 * The mutex is acquired inside this function.
 *
 * @param federate_id The ID of the federate that sent the advertisement.
 */
void handle_address_ad(uint16_t federate_id) {
  federate_info_t* sender = GET_FED_INFO(federate_id);

  // The payload is the port number that the federate uses for physical
  // connections to other federates.
  unsigned char port_bytes[sizeof(int32_t)];
  read_from_socket_fail_on_error(&sender->socket, sizeof(port_bytes), port_bytes, NULL,
                                 "Error reading port data from federate %d.", federate_id);

  int32_t advertised_port = extract_int32(port_bytes);
  // Debug-build sanity check only: assert is compiled out when NDEBUG is defined.
  assert(advertised_port < 65536);

  // Publish the port under the mutex.
  LF_MUTEX_LOCK(&rti_mutex);
  sender->server_port = advertised_port;
  LF_MUTEX_UNLOCK(&rti_mutex);

  LF_PRINT_LOG("Received address advertisement with port %d from federate %d.", advertised_port, federate_id);

  if (rti_remote->base.tracing_enabled) {
    tracepoint_rti_from_federate(receive_ADR_AD, federate_id, NULL);
  }
}
/**
 * Handle a timestamp message from a federate proposing a start time.
 *
 * Reads a 64-bit timestamp from the federate's socket, folds it into the
 * maximum proposed start time, and blocks until every scheduling node has
 * proposed a start time. It then sets the global start_time to that maximum
 * plus DELAY_START, sends it back to the federate in a MSG_TYPE_TIMESTAMP
 * message, and marks the federate as GRANTED.
 *
 * The mutex is acquired inside this function.
 *
 * @param my_fed The federate that sent the timestamp.
 */
void handle_timestamp(federate_info_t* my_fed) {
  unsigned char buffer[sizeof(int64_t)];
  // Read bytes from the socket. We need 8 bytes.
  read_from_socket_fail_on_error(&my_fed->socket, sizeof(int64_t), buffer, NULL,
                                 "ERROR reading timestamp from federate %d.\n", my_fed->enclave.id);

  // Copy the bytes into an int64_t rather than dereferencing the byte buffer
  // through an int64_t pointer: the buffer has no alignment guarantee for
  // int64_t, and such a cast violates the strict aliasing rules.
  int64_t raw_timestamp;
  memcpy(&raw_timestamp, buffer, sizeof(raw_timestamp));
  int64_t timestamp = swap_bytes_if_big_endian_int64(raw_timestamp);

  if (rti_remote->base.tracing_enabled) {
    tag_t tag = {.time = timestamp, .microstep = 0};
    tracepoint_rti_from_federate(receive_TIMESTAMP, my_fed->enclave.id, &tag);
  }
  LF_PRINT_DEBUG("RTI received timestamp message with time: " PRINTF_TIME ".", timestamp);

  LF_MUTEX_LOCK(&rti_mutex);
  rti_remote->num_feds_proposed_start++;
  if (timestamp > rti_remote->max_start_time) {
    rti_remote->max_start_time = timestamp;
  }
  if (rti_remote->num_feds_proposed_start == rti_remote->base.number_of_scheduling_nodes) {
    // All federates have proposed a start time. Wake up everyone waiting below.
    lf_cond_broadcast(&received_start_times);
  } else {
    // Some federates have not yet proposed a start time.
    // Wait for a notification.
    while (rti_remote->num_feds_proposed_start < rti_remote->base.number_of_scheduling_nodes) {
      // FIXME: Should have a timeout here?
      lf_cond_wait(&received_start_times);
    }
  }
  LF_MUTEX_UNLOCK(&rti_mutex);

  // Send back to the federate the maximum time plus an offset on a TIMESTAMP
  // message.
  unsigned char start_time_buffer[MSG_TYPE_TIMESTAMP_LENGTH];
  start_time_buffer[0] = MSG_TYPE_TIMESTAMP;
  // Add an offset to this start time to get everyone starting together.
  start_time = rti_remote->max_start_time + DELAY_START;
  lf_tracing_set_start_time(start_time);
  // NOTE(review): other callers in this file pass the value to encode_int64
  // without swapping bytes first; confirm that swapping here is intended.
  encode_int64(swap_bytes_if_big_endian_int64(start_time), &start_time_buffer[1]);

  if (rti_remote->base.tracing_enabled) {
    tag_t tag = {.time = start_time, .microstep = 0};
    tracepoint_rti_to_federate(send_TIMESTAMP, my_fed->enclave.id, &tag);
  }
  if (write_to_socket(my_fed->socket, MSG_TYPE_TIMESTAMP_LENGTH, start_time_buffer)) {
    lf_print_error("Failed to send the starting time to federate %d.", my_fed->enclave.id);
  }

  LF_MUTEX_LOCK(&rti_mutex);
  // Update state for the federate to indicate that the MSG_TYPE_TIMESTAMP
  // message has been sent. That MSG_TYPE_TIMESTAMP message grants time advance to
  // the federate to the start time.
  my_fed->enclave.state = GRANTED;
  lf_cond_broadcast(&sent_start_time);
  LF_PRINT_LOG("RTI sent start time " PRINTF_TIME " to federate %d.", start_time, my_fed->enclave.id);
  LF_MUTEX_UNLOCK(&rti_mutex);
}
/**
 * Send the RTI's current physical time to a federate.
 *
 * The message consists of the given message type byte followed by the 64-bit
 * value returned by lf_time_physical(). Nothing is sent, and a warning is
 * printed, if the federate's state is NOT_CONNECTED.
 *
 * When sending over TCP, the mutex is acquired inside this function around
 * the write.
 *
 * @param message_type The first byte of the message to send.
 * @param fed The federate to send the physical time to.
 * @param socket_type UDP to send with sendto() on the RTI's UDP socket, or
 *  TCP to write to the federate's socket.
 */
void send_physical_clock(unsigned char message_type, federate_info_t* fed, socket_type_t socket_type) {
  if (fed->enclave.state == NOT_CONNECTED) {
    lf_print_warning("Clock sync: RTI failed to send physical time to federate %d. Socket not connected.\n",
                     fed->enclave.id);
    return;
  }

  // Assemble the message: one type byte followed by the current physical time.
  unsigned char message[1 + sizeof(int64_t)];
  const size_t message_length = sizeof(message);
  int64_t now = lf_time_physical();
  message[0] = message_type;
  encode_int64(now, &message[1]);

  if (socket_type == UDP) {
    // FIXME: UDP_addr is never initialized.
    LF_PRINT_DEBUG("Clock sync: RTI sending UDP message type %u.", message[0]);
    ssize_t sent = sendto(rti_remote->socket_descriptor_UDP, message, message_length, 0,
                          (struct sockaddr*)&fed->UDP_addr, sizeof(fed->UDP_addr));
    // Anything short of the whole message counts as a failure.
    if (sent < (ssize_t)message_length) {
      lf_print_warning("Clock sync: RTI failed to send physical time to federate %d: %s\n", fed->enclave.id,
                       strerror(errno));
      return;
    }
  } else if (socket_type == TCP) {
    LF_PRINT_DEBUG("Clock sync: RTI sending TCP message type %u.", message[0]);
    LF_MUTEX_LOCK(&rti_mutex);
    write_to_socket_fail_on_error(&fed->socket, message_length, message, &rti_mutex,
                                  "Clock sync: RTI failed to send physical time to federate %d.", fed->enclave.id);
    LF_MUTEX_UNLOCK(&rti_mutex);
  }

  LF_PRINT_DEBUG("Clock sync: RTI sent PHYSICAL_TIME_SYNC_MESSAGE with timestamp " PRINTF_TIME " to federate %d.",
                 now, fed->enclave.id);
}
/**
 * Reply to a clock synchronization message from a federate.
 *
 * Sends a MSG_TYPE_CLOCK_SYNC_T4 message carrying the RTI's physical time
 * and, on a UDP channel only, follows it immediately with a
 * MSG_TYPE_CLOCK_SYNC_CODED_PROBE message.
 *
 * The mutex is acquired inside this function and held across both sends.
 *
 * @param my_fed The federate to reply to.
 * @param socket_type The kind of channel (UDP or TCP) to reply on.
 */
void handle_physical_clock_sync_message(federate_info_t* my_fed, socket_type_t socket_type) {
  // Hold the mutex to prevent interference between the two sends.
  LF_MUTEX_LOCK(&rti_mutex);

  // Reply with a T4 type message.
  send_physical_clock(MSG_TYPE_CLOCK_SYNC_T4, my_fed, socket_type);

  if (socket_type != UDP) {
    // The coded probe is only sent on a UDP channel.
    LF_MUTEX_UNLOCK(&rti_mutex);
    return;
  }

  // Send the corresponding coded probe immediately after the T4 message.
  send_physical_clock(MSG_TYPE_CLOCK_SYNC_CODED_PROBE, my_fed, UDP);
  LF_MUTEX_UNLOCK(&rti_mutex);
}
/**
 * Thread that periodically performs clock synchronization rounds with the
 * federates over UDP.
 *
 * The thread waits until all federates have proposed a start time and until
 * physical time reaches start_time. After that, once every
 * clock_sync_period_ns, it sends a MSG_TYPE_CLOCK_SYNC_T1 message to each
 * connected federate that has clock synchronization enabled and waits for the
 * matching MSG_TYPE_CLOCK_SYNC_T3 reply, to which it responds via
 * handle_physical_clock_sync_message(). Unexpected UDP messages are discarded,
 * up to a fixed number of reads per federate per round.
 *
 * The thread exits after a period in which no federate completed a round.
 *
 * @param noargs Unused.
 * @return NULL.
 */
void* clock_synchronization_thread(void* noargs) {
  (void)noargs;
  initialize_lf_thread_id();
  // Wait until all federates have been notified of the start time.
  // FIXME: Use lf_ version of this when merged with master.
  LF_MUTEX_LOCK(&rti_mutex);
  while (rti_remote->num_feds_proposed_start < rti_remote->base.number_of_scheduling_nodes) {
    lf_cond_wait(&received_start_times);
  }
  LF_MUTEX_UNLOCK(&rti_mutex);

  // Wait until the start time before starting clock synchronization.
  // The above wait ensures that start_time has been set.
  interval_t ns_to_wait = start_time - lf_time_physical();
  if (ns_to_wait > 0LL) {
    lf_sleep(ns_to_wait);
  }

  // Initiate a clock synchronization every rti->clock_sync_period_ns
  bool any_federates_connected = true;
  while (any_federates_connected) {
    // Sleep
    lf_sleep(rti_remote->clock_sync_period_ns); // Can be interrupted
    any_federates_connected = false;
    for (int fed_id = 0; fed_id < rti_remote->base.number_of_scheduling_nodes; fed_id++) {
      federate_info_t* fed = GET_FED_INFO(fed_id);
      if (fed->enclave.state == NOT_CONNECTED) {
        // FIXME: We need better error handling here, but clock sync failure
        // should not stop execution.
        lf_print_error("Clock sync failed with federate %d. Not connected.", fed_id);
        continue;
      } else if (!fed->clock_synchronization_enabled) {
        continue;
      }
      // Send the RTI's current physical time to the federate
      // Send on UDP.
      LF_PRINT_DEBUG("RTI sending T1 message to initiate clock sync round.");
      send_physical_clock(MSG_TYPE_CLOCK_SYNC_T1, fed, UDP);

      // Listen for reply message, which should be T3.
      size_t message_size = 1 + sizeof(int32_t);
      unsigned char buffer[1 + sizeof(int32_t)];
      // Set only when the expected T3 message has been received and answered.
      // Success is tracked explicitly rather than inferred from the attempt
      // counter, because the counter is already zero when the T3 message
      // arrives on the last permitted read.
      bool round_completed = false;
      // Maximum number of messages that we discard before giving up on this cycle.
      // If the T3 message from this federate does not arrive and we keep receiving
      // other messages, then give up on this federate and move to the next federate.
      int remaining_attempts = 5;
      while (remaining_attempts > 0) {
        remaining_attempts--;
        int read_failed = read_from_socket(rti_remote->socket_descriptor_UDP, message_size, buffer);
        // If any errors occur, either discard the message or the clock sync round.
        if (read_failed) {
          lf_print_warning("Clock sync: Read from UDP socket failed: %s. "
                           "Skipping clock sync round for federate %d.",
                           strerror(errno), fed->enclave.id);
          break;
        }
        if (buffer[0] != MSG_TYPE_CLOCK_SYNC_T3) {
          // The message is not a T3 message. Discard the message and
          // continue waiting for the T3 message. This is possibly a message
          // from a previous cycle that was discarded.
          lf_print_warning("Clock sync: Unexpected UDP message %u. Expected %u from federate %d. "
                           "Discarding message.",
                           buffer[0], MSG_TYPE_CLOCK_SYNC_T3, fed->enclave.id);
          continue;
        }
        int32_t fed_id_2 = extract_int32(&(buffer[1]));
        // Check that this message came from the correct federate.
        if (fed_id_2 != fed->enclave.id) {
          // Message is from the wrong federate. Discard the message.
          lf_print_warning("Clock sync: Received T3 message from federate %d, "
                           "but expected one from %d. Discarding message.",
                           fed_id_2, fed->enclave.id);
          continue;
        }
        LF_PRINT_DEBUG("Clock sync: RTI received T3 message from federate %d.", fed_id_2);
        handle_physical_clock_sync_message(GET_FED_INFO(fed_id_2), UDP);
        round_completed = true;
        break;
      }
      if (round_completed) {
        any_federates_connected = true;
      }
    }
  }
  return NULL;
}
/**
* Handle MSG_TYPE_FAILED sent by a federate. This message is sent by a federate
* that is exiting in failure. In this case, the RTI will
* also terminate abnormally, returning a non-zero exit code when it exits.
*
* This function assumes the caller does not hold the mutex.
*
* @param my_fed The federate sending a MSG_TYPE_FAILED message.
*/
static void handle_federate_failed(federate_info_t* my_fed) {
// Nothing more to do. Close the socket and exit.
LF_MUTEX_LOCK(&rti_mutex);
if (rti_remote->base.tracing_enabled) {
tracepoint_rti_from_federate(receive_FAILED, my_fed->enclave.id, NULL);
}
// Set the flag telling the RTI to exit with an error code when it exits.
_lf_federate_reports_error = true;
lf_print_error("RTI: Federate %d reports an error and has exited.", my_fed->enclave.id);
my_fed->enclave.state = NOT_CONNECTED;
// Indicate that there will be no further events from this federate.
my_fed->enclave.next_event = FOREVER_TAG;
// According to this: https://stackoverflow.com/questions/4160347/close-vs-shutdown-socket,