Skip to content

Commit

Permalink
AFDatagramSocket: Allow listen/accept, so we can actually serve SEQPA…
Browse files Browse the repository at this point in the history
…CKET

Previously, SEQPACKET connections were only allowed on socketpair
instances, as client connections, and when working indirectly via passed
FileDescriptors.

Add the missing listen/accept logic to AFDatagramSocket, so we can
provide a SEQPACKET server socket, and add a test case demonstrating its
use.
  • Loading branch information
kohlschuetter committed Oct 27, 2023
1 parent f0bafd4 commit 5d178b0
Show file tree
Hide file tree
Showing 11 changed files with 260 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.net.SocketImpl;
import java.nio.channels.AlreadyBoundException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.IllegalBlockingModeException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.jdt.annotation.Nullable;
Expand Down Expand Up @@ -454,4 +455,75 @@ public <T> DatagramSocket setOption(AFSocketOption<T> name, T value) throws IOEx
getAFImpl().getCore().setOption(name, value);
return this;
}

/**
* Accepts a connection to this socket. Note that 1., the socket must be in {@code listen} state
* by calling {@link #bind(SocketAddress)}, followed by {@link #listen(int)}, and 2., the socket
* type must allow listen/accept. This is true for {@link AFSocketType#SOCK_SEQPACKET} AF_UNIX
* sockets, for example.
*
* @return The accepted datagram socket.
* @throws IOException on error.
* @see #listen(int)
*/
public AFDatagramSocket<A> accept() throws IOException {
return accept1(true);
}

/**
* Sets this socket into "listen" state, which allows subsequent calls to {@link #accept()}
* receive any connection attempt. Note that 1., the socket must be bound to a local address using
* {@link #bind(SocketAddress)}, and 2., the socket type must allow listen/accept. This is true
* for {@link AFSocketType#SOCK_SEQPACKET} AF_UNIX sockets, for example.
*
* @param backlog The backlog, or {@code 0} for default.
* @throws IOException on error.
*/
public final void listen(int backlog) throws IOException {
FileDescriptor fdesc = getAFImpl().getCore().validFdOrException();
if (backlog <= 0) {
backlog = 50;
}
NativeUnixSocket.listen(fdesc, backlog);
}

/**
* Returns a new {@link AFDatagramSocket} instance to be used for {@link #accept()}, i.e., no
* {@link FileDescriptor} is associated.
*
* @return The new instance.
* @throws IOException on error.
*/
protected abstract AFDatagramSocket<A> newDatagramSocketInstance() throws IOException;

