Issue
I am learning Java NIO and trying to write a simple client/server application using selectors. So far I have a PwpConnectionManager class that acts both as a server listening for incoming connections and as a client making connections to remote peers. The remote peers to connect to are added through the connectTo() method.
While detecting and accepting incoming connections seems to work, I am unable to trigger OP_CONNECT events when trying to connect to remote peers. I would expect a key to be selected, with selectedKey.isConnectable() returning true, after a successful remote connection has been made. This never happens.
To illustrate the problem, I wrote a fairly simple test in the main() method. It starts two instances of PwpConnectionManager, one acting as a server and the other as a client. The client tries to connect to the server and the connection is successfully established on the server side (OP_ACCEPT is triggered). However, I expect OP_CONNECT to be triggered in the client, and it never is.
I've read that using localhost as the address to connect to can lead to an instant connection success, with channel.connect() returning true and thus never triggering an OP_CONNECT afterwards. However, as you can see in the log output below, the isConnected value returned by connect() in processPendingConnections() is false, so I am expecting OP_CONNECT to eventually be selected by the selector.
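For reference, the two outcomes of a non-blocking connect() mentioned above can be handled as in the following minimal sketch. It assumes a throwaway local server on an ephemeral loopback port so that it can run on its own; the class and method names are hypothetical and not part of PwpConnectionManager.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class ConnectOutcomeSketch {

    /** Connects to a throwaway local server; returns true once the channel is connected. */
    static boolean connectNonBlocking() {
        // try-with-resources is fine here only because the sketch is done with
        // all three resources when this method returns
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open();
             SocketChannel client = SocketChannel.open()) {
            // Hypothetical local server, bound to an ephemeral loopback port
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            client.configureBlocking(false);
            if (client.connect(server.getLocalAddress())) {
                // Outcome 1: connected immediately, so OP_CONNECT will not be selected
                return client.isConnected();
            }
            // Outcome 2: connection pending, let the selector report when it can be finished
            client.register(selector, SelectionKey.OP_CONNECT);
            final long deadline = System.currentTimeMillis() + 5000;
            while (System.currentTimeMillis() < deadline) {
                selector.select(500);
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isConnectable() && client.finishConnect()) {
                        return true;
                    }
                }
                selector.selectedKeys().clear();
            }
            return false;
        } catch (IOException ioe) {
            throw new UncheckedIOException(ioe);
        }
    }

    public static void main(String[] args) {
        System.out.println("Connected: " + connectNonBlocking());
    }
}
```

Either branch ends with a connected channel; only the second one goes through the selector.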
Here is the output log after a run of main() (I've run it multiple times with the same results):
CLIENT: Waiting for selection events...
SERVER: Waiting for selection events...
CLIENT: Connected to remote peer? false
CLIENT: Waiting for selection events...
SERVER: Accepted remote connection
SERVER: Waiting for selection events...
Main thread done!
CLIENT: A shutdown request was issued, trying to shutdown...
SERVER: A shutdown request was issued, trying to shutdown...
CLIENT: Connection manager was shutdown!
SERVER: Connection manager was shutdown!
Here are my questions:
- Why is OP_CONNECT never triggered in the client?
- Is closing the PwpConnectionManager as in unmanage() the correct way to do it, while checking for serverExecutor.isShutdown() in manage()?
- Is it necessary to close accepted SocketChannels when unmanaging the connection manager?
- Any other recommendations and tips?
The code:
/**
 *
 * A connection manager for PWP-connections. It listens for incoming connections and also
 * allows for adding new connections to remote peers.
 *
 * @author veroslav
 *
 */
public class PwpConnectionManager {

    public static final String DEFAULT_NETWORK_INTERFACE = "";

    private static final int SO_RCVBUF_VALUE = 4 * 1024;
    private static final boolean SO_REUSEADDR = true;

    private final List<PwpPeer> pendingPeerConnections;
    private final ExecutorService serverExecutor;
    private final Selector connectionSelector;

    private final String name;
    private final int maxConnections;
    private final int listenPort;

    /**
     *
     * Configure a new connection manager listening on a specified port and serving maxConnections connections
     *
     * @param name Name identifier for this connection manager (for debugging purposes)
     * @param listenPort Port to listen on for incoming connections
     * @param maxConnections Max simultaneous connections handled by this connection manager
     * @throws IOException If a connection selector can't be opened
     */
    public PwpConnectionManager(final String name, final int listenPort, final int maxConnections) throws IOException {
        this.name = name;
        this.maxConnections = maxConnections;
        this.listenPort = listenPort;
        pendingPeerConnections = Collections.synchronizedList(new ArrayList<PwpPeer>());
        serverExecutor = Executors.newSingleThreadExecutor();
        connectionSelector = Selector.open();
    }

    /**
     *
     * Start managing the connections (both incoming and outgoing)
     *
     * @param listenInterface Network interface used to listen for incoming connections
     */
    public void manage(final String listenInterface) {
        serverExecutor.execute(() -> {
            try(final ServerSocketChannel serverChannel = ServerSocketChannel.open()) {
                if(!serverChannel.isOpen()) {
                    final String errorMessage = name + ": Failed to start server on port " +
                            listenPort + " with interface " + listenInterface;
                    throw new IOException(errorMessage);
                }
                setChannelOptions(serverChannel);
                serverChannel.bind(getSocketAddressFromNetworkInterface(listenInterface), maxConnections);
                serverChannel.configureBlocking(false);
                serverChannel.register(connectionSelector, SelectionKey.OP_ACCEPT);
                while(true) {
                    if(serverExecutor.isShutdown()) {
                        //TODO: Release and shutdown connection channels, release resources
                        System.out.println(name + ": A shutdown request was issued, trying to shutdown...");
                        break;
                    }
                    System.out.println(name + ": Waiting for selection events...");
                    final int keysSelected = connectionSelector.select();
                    if(keysSelected > 0) {
                        final Set<SelectionKey> selectedKeys = connectionSelector.selectedKeys();
                        final Iterator<SelectionKey> selectedKeysIterator = selectedKeys.iterator();
                        while(selectedKeysIterator.hasNext()) {
                            final SelectionKey selectedKey = selectedKeysIterator.next();
                            selectedKeysIterator.remove();
                            if(selectedKey.isValid()) {
                                handleKeySelection(selectedKey);
                            }
                        }
                    }
                    //Check for new connection requests to remote peers
                    processPendingConnections();
                }
            }
            catch(final IOException ioe) {
                System.err.println(name + ": An error occurred while running the server: " + ioe.getMessage());
            }
            System.out.println(name + ": Connection manager was shutdown!");
        });
    }

    /**
     *
     * Initialize the shutdown of selector thread and allow it to die
     */
    public void unmanage() {
        serverExecutor.shutdown();
        connectionSelector.wakeup();
    }

    /**
     *
     * Add a new peer and try making a connection to it
     *
     * @param peer The peer we are attempting to connect to
     */
    public void connectTo(final PwpPeer peer) {
        synchronized (pendingPeerConnections) {
            pendingPeerConnections.add(peer);
        }
        connectionSelector.wakeup();
    }

    private void processPendingConnections() {
        while(true) {
            PwpPeer peer = null;
            synchronized(pendingPeerConnections) {
                if(!pendingPeerConnections.isEmpty()) {
                    peer = pendingPeerConnections.remove(0);
                }
            }
            if(peer == null) {
                break;
            }
            //TODO: Offload connection attempt to a worker thread?
            try (final SocketChannel peerConnection = SocketChannel.open()) {
                peerConnection.configureBlocking(false);
                setChannelOptions(peerConnection);
                peerConnection.register(connectionSelector, SelectionKey.OP_CONNECT, peer);
                final boolean isConnected = peerConnection.connect(new InetSocketAddress(peer.getPeerIp(), peer.getPeerPort()));
                System.out.println(name + ": Connected to remote peer? " + isConnected);
            }
            catch(final IOException ioe) {
                System.err.println(name + ": Failed to connect to peer: " + ioe.getMessage());
            }
        }
    }

    private void handleKeySelection(final SelectionKey selectedKey) throws IOException {
        if(selectedKey.isAcceptable()) {
            //Handle a new connection request
            final ServerSocketChannel serverSocketChannel = (ServerSocketChannel)selectedKey.channel();
            try (final SocketChannel connection = serverSocketChannel.accept()){
                connection.configureBlocking(false);
                final InetSocketAddress connectionAddress = (InetSocketAddress)connection.getRemoteAddress();
                final String remotePeerIp = connectionAddress.getAddress().getHostAddress();
                final int remotePeerPort = connectionAddress.getPort();
                final PwpPeer pwpConnection = new PwpPeer(remotePeerIp, remotePeerPort);
                connection.register(selectedKey.selector(), SelectionKey.OP_READ, pwpConnection);
                System.out.println(name + ": Accepted remote connection");
            } catch (final IOException ioe) {
                System.err.println(name + ": Failed to accept incoming connection: " + ioe.getMessage());
            }
        }
        else if(selectedKey.isReadable()) {
            //Handle a read attempt to the channel
        }
        else if(selectedKey.isWritable()) {
            //Handle a write attempt to the channel
        }
        else if(selectedKey.isConnectable()) {
            //Handle remote peer accepting our connection attempt
            final SocketChannel peerConnection = ((SocketChannel)selectedKey.channel());
            try {
                if(peerConnection.finishConnect()) {
                    selectedKey.interestOps(0);
                    System.out.println(name + ": Successfully connected to the remote peer");
                }
            } catch (final IOException ioe) {
                // Broken connection, disconnect the peer
                System.err.println(name + ": Broken connection attempt, disconnecting");
                peerConnection.close();
            }
        }
    }

    private void setChannelOptions(final NetworkChannel channel) throws IOException {
        channel.setOption(StandardSocketOptions.SO_RCVBUF, PwpConnectionManager.SO_RCVBUF_VALUE);
        channel.setOption(StandardSocketOptions.SO_REUSEADDR, PwpConnectionManager.SO_REUSEADDR);
    }

    private InetSocketAddress getSocketAddressFromNetworkInterface(final String networkInterface) {
        try {
            final NetworkInterface listenInterface = NetworkInterface.getByName(networkInterface);
            if(listenInterface == null) {
                //Invalid/non-existing network interface specified, use default
                return new InetSocketAddress(listenPort);
            }
            final InetAddress inetAddress = listenInterface.getInetAddresses().nextElement();
            return new InetSocketAddress(inetAddress, listenPort);
        } catch (final SocketException se) {
            //Invalid/non-existing network interface specified, use default
            return new InetSocketAddress(listenPort);
        }
    }

    public static void main(String[] args) throws IOException {
        final int maxConnections = 50;
        final int listenPort = 8006;
        final PwpConnectionManager connectionManager = new PwpConnectionManager("SERVER", listenPort, maxConnections);
        connectionManager.manage(PwpConnectionManager.DEFAULT_NETWORK_INTERFACE);
        final PwpConnectionManager client = new PwpConnectionManager("CLIENT", listenPort + 1, maxConnections);
        client.manage(PwpConnectionManager.DEFAULT_NETWORK_INTERFACE);
        try {
            Thread.sleep(3000);
            client.connectTo(new PwpPeer("localhost", listenPort));
            Thread.sleep(4000);
        } catch (final InterruptedException ie) {
            Thread.interrupted();
        }
        connectionManager.unmanage();
        client.unmanage();
        System.out.println("Main thread done!");
    }
}
Solution
I managed to resolve it on my own. The problem was in the code accepting the client connection on the server side. Using try-with-resources on the newly accepted SocketChannel meant that the channel was closed immediately after the connection had been established, because try-with-resources always closes its resources once the try block has executed.
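This behavior of try-with-resources can be reproduced in isolation. The sketch below (class and method names are hypothetical, not from the original code) registers a channel with a selector inside a try-with-resources block and then checks the channel and its selection key after the block has exited.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

class TryWithResourcesSketch {

    /** Returns { channel still open?, selection key still valid? } after the try block. */
    static boolean[] stateAfterTryBlock() {
        try (Selector selector = Selector.open()) {
            final SocketChannel escaped;
            final SelectionKey key;
            try (SocketChannel channel = SocketChannel.open()) {
                channel.configureBlocking(false);
                key = channel.register(selector, SelectionKey.OP_READ);
                escaped = channel;
            } // channel.close() runs here, even though no exception was thrown
            // Closing the channel also cancelled its key, so the selector
            // will never report events for it
            return new boolean[] { escaped.isOpen(), key.isValid() };
        } catch (IOException ioe) {
            throw new UncheckedIOException(ioe);
        }
    }

    public static void main(String[] args) {
        final boolean[] state = stateAfterTryBlock();
        System.out.println("open=" + state[0] + ", valid=" + state[1]);
    }
}
```

Running it prints open=false, valid=false. The same applies to any channel that has to outlive the block that created it, for example the outgoing SocketChannel that processPendingConnections() in the question also opens with try-with-resources.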
In this case, it meant that the server closed the client connection each time the client tried to connect. That also explains why the client never managed to connect and why OP_CONNECT was never triggered in the client selector. I changed the handleKeySelection() method as follows to fix it:
if(selectedKey.isAcceptable()) {
    //Handle a new connection request
    System.out.println(name + ": Got an OP_ACCEPT key");
    final ServerSocketChannel serverSocketChannel = (ServerSocketChannel)selectedKey.channel();
    SocketChannel connection = null;
    try {
        connection = serverSocketChannel.accept();
        connection.configureBlocking(false);
        final InetSocketAddress connectionAddress = (InetSocketAddress)connection.getRemoteAddress();
        final String remotePeerIp = connectionAddress.getAddress().getHostAddress();
        final int remotePeerPort = connectionAddress.getPort();
        final PwpPeer pwpConnection = new PwpPeer(remotePeerIp, remotePeerPort);
        connection.register(selectedKey.selector(), SelectionKey.OP_READ, pwpConnection);
        System.out.println(name + ": Accepted remote connection: ip: " + remotePeerIp + ", port " + remotePeerPort);
    } catch (final IOException ioe) {
        System.err.println(name + ": Failed to accept incoming connection: " + ioe.getMessage());
        if(connection != null) {
            connection.close();
        }
    }
}
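With accepted channels no longer closed automatically, they stay open until something closes them explicitly, which is relevant to the question about releasing channels on shutdown. Closing a Selector only deregisters its channels and does not close them. One possible approach, sketched here as an assumption rather than as part of the original fix (the class and method names are hypothetical), is to close every channel still registered with the selector when the selector loop exits.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

class SelectorShutdownSketch {

    /** Closes every channel still registered with the selector, then the selector itself. */
    static void closeAll(final Selector selector) {
        // Work on a copy so the loop does not depend on how cancellation updates the key set
        final List<SelectionKey> keys = new ArrayList<>(selector.keys());
        for (final SelectionKey key : keys) {
            try {
                key.channel().close(); // also cancels the key
            } catch (final IOException ioe) {
                System.err.println("Failed to close channel: " + ioe.getMessage());
            }
        }
        try {
            selector.close();
        } catch (final IOException ioe) {
            System.err.println("Failed to close selector: " + ioe.getMessage());
        }
    }

    /** Registers one channel, shuts everything down, and reports whether both are closed. */
    static boolean demo() {
        try {
            final Selector selector = Selector.open();
            final SocketChannel channel = SocketChannel.open();
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_READ);
            closeAll(selector);
            return !channel.isOpen() && !selector.isOpen();
        } catch (final IOException ioe) {
            throw new UncheckedIOException(ioe);
        }
    }

    public static void main(String[] args) {
        System.out.println("Everything closed: " + demo());
    }
}
```

In PwpConnectionManager, a call like closeAll(connectionSelector) would presumably go where the TODO in manage() marks the shutdown branch, so that it runs on the selector thread.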
Answered By - veroslav
Answer Checked By - Candace Johnson (JavaFixing Volunteer)