Prevent “empty catch block” warning
[jFCPlib.git] / src / test / java / net / pterodactylus / fcp / quelaton / DefaultFcpClientTest.java
index 3374556..6e4b807 100644 (file)
@@ -88,29 +88,6 @@ public class DefaultFcpClientTest {
                threadPool.shutdown();
        }
 
-       @Test(expected = ExecutionException.class)
-       public void defaultFcpClientThrowsExceptionIfItCanNotConnect()
-       throws IOException, ExecutionException, InterruptedException {
-               Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
-               fcpServer.connect().get();
-               fcpServer.collectUntil(is("EndMessage"));
-               fcpServer.writeLine(
-                       "CloseConnectionDuplicateClientName",
-                       "EndMessage"
-               );
-               keyPairFuture.get();
-       }
-
-       @Test(expected = ExecutionException.class)
-       public void defaultFcpClientThrowsExceptionIfConnectionIsClosed()
-       throws IOException, ExecutionException, InterruptedException {
-               Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
-               fcpServer.connect().get();
-               fcpServer.collectUntil(is("EndMessage"));
-               fcpServer.close();
-               keyPairFuture.get();
-       }
-
        private void connectNode() throws InterruptedException, ExecutionException, IOException {
                fcpServer.connect().get();
                fcpServer.collectUntil(is("EndMessage"));
@@ -130,30 +107,6 @@ public class DefaultFcpClientTest {
                );
        }
 
