private FcpConnection establishConnection() throws IOException {
FcpConnection connection = new FcpConnection(hostname, port);
connection.connect();
- FcpReplySequence<?> nodeHelloSequence = new ClientHelloReplySequence(connection);
+ ClientHelloReplySequence nodeHelloSequence = new ClientHelloReplySequence(connection);
ClientHello clientHello = new ClientHello(clientName.get(), "2.0");
try {
- nodeHelloSequence.send(clientHello).get();
+ if (nodeHelloSequence.send(clientHello).get()) {
+ return connection;
+ }
} catch (InterruptedException | ExecutionException e) {
connection.close();
throw new IOException(String.format("Could not connect to %s:%d.", hostname, port), e);
}
- return connection;
+ connection.close();
+ throw new IOException(String.format("Could not connect to %s:%d.", hostname, port));
}
- private class ClientHelloReplySequence extends FcpReplySequence<Void> {
+ private class ClientHelloReplySequence extends FcpReplySequence<Boolean> {
- private final AtomicReference<NodeHello> receivedNodeHello;
- private final AtomicBoolean receivedClosed;
+ private final AtomicReference<NodeHello> receivedNodeHello = new AtomicReference<>();
+ private final AtomicBoolean receivedClosed = new AtomicBoolean();
public ClientHelloReplySequence(FcpConnection connection) {
super(ClientHelloImpl.this.threadPool, connection);
- receivedNodeHello = new AtomicReference<>();
- receivedClosed = new AtomicBoolean();
}
@Override
}
@Override
+ protected Boolean getResult() {
+ return receivedNodeHello.get() != null;
+ }
+
+ @Override
protected void consumeNodeHello(NodeHello nodeHello) {
receivedNodeHello.set(nodeHello);
}