private class ClientHelloReplySequence extends FcpReplySequence<Boolean> {
private final AtomicReference<NodeHello> receivedNodeHello = new AtomicReference<>();
- private final AtomicBoolean receivedClosed = new AtomicBoolean();
public ClientHelloReplySequence(FcpConnection connection) {
super(ClientHelloImpl.this.threadPool, connection);
@Override
protected boolean isFinished() {
- return receivedNodeHello.get() != null || receivedClosed.get();
+ return receivedNodeHello.get() != null;
}
@Override
receivedNodeHello.set(nodeHello);
}
- @Override
- protected void consumeCloseConnectionDuplicateClientName(
- CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
- receivedClosed.set(true);
- }
-
}
}
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
private final FcpConnection fcpConnection;
private final Queue<FcpMessage> messages = new ConcurrentLinkedQueue<>();
private final AtomicReference<String> identifier = new AtomicReference<>();
+ private final AtomicBoolean connectionClosed = new AtomicBoolean();
private final AtomicReference<Throwable> connectionFailureReason = new AtomicReference<>();
public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
messages.add(fcpMessage);
return executorService.submit(() -> {
synchronized (syncObject) {
- while ((connectionFailureReason.get() == null) && (!isFinished() || !messages.isEmpty())) {
+ while (!connectionClosed.get() && (!isFinished() || !messages.isEmpty())) {
while (messages.peek() != null) {
FcpMessage message = messages.poll();
fcpConnection.sendMessage(message);
}
- if (isFinished() || (connectionFailureReason.get() != null)) {
+ if (isFinished() || connectionClosed.get()) {
continue;
}
syncObject.wait();
private void consumeClose(Throwable throwable) {
connectionFailureReason.set(throwable);
+ connectionClosed.set(true);
notifySyncObject();
}
@Override
public final void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection,
CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
- consumeAlways(this::consumeCloseConnectionDuplicateClientName, closeConnectionDuplicateClientName);
+ connectionFailureReason.set(new IOException("duplicate client name"));
+ connectionClosed.set(true);
+ notifySyncObject();
}
- protected void consumeCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { }
-
@Override
public final void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) {
consume(this::consumeSSKKeypair, sskKeypair);
waitForASpecificMessage(replySequence::receivedNodeHello, NodeHello.class, NodeHello::new);
}
- @Test
+ @Test(expected = ExecutionException.class)
public void waitingForConnectionClosedDuplicateClientNameWorks() throws IOException, ExecutionException, InterruptedException {
- waitForASpecificMessage( replySequence::receivedCloseConnectionDuplicateClientName, CloseConnectionDuplicateClientName.class, CloseConnectionDuplicateClientName::new);
+ replySequence.setExpectedMessage("");
+ Future<Boolean> result = replySequence.send(fcpMessage);
+ replySequence.receivedCloseConnectionDuplicateClientName(fcpConnection,
+ new CloseConnectionDuplicateClientName(new FcpMessage("CloseConnectionDuplicateClientName")));
+ result.get();
}
@Test
}
@Override
- protected void consumeCloseConnectionDuplicateClientName(
- CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
- gotMessage.set(closeConnectionDuplicateClientName.getName());
- }
-
- @Override
protected void consumeSSKKeypair(SSKKeypair sskKeypair) {
gotMessage.set(sskKeypair.getName());
}