This repository has been archived by the owner on Dec 3, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
flight.c
126 lines (114 loc) · 3.08 KB
/
flight.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
/*
* rocks/flight.c
*
* In-flight buffering and recovery.
*
* Copyright (C) 2001 Victor Zandy
* See COPYING for distribution terms.
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <assert.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include "rs.h"
#include "ring.h"
#include "log.h"
/* Request a fixed 32 KiB kernel buffer on socket SD, first for the
   send direction and then for the receive direction.  A failing
   setsockopt trips assert(0).

   FIXME: this helper exists only because rs_inflight_limits assumes
   a fixed buffer size; fix that assumption and remove this. */
static void
fix_inflight_size(int sd)
{
	static const unsigned len = 32*1024;
	/* Order matters only in that it mirrors the original sequence:
	   send buffer first, receive buffer second. */
	static const int which[] = { SO_SNDBUF, SO_RCVBUF };
	size_t i;

	for (i = 0; i < sizeof(which) / sizeof(which[0]); i++) {
		int rc = setsockopt(sd, SOL_SOCKET, which[i],
				    (void*) &len, sizeof(len));
		if (rc < 0)
			assert(0);
	}
}
/* Find the upper limits on the amount of in-flight data in the send
   and receive directions on socket SD, and store them in MAXSND and
   MAXRCV.

   The limit in each direction is the sum of the sender's send buffer
   and the receiver's receive buffer, so both ends swap their sizes:
   each side writes its send and receive buffer sizes (32-bit, network
   byte order) and then reads the peer's two values.  A call to this
   must therefore be synchronized with another call by the peer.

   Return 0 on success, -1 on failure.  MAXSND and MAXRCV are written
   only on success. */
int
rs_inflight_limits(int sd, unsigned *maxsnd, unsigned *maxrcv)
{
	/* getsockopt takes a socklen_t *; passing the address of a wider
	   size_t makes the kernel read the wrong length on platforms
	   where the two types differ in size. */
	socklen_t optlen;
	int ret;
	uint32_t peer_snd, peer_rcv;	/* as received: network byte order */
	unsigned locl_snd, locl_rcv;
	uint32_t x;			/* outgoing value, network byte order */

	fix_inflight_size(sd);

	/* Determine our buffer sizes */
	optlen = sizeof(locl_snd);
	if (0 > getsockopt(sd, SOL_SOCKET, SO_SNDBUF,
			   (void*) &locl_snd, &optlen)) {
		return -1;
	}
	optlen = sizeof(locl_rcv);
	if (0 > getsockopt(sd, SOL_SOCKET, SO_RCVBUF,
			   (void*) &locl_rcv, &optlen)) {
		return -1;
	}

	/* Tell peer our buffer sizes: send size first, then receive size */
	x = htonl(locl_snd);
	ret = rs_xwrite(sd, &x, sizeof(x));
	if (0 > ret) {
		return -1;
	}
	x = htonl(locl_rcv);
	ret = rs_xwrite(sd, &x, sizeof(x));
	if (0 > ret) {
		return -1;
	}

	/* Read buffer sizes of peer, in the same order the peer wrote them */
	ret = rs_xread(sd, &peer_snd, sizeof(peer_snd), 0);
	if (0 > ret) {
		return -1;
	}
	ret = rs_xread(sd, &peer_rcv, sizeof(peer_rcv), 0);
	if (0 > ret) {
		return -1;
	}

	/* What we send can sit in our send buffer plus the peer's receive
	   buffer; what we receive, in the peer's send buffer plus ours. */
	*maxsnd = locl_snd + ntohl(peer_rcv);
	*maxrcv = locl_rcv + ntohl(peer_snd);
	return 0;
}
/* Recover in-flight data on socket SD after the connection has been
   re-established.

   Both ends exchange their receive sequence numbers, RING is advanced
   to the peer's value (discarding bytes the peer already consumed),
   the buffer limits are renegotiated with rs_inflight_limits, and any
   bytes still held in RING are written to the peer again.

   RCVSEQ is our receive sequence number.  SNDSEQ is accepted but not
   used by this function.  MAXSND and MAXRCV are only read: the
   renegotiated limits are asserted to equal them.

   The sequence number travels as a 32-bit value in network byte order.
   It is held in a uint32_t rather than an unsigned long so that exactly
   four bytes go on the wire regardless of the host's long size; only
   the low 32 bits of RCVSEQ are transmitted.

   Return 0 on success, -1 if any exchange or the resend fails. */
int rs_inflight_recover(int sd, ring_t ring,
			unsigned long rcvseq, unsigned long sndseq,
			unsigned *maxsnd, unsigned *maxrcv)
{
	uint32_t wire_seq;	/* sequence number in network byte order */
	unsigned long rseq;	/* peer's recv sequence number */
	unsigned long nbytes;
	unsigned new_maxsnd, new_maxrcv;

	(void) sndseq;		/* unused; kept for interface compatibility */

	/* Exchange sequence numbers */
	wire_seq = htonl((uint32_t) rcvseq);
	if (0 > rs_xwrite(sd, &wire_seq, sizeof(wire_seq)))
		return -1;
	if (0 > rs_xread(sd, &wire_seq, sizeof(wire_seq), 0))
		return -1;
	rseq = ntohl(wire_seq);

	/* Discard bytes the receiver has consumed */
	rs_set_ring_seq(ring, rseq);

	/* Get new buffer sizes (which currently must be the same) */
	if (0 > rs_inflight_limits(sd, &new_maxsnd, &new_maxrcv))
		return -1;
	assert(new_maxsnd == *maxsnd);
	assert(new_maxrcv == *maxrcv);

	nbytes = rs_ring_nbytes(ring);
	if (!nbytes)
		return 0;	/* Nothing to resend */

	/* This write may block as the data is transferred to peer,
	   but it should be bounded, as we know there's enough room in
	   our combined TCP buffers to hold it all. */
	assert(nbytes <= *maxsnd);	/* Otherwise write might block
					   indefinitely. */
	if (0 > rs_xwrite(sd, rs_ring_data(ring), nbytes))
		return -1;
	return 0;
}