-
Notifications
You must be signed in to change notification settings - Fork 0
/
Receiver.java
executable file
·136 lines (123 loc) · 3.81 KB
/
Receiver.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
/** Datagram receiver.
*
* This class provides a receive interface to a datagram socket.
* Specifically, it enables one to test for the presence of an
* incoming packet before attempting a (potentially blocking)
* receive operation.
*/
import java.io.*;
import java.net.*;
import java.util.*;
import java.util.concurrent.*;
public class Receiver implements Runnable {
private Thread myThread; // thread that executes
// run() method
private Sender sndr;
private DatagramSocket sock;
private ArrayBlockingQueue<Packet> rcvq;
private InetSocketAddress peerAdr;
private boolean debug;
Receiver(DatagramSocket sock, InetSocketAddress peerAdr, Sender sndr,
boolean debug) {
this.sock = sock;
this.peerAdr = peerAdr;
this.sndr = sndr;
this.debug = debug;
// initialize queue for received packets
// stores both the packet and socket address of the sender
rcvq = new ArrayBlockingQueue<Packet>(1000, true);
}
/** Instantiate run() thread and start it running. */
public void start() {
myThread = new Thread(this);
myThread.start();
}
/** Wait for thread to quit. */
public void join() throws Exception {
myThread.join();
}
/** Receive thread places incoming packet in a queue.
* This method is run by a separate thread. It simply receives
* packets from the datagram socket and places them in a queue.
* If the queue is full when a packet arrives, it is discarded.
*/
public void run() {
long t0 = System.nanoTime();
long now, eventTime, firstEventTime;
now = eventTime = firstEventTime = 0;
Packet p;
byte[] buf = new byte[2000];
DatagramPacket dg = new DatagramPacket(buf, buf.length);
int rcvCount, rcvAck, discCount;
rcvCount = rcvAck = discCount = 0;
// run until nothing has happened for 5 seconds
while (eventTime == 0 || now < eventTime + 5000000000L) {
now = System.nanoTime() - t0;
try {
sock.receive(dg);
} catch (SocketTimeoutException e) {
continue; // check for termination, then retry
} catch (Exception e) {
System.err.println("Receiver: receive " + "exception: " + e);
System.exit(1);
}
eventTime = now;
// set peerAdr if not yet initialized
// otherwise, that it's the same peer
if (peerAdr == null) {
peerAdr = (InetSocketAddress) dg.getSocketAddress();
sndr.setPeerAdr(peerAdr);
} else if (!dg.getSocketAddress().equals(peerAdr)) {
System.err.println("Receiver: received "
+ "packet from unexpected sender: "
+ dg.getSocketAddress());
System.exit(1);
}
p = new Packet();
if (!p.unpack(dg.getData(), dg.getLength())) {
System.err.println("Receiver: error while "
+ "unpacking packet");
System.exit(1);
}
if (debug) {
System.out.println(sock.getLocalSocketAddress()
+ " received from " + dg.getSocketAddress() + " " + p);
System.out.flush();
}
if (p.type == 0)
rcvCount++;
else
rcvAck++;
if (!rcvq.offer(p))
discCount++; // discard if rcvq full
if (firstEventTime == 0)
firstEventTime = now;
}
System.out.println("Receiver: received " + rcvCount + " data packets, "
+ rcvAck + " acks");
System.out.println(" discarded " + discCount + " arrivals ");
System.out.println(" runLength "
+ (((double) (eventTime - firstEventTime)) / 1000000000));
}
/** Receive a packet.
* @return the next packet that has been received on the socket;
* will block if no packets available
*/
public Packet receive() {
Packet p = null;
try {
p = rcvq.take();
} catch (Exception e) {
System.err.println("Receiver:receive: exception " + e);
System.exit(1);
}
return p;
}
/** Send a packet to a specified destination.
* @param p is packet to be sent
* @param dest is the socket address of the destination
*/
public boolean incoming() {
return rcvq.size() > 0;
}
}