// CPD-OFF
AFDatagramSocket<A> accept1(boolean throwOnFail) throws IOException {
AFDatagramSocket<A> as = newDatagramSocketInstance();

boolean success = getAFImpl().accept0(as.getAFImpl(false));
if (isClosed()) {
// We may have connected to the socket to unblock it
throw new SocketClosedException("Socket is closed");
}

if (!success) {
if (throwOnFail) {
if (getChannel().isBlocking()) {
// unexpected
return null;
} else {
// non-blocking socket, nothing to accept
throw new IllegalBlockingModeException();
}
} else {
return null;
}
}

as.getAFImpl(true); // trigger create
as.connect(AFSocketAddress.INTERNAL_DUMMY_CONNECT);
as.getAFImpl().updatePorts(getAFImpl().getLocalPort1(), getAFImpl().getRemotePort());

return as;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.newsclub.net.unix;

import static org.newsclub.net.unix.NativeUnixSocket.SHUT_RD_WR;

import java.io.FileDescriptor;
import java.io.IOException;
import java.net.DatagramPacket;
Expand All @@ -40,6 +42,7 @@
* @param <A> The associated address type.
* @author Christian Kohlschütter
*/
@SuppressWarnings("PMD.CyclomaticComplexity")
public abstract class AFDatagramSocketImpl<A extends AFSocketAddress> extends
DatagramSocketImplShim {
private final AFSocketType socketType;
Expand All @@ -49,6 +52,7 @@ public abstract class AFDatagramSocketImpl<A extends AFSocketAddress> extends
private final AtomicBoolean bound = new AtomicBoolean(false);

private final AtomicInteger socketTimeout = new AtomicInteger(0);
private int localPort;
private int remotePort = 0;
private final AFAddressFamily<@NonNull A> addressFamily;
private AFSocketImplExtensions<A> implExtensions = null;
Expand Down Expand Up @@ -401,4 +405,88 @@ protected final synchronized AFSocketImplExtensions<A> getImplExtensions() {
}
return implExtensions;
}

// CPD-OFF
@SuppressWarnings("Finally" /* errorprone */)
final boolean accept0(AFDatagramSocketImpl<A> socket) throws IOException {
FileDescriptor fdesc = core.validFdOrException();
if (isClosed()) {
throw new SocketException("Socket is closed");
} else if (!isBound()) {
throw new SocketException("Socket is not bound");
}

AFSocketAddress socketAddress = core.socketAddress;
AFSocketAddress boundSocketAddress = getLocalSocketAddress();
if (boundSocketAddress != null) {
// Always resolve bound address from wildcard address, etc.
core.socketAddress = socketAddress = boundSocketAddress;
}

if (socketAddress == null) {
throw new SocketException("Socket is not bound");
}

final AFDatagramSocketImpl<A> si = socket;
core.incPendingAccepts();
try {
ByteBuffer ab = socketAddress.getNativeAddressDirectBuffer();

SocketException caught = null;
try {
if (!NativeUnixSocket.accept(ab, ab.limit(), fdesc, si.fd, core.inode.get(), socketTimeout
.get())) {
return false;
}
} catch (SocketException e) { // NOPMD.ExceptionAsFlowControl
caught = e;
} finally { // NOPMD.DoNotThrowExceptionInFinally
if (!isBound() || isClosed()) {
try {
NativeUnixSocket.shutdown(si.fd, SHUT_RD_WR);
} catch (Exception e) {
// ignore
}
try {
NativeUnixSocket.close(si.fd);
} catch (Exception e) {
// ignore
}
if (caught != null) {
throw caught;
} else {
throw new SocketClosedException("Socket is closed");
}
} else if (caught != null) {
throw caught;
}
}
} finally {
core.decPendingAccepts();
}
si.setSocketAddress(socketAddress);
si.connected.set(true);

return true;
}

final int getLocalPort1() {
return localPort;
}

final int getRemotePort() {
return remotePort;
}

final void setSocketAddress(AFSocketAddress socketAddress) {
if (socketAddress == null) {
this.core.socketAddress = null;
this.localPort = -1;
} else {
this.core.socketAddress = socketAddress;
if (this.localPort <= 0) {
this.localPort = socketAddress.getPort();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import java.io.FileDescriptor;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
Expand All @@ -29,6 +31,7 @@
* @author Christian Kohlschütter
*/
class AFSocketCore extends AFCore {
private final AtomicInteger pendingAccepts = new AtomicInteger(0);
private static final int SHUT_RD_WR = 2;

/**
Expand Down Expand Up @@ -134,4 +137,18 @@ <T> void setOption(AFSocketOption<T> name, T value) throws IOException {
}
}
}

protected void incPendingAccepts() throws SocketException {
if (pendingAccepts.incrementAndGet() >= Integer.MAX_VALUE) {
throw new SocketException("Too many pending accepts");
}
}

protected void decPendingAccepts() throws SocketException {
pendingAccepts.decrementAndGet();
}

protected boolean hasPendingAccepts() {
return pendingAccepts.get() > 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
*/
package org.newsclub.net.unix;

import static org.newsclub.net.unix.NativeUnixSocket.SHUT_RD;
import static org.newsclub.net.unix.NativeUnixSocket.SHUT_RD_WR;
import static org.newsclub.net.unix.NativeUnixSocket.SHUT_WR;

import java.io.EOFException;
import java.io.FileDescriptor;
import java.io.IOException;
Expand Down Expand Up @@ -48,9 +52,6 @@
"PMD.CyclomaticComplexity", "PMD.CouplingBetweenObjects",
"UnsafeFinalization" /* errorprone */})
public abstract class AFSocketImpl<A extends AFSocketAddress> extends SocketImplShim {
private static final int SHUT_RD = 0;
private static final int SHUT_WR = 1;
private static final int SHUT_RD_WR = 2;
private static final int SHUTDOWN_RD_WR = (1 << SHUT_RD) | (1 << SHUT_WR);

private final AFSocketStreamCore core;
Expand Down Expand Up @@ -82,23 +83,11 @@ public abstract class AFSocketImpl<A extends AFSocketAddress> extends SocketImpl
* @author Christian Kohlschütter
*/
static final class AFSocketStreamCore extends AFSocketCore {
private final AtomicInteger pendingAccepts = new AtomicInteger(0);

AFSocketStreamCore(AFSocketImpl<?> observed, FileDescriptor fd,
AncillaryDataSupport ancillaryDataSupport, AFAddressFamily<?> af) {
super(observed, fd, ancillaryDataSupport, af, false);
}

private void incPendingAccepts() throws SocketException {
if (pendingAccepts.incrementAndGet() >= Integer.MAX_VALUE) {
throw new SocketException("Too many pending accepts");
}
}

private void decPendingAccepts() throws SocketException {
pendingAccepts.decrementAndGet();
}

void createSocket(FileDescriptor fdTarget, AFSocketType type) throws IOException {
NativeUnixSocket.createSocket(fdTarget, addressFamily().getDomain(), type.getId());
}
Expand All @@ -113,7 +102,7 @@ protected void unblockAccepts() {
return;
}

while (pendingAccepts.get() > 0) {
while (hasPendingAccepts()) {
try {
FileDescriptor tmpFd = new FileDescriptor();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,9 @@ public AFUNIXSocketCredentials getPeerCredentials() throws IOException {
}
return ((AFUNIXDatagramSocketImpl) getAFImpl()).getPeerCredentials();
}

@Override
protected AFDatagramSocket<AFUNIXSocketAddress> newDatagramSocketInstance() throws IOException {
return new AFUNIXDatagramSocket(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ final class NativeUnixSocket {
@SuppressWarnings("StaticAssignmentOfThrowable" /* errorprone */)
private static Throwable initError = null;

static final int SHUT_RD = 0;
static final int SHUT_WR = 1;
static final int SHUT_RD_WR = 2;

@ExcludeFromCodeCoverageGeneratedReport(reason = "unreachable")
private NativeUnixSocket() {
throw new UnsupportedOperationException("No instances");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,22 @@
package org.newsclub.net.unix.domain;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;
import org.newsclub.net.unix.AFDatagramSocket;
import org.newsclub.net.unix.AFSocketCapability;
import org.newsclub.net.unix.AFSocketCapabilityRequirement;
import org.newsclub.net.unix.AFSocketType;
import org.newsclub.net.unix.AFUNIXDatagramChannel;
import org.newsclub.net.unix.AFUNIXDatagramSocket;
import org.newsclub.net.unix.AFUNIXSocketAddress;
import org.newsclub.net.unix.AFUNIXSocketPair;
import org.newsclub.net.unix.OperationNotSupportedSocketException;
Expand All @@ -45,7 +52,7 @@ public DatagramSocketTest() {
}

@Test
public void testSeqPacket() throws Exception {
public void testSeqPacketPair() throws Exception {
AFUNIXSocketPair<AFUNIXDatagramChannel> pair;
try {
pair = AFUNIXSocketPair.openDatagram(AFSocketType.SOCK_SEQPACKET);
Expand All @@ -65,4 +72,44 @@ public void testSeqPacket() throws Exception {

assertEquals(msg, StandardCharsets.UTF_8.decode(dst).toString());
}

@Test
public void testSeqPacket() throws Exception {
boolean gotInstance = false;
try (AFUNIXDatagramSocket s1 = AFUNIXDatagramSocket.newInstance(AFSocketType.SOCK_SEQPACKET)) {
gotInstance = true;
AFUNIXSocketAddress addr = AFUNIXSocketAddress.ofNewTempFile();
s1.bind(addr);
s1.listen(0);

CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
try {
AFDatagramSocket<AFUNIXSocketAddress> s;
s = s1.accept();

ByteBuffer bb = ByteBuffer.allocate(1024).order(ByteOrder.LITTLE_ENDIAN);
s.getChannel().receive(bb);
bb.flip();
assertEquals(4, bb.remaining());
assertEquals(0x04030201, bb.getInt());
} catch (IOException e) {
fail(e);
}
});

try (AFUNIXDatagramSocket s2 = AFUNIXDatagramSocket.newInstance(
AFSocketType.SOCK_SEQPACKET)) {
s2.connect(addr);

s2.getChannel().send(ByteBuffer.wrap(new byte[] {1, 2, 3, 4}), addr);
}

cf.get(5, TimeUnit.SECONDS);

} catch (OperationNotSupportedSocketException e) {
if (!gotInstance) {
throw new TestAbortedNotAnIssueException("SEQPACKET not supported", e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,9 @@ public int getNodeIdentity(String name) throws IOException {
public int getNodeIdentity(WellKnownKernelControlNames name) throws IOException {
return getNodeIdentity(name.getControlName());
}

@Override
protected AFDatagramSocket<AFSYSTEMSocketAddress> newDatagramSocketInstance() throws IOException {
return new AFSYSTEMDatagramSocket(null);
}
}
Loading

0 comments on commit 5d178b0

Please sign in to comment.