-       @Test
-       public void clientGetCanDownloadData() throws InterruptedException, ExecutionException, IOException {
-               Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines, matchesFcpMessage("ClientGet", "ReturnType=direct", "URI=KSK@foo.txt"));
-               String identifier = extractIdentifier(lines);
-               fcpServer.writeLine(
-                       "AllData",
-                       "Identifier=" + identifier,
-                       "DataLength=6",
-                       "StartupTime=1435610539000",
-                       "CompletionTime=1435610540000",
-                       "Metadata.ContentType=text/plain;charset=utf-8",
-                       "Data",
-                       "Hello"
-               );
-               Optional<Data> data = dataFuture.get();
-               assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
-               assertThat(data.get().size(), is(6L));
-               assertThat(ByteStreams.toByteArray(data.get().getInputStream()),
-                       is("Hello\n".getBytes(StandardCharsets.UTF_8)));
-       }
-
        private String extractIdentifier(List<String> lines) {
                return lines.stream()
                        .filter(s -> s.startsWith("Identifier="))
@@ -162,67 +115,10 @@ public class DefaultFcpClientTest {
                        .orElse("");
        }
 
-       @Test
-       public void defaultFcpClientReusesConnection() throws InterruptedException, ExecutionException, IOException {
-               Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               String identifier = extractIdentifier(lines);
-               fcpServer.writeLine(
-                       "SSKKeypair",
-                       "InsertURI=" + INSERT_URI + "",
-                       "RequestURI=" + REQUEST_URI + "",
-                       "Identifier=" + identifier,
-                       "EndMessage"
-               );
-               keyPair.get();
-               keyPair = fcpClient.generateKeypair().execute();
-               lines = fcpServer.collectUntil(is("EndMessage"));
-               identifier = extractIdentifier(lines);
-               fcpServer.writeLine(
-                       "SSKKeypair",
-                       "InsertURI=" + INSERT_URI + "",
-                       "RequestURI=" + REQUEST_URI + "",
-                       "Identifier=" + identifier,
-                       "EndMessage"
-               );
-               keyPair.get();
-       }
-
-       @Test
-       public void defaultFcpClientCanReconnectAfterConnectionHasBeenClosed()
-       throws InterruptedException, ExecutionException, IOException {
-               Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
-               connectNode();
-               fcpServer.collectUntil(is("EndMessage"));
-               fcpServer.close();
-               try {
-                       keyPair.get();
-                       Assert.fail();
-               } catch (ExecutionException e) {
-               }
-               keyPair = fcpClient.generateKeypair().execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               String identifier = extractIdentifier(lines);
-               fcpServer.writeLine(
-                       "SSKKeypair",
-                       "InsertURI=" + INSERT_URI + "",
-                       "RequestURI=" + REQUEST_URI + "",
-                       "Identifier=" + identifier,
-                       "EndMessage"
-               );
-               keyPair.get();
-       }
-
        private Matcher<List<String>> matchesFcpMessage(String name, String... requiredLines) {
                return matchesFcpMessageWithTerminator(name, "EndMessage", requiredLines);
        }
 
-       private Matcher<List<String>> matchesDataMessage(String name, String... requiredLines) {
-               return matchesFcpMessageWithTerminator(name, "Data", requiredLines);
-       }
-
        private Matcher<Iterable<String>> hasHead(String firstElement) {
                return new TypeSafeDiagnosingMatcher<Iterable<String>>() {
                        @Override
@@ -301,333 +197,6 @@ public class DefaultFcpClientTest {
        }
 
        @Test
-       public void clientPutWithDirectDataSendsCorrectCommand()
-       throws IOException, ExecutionException, InterruptedException {
-               fcpClient.clientPut()
-                       .from(new ByteArrayInputStream("Hello\n".getBytes()))
-                       .length(6)
-                       .uri("KSK@foo.txt")
-                       .execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("Hello"));
-               assertThat(lines, allOf(
-                       hasHead("ClientPut"),
-                       hasParameters(1, 2, "UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"),
-                       hasTail("EndMessage", "Hello")
-               ));
-       }
-
-       @Test
-       public void clientPutWithDirectDataSucceedsOnCorrectIdentifier()
-       throws InterruptedException, ExecutionException, IOException {
-               Future<Optional<Key>> key = fcpClient.clientPut()
-                       .from(new ByteArrayInputStream("Hello\n".getBytes()))
-                       .length(6)
-                       .uri("KSK@foo.txt")
-                       .execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("Hello"));
-               String identifier = extractIdentifier(lines);
-               fcpServer.writeLine(
-                       "PutFailed",
-                       "Identifier=not-the-right-one",
-                       "EndMessage"
-               );
-               fcpServer.writeLine(
-                       "PutSuccessful",
-                       "URI=KSK@foo.txt",
-                       "Identifier=" + identifier,
-                       "EndMessage"
-               );
-               assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
-       }
-
-       @Test
-       public void clientPutWithDirectDataFailsOnCorrectIdentifier()
-       throws InterruptedException, ExecutionException, IOException {
-               Future<Optional<Key>> key = fcpClient.clientPut()
-                       .from(new ByteArrayInputStream("Hello\n".getBytes()))
-                       .length(6)
-                       .uri("KSK@foo.txt")
-                       .execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("Hello"));
-               String identifier = extractIdentifier(lines);
-               fcpServer.writeLine(
-                       "PutSuccessful",
-                       "Identifier=not-the-right-one",
-                       "URI=KSK@foo.txt",
-                       "EndMessage"
-               );
-               fcpServer.writeLine(
-                       "PutFailed",
-                       "Identifier=" + identifier,
-                       "EndMessage"
-               );
-               assertThat(key.get().isPresent(), is(false));
-       }
-
-       @Test
-       public void clientPutWithRenamedDirectDataSendsCorrectCommand()
-       throws InterruptedException, ExecutionException, IOException {
-               fcpClient.clientPut()
-                       .named("otherName.txt")
-                       .from(new ByteArrayInputStream("Hello\n".getBytes()))
-                       .length(6)
-                       .uri("KSK@foo.txt")
-                       .execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("Hello"));
-               assertThat(lines, allOf(
-                       hasHead("ClientPut"),
-                       hasParameters(1, 2, "TargetFilename=otherName.txt", "UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"),
-                       hasTail("EndMessage", "Hello")
-               ));
-       }
-
-       @Test
-       public void clientPutWithRedirectSendsCorrectCommand()
-       throws IOException, ExecutionException, InterruptedException {
-               fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt").execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines,
-                       matchesFcpMessage("ClientPut", "UploadFrom=redirect", "URI=KSK@foo.txt", "TargetURI=KSK@bar.txt"));
-       }
-
-       @Test
-       public void clientPutWithFileSendsCorrectCommand() throws InterruptedException, ExecutionException, IOException {
-               fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines,
-                       matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt"));
-       }
-
-       @Test
-       public void clientPutWithFileCanCompleteTestDdaSequence()
-       throws IOException, ExecutionException, InterruptedException {
-               File tempFile = createTempFile();
-               fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt").execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               String identifier = extractIdentifier(lines);
-               fcpServer.writeLine(
-                       "ProtocolError",
-                       "Identifier=" + identifier,
-                       "Code=25",
-                       "EndMessage"
-               );
-               lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines, matchesFcpMessage(
-                       "TestDDARequest",
-                       "Directory=" + tempFile.getParent(),
-                       "WantReadDirectory=true",
-                       "WantWriteDirectory=false"
-               ));
-               fcpServer.writeLine(
-                       "TestDDAReply",
-                       "Directory=" + tempFile.getParent(),
-                       "ReadFilename=" + tempFile,
-                       "EndMessage"
-               );
-               lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines, matchesFcpMessage(
-                       "TestDDAResponse",
-                       "Directory=" + tempFile.getParent(),
-                       "ReadContent=test-content"
-               ));
-               fcpServer.writeLine(
-                       "TestDDAComplete",
-                       "Directory=" + tempFile.getParent(),
-                       "ReadDirectoryAllowed=true",
-                       "EndMessage"
-               );
-               lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines,
-                       matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt",
-                               "Filename=" + new File(tempFile.getParent(), "test.dat")));
-       }
-
-       private File createTempFile() throws IOException {
-               File tempFile = File.createTempFile("test-dda-", ".dat");
-               tempFile.deleteOnExit();
-               Files.write("test-content", tempFile, StandardCharsets.UTF_8);
-               return tempFile;
-       }
-
-       @Test
-       public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier()
-       throws InterruptedException, ExecutionException, IOException {
-               Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               String identifier = extractIdentifier(lines);
-               fcpServer.writeLine(
-                       "ProtocolError",
-                       "Identifier=not-the-right-one",
-                       "Code=25",
-                       "EndMessage"
-               );
-               fcpServer.writeLine(
-                       "PutSuccessful",
-                       "Identifier=" + identifier,
-                       "URI=KSK@foo.txt",
-                       "EndMessage"
-               );
-               assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
-       }
-
-       @Test
-       public void clientPutAbortsOnProtocolErrorOtherThan25()
-       throws InterruptedException, ExecutionException, IOException {
-               Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               String identifier = extractIdentifier(lines);
-               fcpServer.writeLine(
-                       "ProtocolError",
-                       "Identifier=" + identifier,
-                       "Code=1",
-                       "EndMessage"
-               );
-               assertThat(key.get().isPresent(), is(false));
-       }
-
-       @Test
-       public void clientPutDoesNotReplyToWrongTestDdaReply() throws IOException, ExecutionException,
-       InterruptedException {
-               File tempFile = createTempFile();
-               fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt").execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               String identifier = extractIdentifier(lines);
-               fcpServer.writeLine(
-                       "ProtocolError",
-                       "Identifier=" + identifier,
-                       "Code=25",
-                       "EndMessage"
-               );
-               lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines, matchesFcpMessage(
-                       "TestDDARequest",
-                       "Directory=" + tempFile.getParent(),
-                       "WantReadDirectory=true",
-                       "WantWriteDirectory=false"
-               ));
-               fcpServer.writeLine(
-                       "TestDDAReply",
-                       "Directory=/some-other-directory",
-                       "ReadFilename=" + tempFile,
-                       "EndMessage"
-               );
-               fcpServer.writeLine(
-                       "TestDDAReply",
-                       "Directory=" + tempFile.getParent(),
-                       "ReadFilename=" + tempFile,
-                       "EndMessage"
-               );
-               lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines, matchesFcpMessage(
-                       "TestDDAResponse",
-                       "Directory=" + tempFile.getParent(),
-                       "ReadContent=test-content"
-               ));
-       }
-
-       @Test
-       public void clientPutSendsResponseEvenIfFileCanNotBeRead()
-       throws IOException, ExecutionException, InterruptedException {
-               File tempFile = createTempFile();
-               fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt").execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               String identifier = extractIdentifier(lines);
-               fcpServer.writeLine(
-                       "ProtocolError",
-                       "Identifier=" + identifier,
-                       "Code=25",
-                       "EndMessage"
-               );
-               lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines, matchesFcpMessage(
-                       "TestDDARequest",
-                       "Directory=" + tempFile.getParent(),
-                       "WantReadDirectory=true",
-                       "WantWriteDirectory=false"
-               ));
-               fcpServer.writeLine(
-                       "TestDDAReply",
-                       "Directory=" + tempFile.getParent(),
-                       "ReadFilename=" + tempFile + ".foo",
-                       "EndMessage"
-               );
-               lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines, matchesFcpMessage(
-                       "TestDDAResponse",
-                       "Directory=" + tempFile.getParent(),
-                       "ReadContent=failed-to-read"
-               ));
-       }
-
-       @Test
-       public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory()
-       throws IOException, ExecutionException, InterruptedException {
-               File tempFile = createTempFile();
-               fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt").execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               String identifier = extractIdentifier(lines);
-               fcpServer.writeLine(
-                       "TestDDAComplete",
-                       "Directory=/some-other-directory",
-                       "EndMessage"
-               );
-               fcpServer.writeLine(
-                       "ProtocolError",
-                       "Identifier=" + identifier,
-                       "Code=25",
-                       "EndMessage"
-               );
-               lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines, matchesFcpMessage(
-                       "TestDDARequest",
-                       "Directory=" + tempFile.getParent(),
-                       "WantReadDirectory=true",
-                       "WantWriteDirectory=false"
-               ));
-       }
-
-       @Test
-       public void clientPutSendsNotificationsForGeneratedKeys()
-       throws InterruptedException, ExecutionException, IOException {
-               List<String> generatedKeys = new CopyOnWriteArrayList<>();
-               Future<Optional<Key>> key = fcpClient.clientPut()
-                       .onKeyGenerated(generatedKeys::add)
-                       .from(new ByteArrayInputStream("Hello\n".getBytes()))
-                       .length(6)
-                       .uri("KSK@foo.txt")
-                       .execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("Hello"));
-               String identifier = extractIdentifier(lines);
-               fcpServer.writeLine(
-                       "URIGenerated",
-                       "Identifier=" + identifier,
-                       "URI=KSK@foo.txt",
-                       "EndMessage"
-               );
-               fcpServer.writeLine(
-                       "PutSuccessful",
-                       "URI=KSK@foo.txt",
-                       "Identifier=" + identifier,
-                       "EndMessage"
-               );
-               assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
-               assertThat(generatedKeys, contains("KSK@foo.txt"));
-       }
-
-       @Test
        public void defaultFcpClientCanGetNodeInformation() throws InterruptedException, ExecutionException, IOException {
                Future<NodeData> nodeData = fcpClient.getNode().execute();
                connectNode();
@@ -956,21 +525,84 @@ public class DefaultFcpClientTest {
        }
 
        private void readMessage(Supplier<Matcher<List<String>>> requestMatcher) throws IOException {
-               lines = fcpServer.collectUntil(is("EndMessage"));
+               readMessage("EndMessage", requestMatcher);
+       }
+
+       private void readMessage(String terminator, Supplier<Matcher<List<String>>> requestMatcher) throws IOException {
+               lines = fcpServer.collectUntil(is(terminator));
                identifier = extractIdentifier(lines);
                assertThat(lines, requestMatcher.get());
        }
 
-       public class GenerateKeyPair {
+       public class ConnectionsAndKeyPairs {
+
+               public class Connections {
+
+                       @Test(expected = ExecutionException.class)
+                       public void throwsExceptionOnFailure() throws IOException, ExecutionException, InterruptedException {
+                               Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
+                               connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
+                               fcpServer.writeLine(
+                                       "CloseConnectionDuplicateClientName",
+                                       "EndMessage"
+                               );
+                               keyPairFuture.get();
+                       }
+
+                       @Test(expected = ExecutionException.class)
+                       public void throwsExceptionIfConnectionIsClosed() throws IOException, ExecutionException, InterruptedException {
+                               Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
+                               connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
+                               fcpServer.close();
+                               keyPairFuture.get();
+                       }
+
+                       @Test
+                       public void connectionIsReused() throws InterruptedException, ExecutionException, IOException {
+                               Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
+                               connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
+                               replyWithKeyPair();
+                               keyPair.get();
+                               keyPair = fcpClient.generateKeypair().execute();
+                               readMessage(() -> matchesFcpMessage("GenerateSSK"));
+                               identifier = extractIdentifier(lines);
+                               replyWithKeyPair();
+                               keyPair.get();
+                       }
+
+                       @Test
+                       public void defaultFcpClientCanReconnectAfterConnectionHasBeenClosed()
+                       throws InterruptedException, ExecutionException, IOException {
+                               Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
+                               connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
+                               fcpServer.close();
+                               try {
+                                       keyPair.get();
+                                       Assert.fail();
+                               } catch (ExecutionException e) {
+                                       /* ignore. */
+                               }
+                               keyPair = fcpClient.generateKeypair().execute();
+                               connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
+                               replyWithKeyPair();
+                               keyPair.get();
+                       }
+
+               }
+
+               public class GenerateKeyPair {
+
+                       @Test
+                       public void defaultFcpClientCanGenerateKeypair()
+                       throws ExecutionException, InterruptedException, IOException {
+                               Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
+                               connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
+                               replyWithKeyPair();
+                               FcpKeyPair keyPair = keyPairFuture.get();
+                               assertThat(keyPair.getPublicKey(), is(REQUEST_URI));
+                               assertThat(keyPair.getPrivateKey(), is(INSERT_URI));
+                       }
 
-               @Test
-               public void defaultFcpClientCanGenerateKeypair() throws ExecutionException, InterruptedException, IOException {
-                       Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
-                       connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
-                       replyWithKeyPair();
-                       FcpKeyPair keyPair = keyPairFuture.get();
-                       assertThat(keyPair.getPublicKey(), is(REQUEST_URI));
-                       assertThat(keyPair.getPrivateKey(), is(INSERT_URI));
                }
 
                private void replyWithKeyPair() throws IOException {
@@ -1674,7 +1306,7 @@ public class DefaultFcpClientTest {
                        @Test
                        public void reloadingPluginWorks() throws InterruptedException, ExecutionException, IOException {
                                Future<Optional<PluginInfo>> pluginInfo = fcpClient.reloadPlugin().plugin(CLASS_NAME).execute();
-                               connectAndAssert(() -> matchReloadPluginMessage());
+                               connectAndAssert(this::matchReloadPluginMessage);
                                replyWithPluginInfo();
                                verifyPluginInfo(pluginInfo);
                        }
@@ -1724,7 +1356,7 @@ public class DefaultFcpClientTest {
                        @Test
                        public void removingPluginWorks() throws InterruptedException, ExecutionException, IOException {
                                Future<Boolean> pluginRemoved = fcpClient.removePlugin().plugin(CLASS_NAME).execute();
-                               connectAndAssert(() -> matchPluginRemovedMessage());
+                               connectAndAssert(this::matchPluginRemovedMessage);
                                replyWithPluginRemoved();
                                assertThat(pluginRemoved.get(), is(true));
                        }
@@ -1771,7 +1403,7 @@ public class DefaultFcpClientTest {
                        @Test
                        public void gettingPluginInfoWorks() throws InterruptedException, ExecutionException, IOException {
                                Future<Optional<PluginInfo>> pluginInfo = fcpClient.getPluginInfo().plugin(CLASS_NAME).execute();
-                               connectAndAssert(() -> matchGetPluginInfoMessage());
+                               connectAndAssert(this::matchGetPluginInfoMessage);
                                replyWithPluginInfo();
                                verifyPluginInfo(pluginInfo);
                        }
@@ -1890,7 +1522,7 @@ public class DefaultFcpClientTest {
                @Test
                public void works() throws InterruptedException, ExecutionException, IOException {
                        Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
-                       connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
+                       connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "ReturnType=direct"));
                        replyWithAllData("not-test", "Hello World", "text/plain;charset=latin-9");
                        replyWithAllData(identifier, "Hello", "text/plain;charset=utf-8");
                        Optional<Data> data = dataFuture.get();
@@ -1901,6 +1533,7 @@ public class DefaultFcpClientTest {
                public void getFailedIsRecognized() throws InterruptedException, ExecutionException, IOException {
                        Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
                        connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
+                       replyWithGetFailed("not-test");
                        replyWithGetFailed(identifier);
                        Optional<Data> data = dataFuture.get();
                        assertThat(data.isPresent(), is(false));
@@ -1996,4 +1629,295 @@ public class DefaultFcpClientTest {
 
        }
 
+       public class ClientPut {
+
+               @Test
+               public void sendsCorrectCommand() throws IOException, ExecutionException, InterruptedException {
+                       fcpClient.clientPut()
+                               .from(new ByteArrayInputStream("Hello\n".getBytes()))
+                               .length(6)
+                               .uri("KSK@foo.txt")
+                               .execute();
+                       connectNode();
+                       readMessage("Hello", this::matchesDirectClientPut);
+               }
+
+               @Test
+               public void succeedsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
+                       Future<Optional<Key>> key = fcpClient.clientPut()
+                               .from(new ByteArrayInputStream("Hello\n".getBytes()))
+                               .length(6)
+                               .uri("KSK@foo.txt")
+                               .execute();
+                       connectNode();
+                       readMessage("Hello", this::matchesDirectClientPut);
+                       replyWithPutFailed("not-the-right-one");
+                       replyWithPutSuccessful(identifier);
+                       assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
+               }
+
+               @Test
+               public void failsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
+                       Future<Optional<Key>> key = fcpClient.clientPut()
+                               .from(new ByteArrayInputStream("Hello\n".getBytes()))
+                               .length(6)
+                               .uri("KSK@foo.txt")
+                               .execute();
+                       connectNode();
+                       readMessage("Hello", this::matchesDirectClientPut);
+                       replyWithPutSuccessful("not-the-right-one");
+                       replyWithPutFailed(identifier);
+                       assertThat(key.get().isPresent(), is(false));
+               }
+
+               @Test
+               public void renameIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
+                       fcpClient.clientPut()
+                               .named("otherName.txt")
+                               .from(new ByteArrayInputStream("Hello\n".getBytes()))
+                               .length(6)
+                               .uri("KSK@foo.txt")
+                               .execute();
+                       connectNode();
+                       readMessage("Hello", () -> allOf(
+                               hasHead("ClientPut"),
+                               hasParameters(1, 2, "TargetFilename=otherName.txt", "UploadFrom=direct", "DataLength=6",
+                                       "URI=KSK@foo.txt"),
+                               hasTail("EndMessage", "Hello")
+                       ));
+               }
+
+               @Test
+               public void redirectIsSentCorrecly() throws IOException, ExecutionException, InterruptedException {
+                       fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt").execute();
+                       connectAndAssert(() ->
+                               matchesFcpMessage("ClientPut", "UploadFrom=redirect", "URI=KSK@foo.txt", "TargetURI=KSK@bar.txt"));
+               }
+
+               @Test
+               public void withFileIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
+                       fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
+                       connectAndAssert(() ->
+                               matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt"));
+               }
+
+               public class DDA {
+
+                       private final File ddaFile;
+                       private final File fileToUpload;
+
+                       public DDA() throws IOException {
+                               ddaFile = createDdaFile();
+                               fileToUpload = new File(ddaFile.getParent(), "test.dat");
+                       }
+
+                       private Matcher<List<String>> matchesFileClientPut(File file) {
+                               return matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=" + file);
+                       }
+
+                       @Test
+                       public void completeDda() throws IOException, ExecutionException, InterruptedException {
+                               fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
+                               connectAndAssert(() -> matchesFileClientPut(fileToUpload));
+                               sendDdaRequired(identifier);
+                               readMessage(() -> matchesTestDDARequest(ddaFile));
+                               sendTestDDAReply(ddaFile.getParent(), ddaFile);
+                               readMessage(() -> matchesTestDDAResponse(ddaFile));
+                               writeTestDDAComplete(ddaFile);
+                               readMessage(() -> matchesFileClientPut(fileToUpload));
+                       }
+
+                       @Test
+                       public void ignoreOtherDda() throws IOException, ExecutionException, InterruptedException {
+                               fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
+                               connectAndAssert(() -> matchesFileClientPut(fileToUpload));
+                               sendDdaRequired(identifier);
+                               readMessage(() -> matchesTestDDARequest(ddaFile));
+                               sendTestDDAReply("/some-other-directory", ddaFile);
+                               sendTestDDAReply(ddaFile.getParent(), ddaFile);
+                               readMessage(() -> matchesTestDDAResponse(ddaFile));
+                       }
+
+                       @Test
+                       public void sendResponseIfFileUnreadable() throws IOException, ExecutionException, InterruptedException {
+                               fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
+                               connectAndAssert(() -> matchesFileClientPut(fileToUpload));
+                               sendDdaRequired(identifier);
+                               readMessage(() -> matchesTestDDARequest(ddaFile));
+                               sendTestDDAReply(ddaFile.getParent(), new File(ddaFile + ".foo"));
+                               readMessage(this::matchesFailedToReadResponse);
+                       }
+
+                       @Test
+                       public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory()
+                       throws IOException, ExecutionException, InterruptedException {
+                               fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
+                               connectNode();
+                               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
+                               String identifier = extractIdentifier(lines);
+                               fcpServer.writeLine(
+                                       "TestDDAComplete",
+                                       "Directory=/some-other-directory",
+                                       "EndMessage"
+                               );
+                               sendDdaRequired(identifier);
+                               lines = fcpServer.collectUntil(is("EndMessage"));
+                               assertThat(lines, matchesFcpMessage(
+                                       "TestDDARequest",
+                                       "Directory=" + ddaFile.getParent(),
+                                       "WantReadDirectory=true",
+                                       "WantWriteDirectory=false"
+                               ));
+                       }
+
+                       private Matcher<List<String>> matchesFailedToReadResponse() {
+                               return matchesFcpMessage(
+                                       "TestDDAResponse",
+                                       "Directory=" + ddaFile.getParent(),
+                                       "ReadContent=failed-to-read"
+                               );
+                       }
+
+                       private void writeTestDDAComplete(File tempFile) throws IOException {
+                               fcpServer.writeLine(
+                                       "TestDDAComplete",
+                                       "Directory=" + tempFile.getParent(),
+                                       "ReadDirectoryAllowed=true",
+                                       "EndMessage"
+                               );
+                       }
+
+                       private Matcher<List<String>> matchesTestDDAResponse(File tempFile) {
+                               return matchesFcpMessage(
+                                       "TestDDAResponse",
+                                       "Directory=" + tempFile.getParent(),
+                                       "ReadContent=test-content"
+                               );
+                       }
+
+                       private void sendTestDDAReply(String directory, File tempFile) throws IOException {
+                               fcpServer.writeLine(
+                                       "TestDDAReply",
+                                       "Directory=" + directory,
+                                       "ReadFilename=" + tempFile,
+                                       "EndMessage"
+                               );
+                       }
+
+                       private Matcher<List<String>> matchesTestDDARequest(File tempFile) {
+                               return matchesFcpMessage(
+                                       "TestDDARequest",
+                                       "Directory=" + tempFile.getParent(),
+                                       "WantReadDirectory=true",
+                                       "WantWriteDirectory=false"
+                               );
+                       }
+
+                       private void sendDdaRequired(String identifier) throws IOException {
+                               fcpServer.writeLine(
+                                       "ProtocolError",
+                                       "Identifier=" + identifier,
+                                       "Code=25",
+                                       "EndMessage"
+                               );
+                       }
+
+               }
+
+               private void replyWithPutSuccessful(String identifier) throws IOException {
+                       fcpServer.writeLine(
+                               "PutSuccessful",
+                               "URI=KSK@foo.txt",
+                               "Identifier=" + identifier,
+                               "EndMessage"
+                       );
+               }
+
+               private void replyWithPutFailed(String identifier) throws IOException {
+                       fcpServer.writeLine(
+                               "PutFailed",
+                               "Identifier=" + identifier,
+                               "EndMessage"
+                       );
+               }
+
+               private Matcher<List<String>> matchesDirectClientPut() {
+                       return allOf(
+                               hasHead("ClientPut"),
+                               hasParameters(1, 2, "UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"),
+                               hasTail("EndMessage", "Hello")
+                       );
+               }
+
+               private File createDdaFile() throws IOException {
+                       File tempFile = File.createTempFile("test-dda-", ".dat");
+                       tempFile.deleteOnExit();
+                       Files.write("test-content", tempFile, StandardCharsets.UTF_8);
+                       return tempFile;
+               }
+
+               @Test
+               public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier()
+               throws InterruptedException, ExecutionException, IOException {
+                       Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
+                       connectNode();
+                       List<String> lines = fcpServer.collectUntil(is("EndMessage"));
+                       String identifier = extractIdentifier(lines);
+                       fcpServer.writeLine(
+                               "ProtocolError",
+                               "Identifier=not-the-right-one",
+                               "Code=25",
+                               "EndMessage"
+                       );
+                       fcpServer.writeLine(
+                               "PutSuccessful",
+                               "Identifier=" + identifier,
+                               "URI=KSK@foo.txt",
+                               "EndMessage"
+                       );
+                       assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
+               }
+
+               @Test
+               public void clientPutAbortsOnProtocolErrorOtherThan25()
+               throws InterruptedException, ExecutionException, IOException {
+                       Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
+                       connectNode();
+                       List<String> lines = fcpServer.collectUntil(is("EndMessage"));
+                       String identifier = extractIdentifier(lines);
+                       fcpServer.writeLine(
+                               "ProtocolError",
+                               "Identifier=" + identifier,
+                               "Code=1",
+                               "EndMessage"
+                       );
+                       assertThat(key.get().isPresent(), is(false));
+               }
+
+               @Test
+               public void clientPutSendsNotificationsForGeneratedKeys()
+               throws InterruptedException, ExecutionException, IOException {
+                       List<String> generatedKeys = new CopyOnWriteArrayList<>();
+                       Future<Optional<Key>> key = fcpClient.clientPut()
+                               .onKeyGenerated(generatedKeys::add)
+                               .from(new ByteArrayInputStream("Hello\n".getBytes()))
+                               .length(6)
+                               .uri("KSK@foo.txt")
+                               .execute();
+                       connectNode();
+                       List<String> lines = fcpServer.collectUntil(is("Hello"));
+                       String identifier = extractIdentifier(lines);
+                       fcpServer.writeLine(
+                               "URIGenerated",
+                               "Identifier=" + identifier,
+                               "URI=KSK@foo.txt",
+                               "EndMessage"
+                       );
+                       replyWithPutSuccessful(identifier);
+                       assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
+                       assertThat(generatedKeys, contains("KSK@foo.txt"));
+               }
+
+       }
+
 }