}
@Override
- protected void consumeConnectionClosed(Throwable throwable) {
- failed.set(true);
- }
-
- @Override
public ListenableFuture<Optional<Data>> send(FcpMessage fcpMessage) throws IOException {
identifier.set(fcpMessage.getField("Identifier"));
return super.send(fcpMessage);
sendMessage(originalClientPut.get());
}
- @Override
- protected void consumeConnectionClosed(Throwable throwable) {
- putFinished.set(true);
- }
-
}
}
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
private final FcpConnection fcpConnection;
private final Queue<FcpMessage> messages = new ConcurrentLinkedQueue<>();
private final AtomicReference<String> identifier = new AtomicReference<>();
+ private final AtomicReference<Throwable> connectionFailureReason = new AtomicReference<>();
public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
this.executorService = MoreExecutors.listeningDecorator(executorService);
messages.add(fcpMessage);
return executorService.submit(() -> {
synchronized (syncObject) {
- while (!isFinished() || !messages.isEmpty()) {
+ while ((connectionFailureReason.get() == null) && (!isFinished() || !messages.isEmpty())) {
while (messages.peek() != null) {
FcpMessage message = messages.poll();
fcpConnection.sendMessage(message);
}
- if (isFinished()) {
+ if (isFinished() || (connectionFailureReason.get() != null)) {
continue;
}
syncObject.wait();
}
}
+ Throwable throwable = connectionFailureReason.get();
+ if (throwable != null) {
+ throw new ExecutionException(throwable);
+ }
return getResult();
});
}
}
private void consumeClose(Throwable throwable) {
- consumeConnectionClosed(throwable);
+ connectionFailureReason.set(throwable);
notifySyncObject();
}
consumeClose(throwable);
}
- protected void consumeConnectionClosed(Throwable throwable) { }
-
}
keyPairFuture.get();
}
+ @Test(expected = ExecutionException.class)
+ public void defaultFcpClientThrowsExceptionIfConnectionIsClosed()
+ throws IOException, ExecutionException, InterruptedException {
+ Logger.getAnonymousLogger().getParent().setLevel(Level.FINEST);
+ Logger.getAnonymousLogger().getParent().getHandlers()[0].setLevel(Level.FINEST);
+ Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
+ fcpServer.connect().get();
+ fcpServer.collectUntil(is("EndMessage"));
+ fcpServer.close();
+ keyPairFuture.get();
+ }
+
@Test
public void defaultFcpClientCanGenerateKeypair() throws ExecutionException, InterruptedException, IOException {
Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
assertThat(data.isPresent(), is(false));
}
- @Test
+ @Test(expected = ExecutionException.class)
public void clientGetRecognizesConnectionClosed() throws InterruptedException, ExecutionException, IOException {
Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
connectNode();
List<String> lines = fcpServer.collectUntil(is("EndMessage"));
assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
fcpServer.close();
- Optional<Data> data = dataFuture.get();
- assertThat(data.isPresent(), is(false));
+ dataFuture.get();
}
@Test
package net.pterodactylus.fcp.quelaton;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@Test
public void waitingForConnectionClosureWorks() throws IOException, ExecutionException, InterruptedException {
- replySequence.setExpectedMessage("ConnectionClosed");
+ replySequence.setExpectedMessage("none");
Future<Boolean> result = replySequence.send(fcpMessage);
Throwable throwable = new Throwable();
replySequence.connectionClosed(fcpConnection, throwable);
- assertThat(result.get(), is(true));
- assertThat(replySequence.receivedThrowable.get(), is(throwable));
+ try {
+ result.get();
+ } catch (ExecutionException e) {
+ Throwable t = e;
+ while (t.getCause() != null) {
+ t = t.getCause();
+ }
+ assertThat(t, sameInstance(throwable));
+ }
}
@FunctionalInterface
private final AtomicReference<String> gotMessage = new AtomicReference<>();
private final AtomicReference<String> expectedMessage = new AtomicReference<>();
- private final AtomicReference<Throwable> receivedThrowable = new AtomicReference<>();
public TestFcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
super(executorService, fcpConnection);
gotMessage.set(fcpMessage.getName());
}
- @Override
- protected void consumeConnectionClosed(Throwable throwable) {
- receivedThrowable.set(throwable);
- gotMessage.set("ConnectionClosed");
- }
-
}
}