Prevent “empty catch block” warning
[jFCPlib.git] / src / test / java / net / pterodactylus / fcp / quelaton / DefaultFcpClientTest.java
index f423e97..6e4b807 100644 (file)
@@ -16,6 +16,7 @@ import java.io.File;
 import java.io.IOException;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
@@ -87,45 +88,6 @@ public class DefaultFcpClientTest {
                threadPool.shutdown();
        }
 
-       @Test(expected = ExecutionException.class)
-       public void defaultFcpClientThrowsExceptionIfItCanNotConnect()
-       throws IOException, ExecutionException, InterruptedException {
-               Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
-               fcpServer.connect().get();
-               fcpServer.collectUntil(is("EndMessage"));
-               fcpServer.writeLine(
-                       "CloseConnectionDuplicateClientName",
-                       "EndMessage"
-               );
-               keyPairFuture.get();
-       }
-
-       @Test(expected = ExecutionException.class)
-       public void defaultFcpClientThrowsExceptionIfConnectionIsClosed()
-       throws IOException, ExecutionException, InterruptedException {
-               Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
-               fcpServer.connect().get();
-               fcpServer.collectUntil(is("EndMessage"));
-               fcpServer.close();
-               keyPairFuture.get();
-       }
-
-       @Test
-       public void defaultFcpClientCanGenerateKeypair() throws ExecutionException, InterruptedException, IOException {
-               Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               String identifier = extractIdentifier(lines);
-               fcpServer.writeLine("SSKKeypair",
-                       "InsertURI=" + INSERT_URI + "",
-                       "RequestURI=" + REQUEST_URI + "",
-                       "Identifier=" + identifier,
-                       "EndMessage");
-               FcpKeyPair keyPair = keyPairFuture.get();
-               assertThat(keyPair.getPublicKey(), is(REQUEST_URI));
-               assertThat(keyPair.getPrivateKey(), is(INSERT_URI));
-       }
-
        private void connectNode() throws InterruptedException, ExecutionException, IOException {
                fcpServer.connect().get();
                fcpServer.collectUntil(is("EndMessage"));
@@ -145,30 +107,6 @@ public class DefaultFcpClientTest {
                );
        }
 
-       @Test
-       public void clientGetCanDownloadData() throws InterruptedException, ExecutionException, IOException {
-               Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines, matchesFcpMessage("ClientGet", "ReturnType=direct", "URI=KSK@foo.txt"));
-               String identifier = extractIdentifier(lines);
-               fcpServer.writeLine(
-                       "AllData",
-                       "Identifier=" + identifier,
-                       "DataLength=6",
-                       "StartupTime=1435610539000",
-                       "CompletionTime=1435610540000",
-                       "Metadata.ContentType=text/plain;charset=utf-8",
-                       "Data",
-                       "Hello"
-               );
-               Optional<Data> data = dataFuture.get();
-               assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
-               assertThat(data.get().size(), is(6L));
-               assertThat(ByteStreams.toByteArray(data.get().getInputStream()),
-                       is("Hello\n".getBytes(StandardCharsets.UTF_8)));
-       }
-
        private String extractIdentifier(List<String> lines) {
                return lines.stream()
                        .filter(s -> s.startsWith("Identifier="))
@@ -177,549 +115,85 @@ public class DefaultFcpClientTest {
                        .orElse("");
        }
 
-       @Test
-       public void clientGetDownloadsDataForCorrectIdentifier()
-       throws InterruptedException, ExecutionException, IOException {
-               Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
-               String identifier = extractIdentifier(lines);
-               fcpServer.writeLine(
-                       "AllData",
-                       "Identifier=not-test",
-                       "DataLength=12",
-                       "StartupTime=1435610539000",
-                       "CompletionTime=1435610540000",
-                       "Metadata.ContentType=text/plain;charset=latin-9",
-                       "Data",
-                       "Hello World"
-               );
-               fcpServer.writeLine(
-                       "AllData",
-                       "Identifier=" + identifier,
-                       "DataLength=6",
-                       "StartupTime=1435610539000",
-                       "CompletionTime=1435610540000",
-                       "Metadata.ContentType=text/plain;charset=utf-8",
-                       "Data",
-                       "Hello"
-               );
-               Optional<Data> data = dataFuture.get();
-               assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
-               assertThat(data.get().size(), is(6L));
-               assertThat(ByteStreams.toByteArray(data.get().getInputStream()),
-                       is("Hello\n".getBytes(StandardCharsets.UTF_8)));
-       }
-
-       @Test
-       public void clientGetRecognizesGetFailed() throws InterruptedException, ExecutionException, IOException {
-               Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
-               String identifier = extractIdentifier(lines);
-               fcpServer.writeLine(
-                       "GetFailed",
-                       "Identifier=" + identifier,
-                       "Code=3",
-                       "EndMessage"
-               );
-               Optional<Data> data = dataFuture.get();
-               assertThat(data.isPresent(), is(false));
-       }
-
-       @Test
-       public void clientGetRecognizesGetFailedForCorrectIdentifier()
-       throws InterruptedException, ExecutionException, IOException {
-               Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
-               String identifier = extractIdentifier(lines);
-               fcpServer.writeLine(
-                       "GetFailed",
-                       "Identifier=not-test",
-                       "Code=3",
-                       "EndMessage"
-               );
-               fcpServer.writeLine(
-                       "GetFailed",
-                       "Identifier=" + identifier,
-                       "Code=3",
-                       "EndMessage"
-               );
-               Optional<Data> data = dataFuture.get();
-               assertThat(data.isPresent(), is(false));
-       }
-
-       @Test(expected = ExecutionException.class)
-       public void clientGetRecognizesConnectionClosed() throws InterruptedException, ExecutionException, IOException {
-               Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
-               fcpServer.close();
-               dataFuture.get();
-       }
-
-       @Test
-       public void defaultFcpClientReusesConnection() throws InterruptedException, ExecutionException, IOException {
-               Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               String identifier = extractIdentifier(lines);
-               fcpServer.writeLine(
-                       "SSKKeypair",
-                       "InsertURI=" + INSERT_URI + "",
-                       "RequestURI=" + REQUEST_URI + "",
-                       "Identifier=" + identifier,
-                       "EndMessage"
-               );
-               keyPair.get();
-               keyPair = fcpClient.generateKeypair().execute();
-               lines = fcpServer.collectUntil(is("EndMessage"));
-               identifier = extractIdentifier(lines);
-               fcpServer.writeLine(
-                       "SSKKeypair",
-                       "InsertURI=" + INSERT_URI + "",
-                       "RequestURI=" + REQUEST_URI + "",
-                       "Identifier=" + identifier,
-                       "EndMessage"
-               );
-               keyPair.get();
-       }
-
-       @Test
-       public void defaultFcpClientCanReconnectAfterConnectionHasBeenClosed()
-       throws InterruptedException, ExecutionException, IOException {
-               Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
-               connectNode();
-               fcpServer.collectUntil(is("EndMessage"));
-               fcpServer.close();
-               try {
-                       keyPair.get();
-                       Assert.fail();
-               } catch (ExecutionException e) {
-               }
-               keyPair = fcpClient.generateKeypair().execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               String identifier = extractIdentifier(lines);
-               fcpServer.writeLine(
-                       "SSKKeypair",
-                       "InsertURI=" + INSERT_URI + "",
-                       "RequestURI=" + REQUEST_URI + "",
-                       "Identifier=" + identifier,
-                       "EndMessage"
-               );
-               keyPair.get();
-       }
-
-       @Test
-       public void clientGetWithIgnoreDataStoreSettingSendsCorrectCommands()
-       throws InterruptedException, ExecutionException, IOException {
-               fcpClient.clientGet().ignoreDataStore().uri("KSK@foo.txt").execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "IgnoreDS=true"));
-       }
-
-       @Test
-       public void clientGetWithDataStoreOnlySettingSendsCorrectCommands()
-       throws InterruptedException, ExecutionException, IOException {
-               fcpClient.clientGet().dataStoreOnly().uri("KSK@foo.txt").execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "DSonly=true"));
-       }
-
-       @Test
-       public void clientGetWithMaxSizeSettingSendsCorrectCommands()
-       throws InterruptedException, ExecutionException, IOException {
-               fcpClient.clientGet().maxSize(1048576).uri("KSK@foo.txt").execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "MaxSize=1048576"));
-       }
-
-       @Test
-       public void clientGetWithPrioritySettingSendsCorrectCommands()
-       throws InterruptedException, ExecutionException, IOException {
-               fcpClient.clientGet().priority(Priority.interactive).uri("KSK@foo.txt").execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "PriorityClass=1"));
-       }
-
-       @Test
-       public void clientGetWithRealTimeSettingSendsCorrectCommands()
-       throws InterruptedException, ExecutionException, IOException {
-               fcpClient.clientGet().realTime().uri("KSK@foo.txt").execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "RealTimeFlag=true"));
-       }
-
-       @Test
-       public void clientGetWithGlobalSettingSendsCorrectCommands()
-       throws InterruptedException, ExecutionException, IOException {
-               fcpClient.clientGet().global().uri("KSK@foo.txt").execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "Global=true"));
+       private Matcher<List<String>> matchesFcpMessage(String name, String... requiredLines) {
+               return matchesFcpMessageWithTerminator(name, "EndMessage", requiredLines);
        }
 
-       private Matcher<List<String>> matchesFcpMessage(String name, String... requiredLines) {
-               return new TypeSafeDiagnosingMatcher<List<String>>() {
+       private Matcher<Iterable<String>> hasHead(String firstElement) {
+               return new TypeSafeDiagnosingMatcher<Iterable<String>>() {
                        @Override
-                       protected boolean matchesSafely(List<String> item, Description mismatchDescription) {
-                               if (!item.get(0).equals(name)) {
-                                       mismatchDescription.appendText("FCP message is named ").appendValue(item.get(0));
+                       protected boolean matchesSafely(Iterable<String> iterable, Description mismatchDescription) {
+                               if (!iterable.iterator().hasNext()) {
+                                       mismatchDescription.appendText("is empty");
                                        return false;
                                }
-                               for (String requiredLine : requiredLines) {
-                                       if (item.indexOf(requiredLine) < 1) {
-                                               mismatchDescription.appendText("FCP message does not contain ").appendValue(requiredLine);
-                                               return false;
-                                       }
+                               String element = iterable.iterator().next();
+                               if (!element.equals(firstElement)) {
+                                       mismatchDescription.appendText("starts with ").appendValue(element);
+                                       return false;
                                }
                                return true;
                        }
 
                        @Override
                        public void describeTo(Description description) {
-                               description.appendText("FCP message named ").appendValue(name);
-                               description.appendValueList(", containing the lines ", ", ", "", requiredLines);
+                               description.appendText("starts with ").appendValue(firstElement);
                        }
                };
        }
 
-       @Test
-       public void clientPutWithDirectDataSendsCorrectCommand()
-       throws IOException, ExecutionException, InterruptedException {
-               fcpClient.clientPut()
-                       .from(new ByteArrayInputStream("Hello\n".getBytes()))
-                       .length(6)
-                       .uri("KSK@foo.txt")
-                       .execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("Hello"));
-               assertThat(lines, matchesFcpMessage("ClientPut", "UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"));
+       private Matcher<List<String>> matchesFcpMessageWithTerminator(
+               String name, String terminator, String... requiredLines) {
+               return allOf(hasHead(name), hasParameters(1, 1, requiredLines), hasTail(terminator));
        }
 
-       @Test
-       public void clientPutWithDirectDataSucceedsOnCorrectIdentifier()
-       throws InterruptedException, ExecutionException, IOException {
-               Future<Optional<Key>> key = fcpClient.clientPut()
-                       .from(new ByteArrayInputStream("Hello\n".getBytes()))
-                       .length(6)
-                       .uri("KSK@foo.txt")
-                       .execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("Hello"));
-               String identifier = extractIdentifier(lines);
-               fcpServer.writeLine(
-                       "PutFailed",
-                       "Identifier=not-the-right-one",
-                       "EndMessage"
-               );
-               fcpServer.writeLine(
-                       "PutSuccessful",
-                       "URI=KSK@foo.txt",
-                       "Identifier=" + identifier,
-                       "EndMessage"
-               );
-               assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
-       }
-
-       @Test
-       public void clientPutWithDirectDataFailsOnCorrectIdentifier()
-       throws InterruptedException, ExecutionException, IOException {
-               Future<Optional<Key>> key = fcpClient.clientPut()
-                       .from(new ByteArrayInputStream("Hello\n".getBytes()))
-                       .length(6)
-                       .uri("KSK@foo.txt")
-                       .execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("Hello"));
-               String identifier = extractIdentifier(lines);
-               fcpServer.writeLine(
-                       "PutSuccessful",
-                       "Identifier=not-the-right-one",
-                       "URI=KSK@foo.txt",
-                       "EndMessage"
-               );
-               fcpServer.writeLine(
-                       "PutFailed",
-                       "Identifier=" + identifier,
-                       "EndMessage"
-               );
-               assertThat(key.get().isPresent(), is(false));
-       }
-
-       @Test
-       public void clientPutWithRenamedDirectDataSendsCorrectCommand()
-       throws InterruptedException, ExecutionException, IOException {
-               fcpClient.clientPut()
-                       .named("otherName.txt")
-                       .from(new ByteArrayInputStream("Hello\n".getBytes()))
-                       .length(6)
-                       .uri("KSK@foo.txt")
-                       .execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("Hello"));
-               assertThat(lines, matchesFcpMessage("ClientPut", "TargetFilename=otherName.txt", "UploadFrom=direct",
-                       "DataLength=6", "URI=KSK@foo.txt"));
-       }
-
-       @Test
-       public void clientPutWithRedirectSendsCorrectCommand()
-       throws IOException, ExecutionException, InterruptedException {
-               fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt").execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines,
-                       matchesFcpMessage("ClientPut", "UploadFrom=redirect", "URI=KSK@foo.txt", "TargetURI=KSK@bar.txt"));
-       }
-
-       @Test
-       public void clientPutWithFileSendsCorrectCommand() throws InterruptedException, ExecutionException, IOException {
-               fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines,
-                       matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt"));
-       }
-
-       @Test
-       public void clientPutWithFileCanCompleteTestDdaSequence()
-       throws IOException, ExecutionException, InterruptedException {
-               File tempFile = createTempFile();
-               fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt").execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               String identifier = extractIdentifier(lines);
-               fcpServer.writeLine(
-                       "ProtocolError",
-                       "Identifier=" + identifier,
-                       "Code=25",
-                       "EndMessage"
-               );
-               lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines, matchesFcpMessage(
-                       "TestDDARequest",
-                       "Directory=" + tempFile.getParent(),
-                       "WantReadDirectory=true",
-                       "WantWriteDirectory=false",
-                       "EndMessage"
-               ));
-               fcpServer.writeLine(
-                       "TestDDAReply",
-                       "Directory=" + tempFile.getParent(),
-                       "ReadFilename=" + tempFile,
-                       "EndMessage"
-               );
-               lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines, matchesFcpMessage(
-                       "TestDDAResponse",
-                       "Directory=" + tempFile.getParent(),
-                       "ReadContent=test-content",
-                       "EndMessage"
-               ));
-               fcpServer.writeLine(
-                       "TestDDAComplete",
-                       "Directory=" + tempFile.getParent(),
-                       "ReadDirectoryAllowed=true",
-                       "EndMessage"
-               );
-               lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines,
-                       matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt",
-                               "Filename=" + new File(tempFile.getParent(), "test.dat")));
-       }
-
-       private File createTempFile() throws IOException {
-               File tempFile = File.createTempFile("test-dda-", ".dat");
-               tempFile.deleteOnExit();
-               Files.write("test-content", tempFile, StandardCharsets.UTF_8);
-               return tempFile;
-       }
-
-       @Test
-       public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier()
-       throws InterruptedException, ExecutionException, IOException {
-               Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               String identifier = extractIdentifier(lines);
-               fcpServer.writeLine(
-                       "ProtocolError",
-                       "Identifier=not-the-right-one",
-                       "Code=25",
-                       "EndMessage"
-               );
-               fcpServer.writeLine(
-                       "PutSuccessful",
-                       "Identifier=" + identifier,
-                       "URI=KSK@foo.txt",
-                       "EndMessage"
-               );
-               assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
-       }
-
-       @Test
-       public void clientPutAbortsOnProtocolErrorOtherThan25()
-       throws InterruptedException, ExecutionException, IOException {
-               Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               String identifier = extractIdentifier(lines);
-               fcpServer.writeLine(
-                       "ProtocolError",
-                       "Identifier=" + identifier,
-                       "Code=1",
-                       "EndMessage"
-               );
-               assertThat(key.get().isPresent(), is(false));
-       }
-
-       @Test
-       public void clientPutDoesNotReplyToWrongTestDdaReply() throws IOException, ExecutionException,
-       InterruptedException {
-               File tempFile = createTempFile();
-               fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt").execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               String identifier = extractIdentifier(lines);
-               fcpServer.writeLine(
-                       "ProtocolError",
-                       "Identifier=" + identifier,
-                       "Code=25",
-                       "EndMessage"
-               );
-               lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines, matchesFcpMessage(
-                       "TestDDARequest",
-                       "Directory=" + tempFile.getParent(),
-                       "WantReadDirectory=true",
-                       "WantWriteDirectory=false",
-                       "EndMessage"
-               ));
-               fcpServer.writeLine(
-                       "TestDDAReply",
-                       "Directory=/some-other-directory",
-                       "ReadFilename=" + tempFile,
-                       "EndMessage"
-               );
-               fcpServer.writeLine(
-                       "TestDDAReply",
-                       "Directory=" + tempFile.getParent(),
-                       "ReadFilename=" + tempFile,
-                       "EndMessage"
-               );
-               lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines, matchesFcpMessage(
-                       "TestDDAResponse",
-                       "Directory=" + tempFile.getParent(),
-                       "ReadContent=test-content",
-                       "EndMessage"
-               ));
-       }
-
-       @Test
-       public void clientPutSendsResponseEvenIfFileCanNotBeRead()
-       throws IOException, ExecutionException, InterruptedException {
-               File tempFile = createTempFile();
-               fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt").execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               String identifier = extractIdentifier(lines);
-               fcpServer.writeLine(
-                       "ProtocolError",
-                       "Identifier=" + identifier,
-                       "Code=25",
-                       "EndMessage"
-               );
-               lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines, matchesFcpMessage(
-                       "TestDDARequest",
-                       "Directory=" + tempFile.getParent(),
-                       "WantReadDirectory=true",
-                       "WantWriteDirectory=false",
-                       "EndMessage"
-               ));
-               fcpServer.writeLine(
-                       "TestDDAReply",
-                       "Directory=" + tempFile.getParent(),
-                       "ReadFilename=" + tempFile + ".foo",
-                       "EndMessage"
-               );
-               lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines, matchesFcpMessage(
-                       "TestDDAResponse",
-                       "Directory=" + tempFile.getParent(),
-                       "ReadContent=failed-to-read",
-                       "EndMessage"
-               ));
-       }
-
-       @Test
-       public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory()
-       throws IOException, ExecutionException, InterruptedException {
-               File tempFile = createTempFile();
-               fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt").execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-               String identifier = extractIdentifier(lines);
-               fcpServer.writeLine(
-                       "TestDDAComplete",
-                       "Directory=/some-other-directory",
-                       "EndMessage"
-               );
-               fcpServer.writeLine(
-                       "ProtocolError",
-                       "Identifier=" + identifier,
-                       "Code=25",
-                       "EndMessage"
-               );
-               lines = fcpServer.collectUntil(is("EndMessage"));
-               assertThat(lines, matchesFcpMessage(
-                       "TestDDARequest",
-                       "Directory=" + tempFile.getParent(),
-                       "WantReadDirectory=true",
-                       "WantWriteDirectory=false",
-                       "EndMessage"
-               ));
+       private Matcher<List<String>> hasParameters(int ignoreStart, int ignoreEnd, String... lines) {
+               return new TypeSafeDiagnosingMatcher<List<String>>() {
+                       @Override
+                       protected boolean matchesSafely(List<String> item, Description mismatchDescription) {
+                               if (item.size() < (ignoreStart + ignoreEnd)) {
+                                       mismatchDescription.appendText("has only ").appendValue(item.size()).appendText(" elements");
+                                       return false;
+                               }
+                               for (String line : lines) {
+                                       if ((item.indexOf(line) < ignoreStart) || (item.indexOf(line) >= (item.size() - ignoreEnd))) {
+                                               mismatchDescription.appendText("does not contains ").appendValue(line);
+                                               return false;
+                                       }
+                               }
+                               return true;
+                       }
+
+                       @Override
+                       public void describeTo(Description description) {
+                               description.appendText("contains ").appendValueList("(", ", ", ")", lines);
+                               description.appendText(", ignoring the first ").appendValue(ignoreStart);
+                               description.appendText(" and the last ").appendValue(ignoreEnd);
+                       }
+               };
        }
 
-       @Test
-       public void clientPutSendsNotificationsForGeneratedKeys()
-       throws InterruptedException, ExecutionException, IOException {
-               List<String> generatedKeys = new CopyOnWriteArrayList<>();
-               Future<Optional<Key>> key = fcpClient.clientPut()
-                       .onKeyGenerated(generatedKeys::add)
-                       .from(new ByteArrayInputStream("Hello\n".getBytes()))
-                       .length(6)
-                       .uri("KSK@foo.txt")
-                       .execute();
-               connectNode();
-               List<String> lines = fcpServer.collectUntil(is("Hello"));
-               String identifier = extractIdentifier(lines);
-               fcpServer.writeLine(
-                       "URIGenerated",
-                       "Identifier=" + identifier,
-                       "URI=KSK@foo.txt",
-                       "EndMessage"
-               );
-               fcpServer.writeLine(
-                       "PutSuccessful",
-                       "URI=KSK@foo.txt",
-                       "Identifier=" + identifier,
-                       "EndMessage"
-               );
-               assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
-               assertThat(generatedKeys, contains("KSK@foo.txt"));
+       private Matcher<List<String>> hasTail(String... lastElements) {
+               return new TypeSafeDiagnosingMatcher<List<String>>() {
+                       @Override
+                       protected boolean matchesSafely(List<String> list, Description mismatchDescription) {
+                               if (list.size() < lastElements.length) {
+                                       mismatchDescription.appendText("is too small");
+                                       return false;
+                               }
+                               List<String> tail = list.subList(list.size() - lastElements.length, list.size());
+                               if (!tail.equals(Arrays.asList(lastElements))) {
+                                       mismatchDescription.appendText("ends with ").appendValueList("(", ", ", ")", tail);
+                                       return false;
+                               }
+                               return true;
+                       }
+
+                       @Override
+                       public void describeTo(Description description) {
+                               description.appendText("ends with ").appendValueList("(", ", ", ")", lastElements);
+                       }
+               };
        }
 
        @Test
@@ -733,8 +207,7 @@ public class DefaultFcpClientTest {
                        "Identifier=" + identifier,
                        "GiveOpennetRef=false",
                        "WithPrivate=false",
-                       "WithVolatile=false",
-                       "EndMessage"
+                       "WithVolatile=false"
                ));
                fcpServer.writeLine(
                        "NodeData",
@@ -761,8 +234,7 @@ public class DefaultFcpClientTest {
                        "Identifier=" + identifier,
                        "GiveOpennetRef=true",
                        "WithPrivate=false",
-                       "WithVolatile=false",
-                       "EndMessage"
+                       "WithVolatile=false"
                ));
                fcpServer.writeLine(
                        "NodeData",
@@ -790,8 +262,7 @@ public class DefaultFcpClientTest {
                        "Identifier=" + identifier,
                        "GiveOpennetRef=false",
                        "WithPrivate=true",
-                       "WithVolatile=false",
-                       "EndMessage"
+                       "WithVolatile=false"
                ));
                fcpServer.writeLine(
                        "NodeData",
@@ -820,8 +291,7 @@ public class DefaultFcpClientTest {
                        "Identifier=" + identifier,
                        "GiveOpennetRef=false",
                        "WithPrivate=false",
-                       "WithVolatile=true",
-                       "EndMessage"
+                       "WithVolatile=true"
                ));
                fcpServer.writeLine(
                        "NodeData",
@@ -847,8 +317,7 @@ public class DefaultFcpClientTest {
                String identifier = extractIdentifier(lines);
                assertThat(lines, matchesFcpMessage(
                        "GetConfig",
-                       "Identifier=" + identifier,
-                       "EndMessage"
+                       "Identifier=" + identifier
                ));
                fcpServer.writeLine(
                        "ConfigData",
@@ -868,8 +337,7 @@ public class DefaultFcpClientTest {
                assertThat(lines, matchesFcpMessage(
                        "GetConfig",
                        "Identifier=" + identifier,
-                       "WithCurrent=true",
-                       "EndMessage"
+                       "WithCurrent=true"
                ));
                fcpServer.writeLine(
                        "ConfigData",
@@ -890,8 +358,7 @@ public class DefaultFcpClientTest {
                assertThat(lines, matchesFcpMessage(
                        "GetConfig",
                        "Identifier=" + identifier,
-                       "WithDefaults=true",
-                       "EndMessage"
+                       "WithDefaults=true"
                ));
                fcpServer.writeLine(
                        "ConfigData",
@@ -912,8 +379,7 @@ public class DefaultFcpClientTest {
                assertThat(lines, matchesFcpMessage(
                        "GetConfig",
                        "Identifier=" + identifier,
-                       "WithSortOrder=true",
-                       "EndMessage"
+                       "WithSortOrder=true"
                ));
                fcpServer.writeLine(
                        "ConfigData",
@@ -934,8 +400,7 @@ public class DefaultFcpClientTest {
                assertThat(lines, matchesFcpMessage(
                        "GetConfig",
                        "Identifier=" + identifier,
-                       "WithExpertFlag=true",
-                       "EndMessage"
+                       "WithExpertFlag=true"
                ));
                fcpServer.writeLine(
                        "ConfigData",
@@ -956,8 +421,7 @@ public class DefaultFcpClientTest {
                assertThat(lines, matchesFcpMessage(
                        "GetConfig",
                        "Identifier=" + identifier,
-                       "WithForceWriteFlag=true",
-                       "EndMessage"
+                       "WithForceWriteFlag=true"
                ));
                fcpServer.writeLine(
                        "ConfigData",
@@ -978,8 +442,7 @@ public class DefaultFcpClientTest {
                assertThat(lines, matchesFcpMessage(
                        "GetConfig",
                        "Identifier=" + identifier,
-                       "WithShortDescription=true",
-                       "EndMessage"
+                       "WithShortDescription=true"
                ));
                fcpServer.writeLine(
                        "ConfigData",
@@ -1000,8 +463,7 @@ public class DefaultFcpClientTest {
                assertThat(lines, matchesFcpMessage(
                        "GetConfig",
                        "Identifier=" + identifier,
-                       "WithLongDescription=true",
-                       "EndMessage"
+                       "WithLongDescription=true"
                ));
                fcpServer.writeLine(
                        "ConfigData",
@@ -1022,8 +484,7 @@ public class DefaultFcpClientTest {
                assertThat(lines, matchesFcpMessage(
                        "GetConfig",
                        "Identifier=" + identifier,
-                       "WithDataTypes=true",
-                       "EndMessage"
+                       "WithDataTypes=true"
                ));
                fcpServer.writeLine(
                        "ConfigData",
@@ -1043,8 +504,7 @@ public class DefaultFcpClientTest {
                assertThat(lines, matchesFcpMessage(
                        "ModifyConfig",
                        "Identifier=" + identifier,
-                       "foo.bar=baz",
-                       "EndMessage"
+                       "foo.bar=baz"
                ));
                fcpServer.writeLine(
                        "ConfigData",
@@ -1065,11 +525,96 @@ public class DefaultFcpClientTest {
        }
 
        private void readMessage(Supplier<Matcher<List<String>>> requestMatcher) throws IOException {
-               lines = fcpServer.collectUntil(is("EndMessage"));
+               readMessage("EndMessage", requestMatcher);
+       }
+
+       private void readMessage(String terminator, Supplier<Matcher<List<String>>> requestMatcher) throws IOException {
+               lines = fcpServer.collectUntil(is(terminator));
                identifier = extractIdentifier(lines);
                assertThat(lines, requestMatcher.get());
        }
 
+       public class ConnectionsAndKeyPairs {
+
+               public class Connections {
+
+                       @Test(expected = ExecutionException.class)
+                       public void throwsExceptionOnFailure() throws IOException, ExecutionException, InterruptedException {
+                               Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
+                               connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
+                               fcpServer.writeLine(
+                                       "CloseConnectionDuplicateClientName",
+                                       "EndMessage"
+                               );
+                               keyPairFuture.get();
+                       }
+
+                       @Test(expected = ExecutionException.class)
+                       public void throwsExceptionIfConnectionIsClosed() throws IOException, ExecutionException, InterruptedException {
+                               Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
+                               connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
+                               fcpServer.close();
+                               keyPairFuture.get();
+                       }
+
+                       @Test
+                       public void connectionIsReused() throws InterruptedException, ExecutionException, IOException {
+                               Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
+                               connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
+                               replyWithKeyPair();
+                               keyPair.get();
+                               keyPair = fcpClient.generateKeypair().execute();
+                               readMessage(() -> matchesFcpMessage("GenerateSSK"));
+                               identifier = extractIdentifier(lines);
+                               replyWithKeyPair();
+                               keyPair.get();
+                       }
+
+                       @Test
+                       public void defaultFcpClientCanReconnectAfterConnectionHasBeenClosed()
+                       throws InterruptedException, ExecutionException, IOException {
+                               Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
+                               connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
+                               fcpServer.close();
+                               try {
+                                       keyPair.get();
+                                       Assert.fail();
+                               } catch (ExecutionException e) {
+                                       /* ignore. */
+                               }
+                               keyPair = fcpClient.generateKeypair().execute();
+                               connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
+                               replyWithKeyPair();
+                               keyPair.get();
+                       }
+
+               }
+
+               public class GenerateKeyPair {
+
+                       @Test
+                       public void defaultFcpClientCanGenerateKeypair()
+                       throws ExecutionException, InterruptedException, IOException {
+                               Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
+                               connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
+                               replyWithKeyPair();
+                               FcpKeyPair keyPair = keyPairFuture.get();
+                               assertThat(keyPair.getPublicKey(), is(REQUEST_URI));
+                               assertThat(keyPair.getPrivateKey(), is(INSERT_URI));
+                       }
+
+               }
+
+               private void replyWithKeyPair() throws IOException {
+                       fcpServer.writeLine("SSKKeypair",
+                               "InsertURI=" + INSERT_URI + "",
+                               "RequestURI=" + REQUEST_URI + "",
+                               "Identifier=" + identifier,
+                               "EndMessage");
+               }
+
+       }
+
        public class Peers {
 
                public class PeerCommands {
@@ -1112,8 +657,7 @@ public class DefaultFcpClientTest {
                                        return matchesFcpMessage(
                                                "ListPeer",
                                                "Identifier=" + identifier,
-                                               "NodeIdentifier=" + nodeId,
-                                               "EndMessage"
+                                               "NodeIdentifier=" + nodeId
                                        );
                                }
 
@@ -1161,8 +705,7 @@ public class DefaultFcpClientTest {
                                        return matchesFcpMessage(
                                                "ListPeers",
                                                "WithVolatile=" + withVolatile,
-                                               "WithMetadata=" + withMetadata,
-                                               "EndMessage"
+                                               "WithMetadata=" + withMetadata
                                        );
                                }
 
@@ -1230,8 +773,7 @@ public class DefaultFcpClientTest {
                                private Matcher<List<String>> matchesAddPeer() {
                                        return matchesFcpMessage(
                                                "AddPeer",
-                                               "Identifier=" + identifier,
-                                               "EndMessage"
+                                               "Identifier=" + identifier
                                        );
                                }
 
@@ -1393,8 +935,7 @@ public class DefaultFcpClientTest {
                                                "ModifyPeer",
                                                "Identifier=" + identifier,
                                                "NodeIdentifier=" + nodeIdentifier,
-                                               setting + "=" + value,
-                                               "EndMessage"
+                                               setting + "=" + value
                                        );
                                }
 
@@ -1438,8 +979,7 @@ public class DefaultFcpClientTest {
                                        return matchesFcpMessage(
                                                "RemovePeer",
                                                "Identifier=" + identifier,
-                                               "NodeIdentifier=" + nodeIdentifier,
-                                               "EndMessage"
+                                               "NodeIdentifier=" + nodeIdentifier
                                        );
                                }
 
@@ -1518,8 +1058,7 @@ public class DefaultFcpClientTest {
                                private Matcher<List<String>> matchesListPeerNotes(String nodeIdentifier) {
                                        return matchesFcpMessage(
                                                "ListPeerNotes",
-                                               "NodeIdentifier=" + nodeIdentifier,
-                                               "EndMessage"
+                                               "NodeIdentifier=" + nodeIdentifier
                                        );
                                }
 
@@ -1595,8 +1134,7 @@ public class DefaultFcpClientTest {
                                                "Identifier=" + identifier,
                                                "NodeIdentifier=" + nodeIdentifier,
                                                "PeerNoteType=1",
-                                               "NoteText=Zm9v",
-                                               "EndMessage"
+                                               "NoteText=Zm9v"
                                        );
                                }
 
@@ -1693,8 +1231,7 @@ public class DefaultFcpClientTest {
                                                "Identifier=" + identifier,
                                                "PluginURL=superPlugin",
                                                "URLType=official",
-                                               "OfficialSource=" + officialSource,
-                                               "EndMessage"
+                                               "OfficialSource=" + officialSource
                                        );
                                }
 
@@ -1735,8 +1272,7 @@ public class DefaultFcpClientTest {
                                                "LoadPlugin",
                                                "Identifier=" + identifier,
                                                "PluginURL=" + url,
-                                               "URLType=" + urlType,
-                                               "EndMessage"
+                                               "URLType=" + urlType
                                        );
                                }
 
@@ -1748,7 +1284,7 @@ public class DefaultFcpClientTest {
                                public void failedLoad() throws ExecutionException, InterruptedException, IOException {
                                        Future<Optional<PluginInfo>> pluginInfo =
                                                fcpClient.loadPlugin().officialFromFreenet("superPlugin").execute();
-                                       connectAndAssert(() -> matchesFcpMessage("LoadPlugin", "EndMessage"));
+                                       connectAndAssert(() -> matchesFcpMessage("LoadPlugin"));
                                        replyWithProtocolError();
                                        assertThat(pluginInfo.get().isPresent(), is(false));
                                }
@@ -1770,7 +1306,7 @@ public class DefaultFcpClientTest {
                        @Test
                        public void reloadingPluginWorks() throws InterruptedException, ExecutionException, IOException {
                                Future<Optional<PluginInfo>> pluginInfo = fcpClient.reloadPlugin().plugin(CLASS_NAME).execute();
-                               connectAndAssert(() -> matchReloadPluginMessage());
+                               connectAndAssert(this::matchReloadPluginMessage);
                                replyWithPluginInfo();
                                verifyPluginInfo(pluginInfo);
                        }
@@ -1809,8 +1345,7 @@ public class DefaultFcpClientTest {
                                return matchesFcpMessage(
                                        "ReloadPlugin",
                                        "Identifier=" + identifier,
-                                       "PluginName=" + CLASS_NAME,
-                                       "EndMessage"
+                                       "PluginName=" + CLASS_NAME
                                );
                        }
 
@@ -1821,7 +1356,7 @@ public class DefaultFcpClientTest {
                        @Test
                        public void removingPluginWorks() throws InterruptedException, ExecutionException, IOException {
                                Future<Boolean> pluginRemoved = fcpClient.removePlugin().plugin(CLASS_NAME).execute();
-                               connectAndAssert(() -> matchPluginRemovedMessage());
+                               connectAndAssert(this::matchPluginRemovedMessage);
                                replyWithPluginRemoved();
                                assertThat(pluginRemoved.get(), is(true));
                        }
@@ -1857,8 +1392,7 @@ public class DefaultFcpClientTest {
                                return matchesFcpMessage(
                                        "RemovePlugin",
                                        "Identifier=" + identifier,
-                                       "PluginName=" + CLASS_NAME,
-                                       "EndMessage"
+                                       "PluginName=" + CLASS_NAME
                                );
                        }
 
@@ -1869,7 +1403,7 @@ public class DefaultFcpClientTest {
                        @Test
                        public void gettingPluginInfoWorks() throws InterruptedException, ExecutionException, IOException {
                                Future<Optional<PluginInfo>> pluginInfo = fcpClient.getPluginInfo().plugin(CLASS_NAME).execute();
-                               connectAndAssert(() -> matchGetPluginInfoMessage());
+                               connectAndAssert(this::matchGetPluginInfoMessage);
                                replyWithPluginInfo();
                                verifyPluginInfo(pluginInfo);
                        }
@@ -1898,8 +1432,7 @@ public class DefaultFcpClientTest {
                                return matchesFcpMessage(
                                        "GetPluginInfo",
                                        "Identifier=" + identifier,
-                                       "PluginName=" + CLASS_NAME,
-                                       "EndMessage"
+                                       "PluginName=" + CLASS_NAME
                                );
                        }
 
@@ -1914,7 +1447,7 @@ public class DefaultFcpClientTest {
                @Test
                public void subscriptionWorks() throws InterruptedException, ExecutionException, IOException {
                        Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
-                       connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI, "EndMessage"));
+                       connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
                        replyWithSubscribed();
                        assertThat(uskSubscription.get().get().getUri(), is(URI));
                        AtomicInteger edition = new AtomicInteger();
@@ -1932,7 +1465,7 @@ public class DefaultFcpClientTest {
                @Test
                public void subscriptionUpdatesMultipleTimes() throws InterruptedException, ExecutionException, IOException {
                        Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
-                       connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI, "EndMessage"));
+                       connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
                        replyWithSubscribed();
                        assertThat(uskSubscription.get().get().getUri(), is(URI));
                        AtomicInteger edition = new AtomicInteger();
@@ -1950,13 +1483,13 @@ public class DefaultFcpClientTest {
                @Test
                public void subscriptionCanBeCancelled() throws InterruptedException, ExecutionException, IOException {
                        Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
-                       connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI, "EndMessage"));
+                       connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
                        replyWithSubscribed();
                        assertThat(uskSubscription.get().get().getUri(), is(URI));
                        AtomicBoolean updated = new AtomicBoolean();
                        uskSubscription.get().get().onUpdate(e -> updated.set(true));
                        uskSubscription.get().get().cancel();
-                       readMessage(() -> matchesFcpMessage("UnsubscribeUSK", "Identifier=" + identifier, "EndMessage"));
+                       readMessage(() -> matchesFcpMessage("UnsubscribeUSK", "Identifier=" + identifier));
                        sendUpdateNotification(23);
                        assertThat(updated.get(), is(false));
                }
@@ -1984,4 +1517,407 @@ public class DefaultFcpClientTest {
 
        }
 
+       public class ClientGet {
+
+               @Test
+               public void works() throws InterruptedException, ExecutionException, IOException {
+                       Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
+                       connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "ReturnType=direct"));
+                       replyWithAllData("not-test", "Hello World", "text/plain;charset=latin-9");
+                       replyWithAllData(identifier, "Hello", "text/plain;charset=utf-8");
+                       Optional<Data> data = dataFuture.get();
+                       verifyData(data);
+               }
+
+               @Test
+               public void getFailedIsRecognized() throws InterruptedException, ExecutionException, IOException {
+                       Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
+                       connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
+                       replyWithGetFailed("not-test");
+                       replyWithGetFailed(identifier);
+                       Optional<Data> data = dataFuture.get();
+                       assertThat(data.isPresent(), is(false));
+               }
+
+               @Test
+               public void getFailedForDifferentIdentifierIsIgnored()
+               throws InterruptedException, ExecutionException, IOException {
+                       Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
+                       connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
+                       replyWithGetFailed("not-test");
+                       replyWithAllData(identifier, "Hello", "text/plain;charset=utf-8");
+                       Optional<Data> data = dataFuture.get();
+                       verifyData(data);
+               }
+
+               @Test(expected = ExecutionException.class)
+               public void connectionClosedIsRecognized() throws InterruptedException, ExecutionException, IOException {
+                       Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
+                       connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
+                       fcpServer.close();
+                       dataFuture.get();
+               }
+
+               @Test
+               public void withIgnoreData() throws InterruptedException, ExecutionException, IOException {
+                       fcpClient.clientGet().ignoreDataStore().uri("KSK@foo.txt").execute();
+                       connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "IgnoreDS=true"));
+               }
+
+               @Test
+               public void withDataStoreOnly() throws InterruptedException, ExecutionException, IOException {
+                       fcpClient.clientGet().dataStoreOnly().uri("KSK@foo.txt").execute();
+                       connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "DSonly=true"));
+               }
+
+               @Test
+               public void clientGetWithMaxSizeSettingSendsCorrectCommands()
+               throws InterruptedException, ExecutionException, IOException {
+                       fcpClient.clientGet().maxSize(1048576).uri("KSK@foo.txt").execute();
+                       connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "MaxSize=1048576"));
+               }
+
+               @Test
+               public void clientGetWithPrioritySettingSendsCorrectCommands()
+               throws InterruptedException, ExecutionException, IOException {
+                       fcpClient.clientGet().priority(Priority.interactive).uri("KSK@foo.txt").execute();
+                       connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "PriorityClass=1"));
+               }
+
+               @Test
+               public void clientGetWithRealTimeSettingSendsCorrectCommands()
+               throws InterruptedException, ExecutionException, IOException {
+                       fcpClient.clientGet().realTime().uri("KSK@foo.txt").execute();
+                       connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "RealTimeFlag=true"));
+               }
+
+               @Test
+               public void clientGetWithGlobalSettingSendsCorrectCommands()
+               throws InterruptedException, ExecutionException, IOException {
+                       fcpClient.clientGet().global().uri("KSK@foo.txt").execute();
+                       connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "Global=true"));
+               }
+
+               private void replyWithGetFailed(String identifier) throws IOException {
+                       fcpServer.writeLine(
+                               "GetFailed",
+                               "Identifier=" + identifier,
+                               "Code=3",
+                               "EndMessage"
+                       );
+               }
+
+               private void replyWithAllData(String identifier, String text, String contentType) throws IOException {
+                       fcpServer.writeLine(
+                               "AllData",
+                               "Identifier=" + identifier,
+                               "DataLength=" + (text.length() + 1),
+                               "StartupTime=1435610539000",
+                               "CompletionTime=1435610540000",
+                               "Metadata.ContentType=" + contentType,
+                               "Data",
+                               text
+                       );
+               }
+
+               private void verifyData(Optional<Data> data) throws IOException {
+                       assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
+                       assertThat(data.get().size(), is(6L));
+                       assertThat(ByteStreams.toByteArray(data.get().getInputStream()),
+                               is("Hello\n".getBytes(StandardCharsets.UTF_8)));
+               }
+
+       }
+
+       public class ClientPut {
+
+               @Test
+               public void sendsCorrectCommand() throws IOException, ExecutionException, InterruptedException {
+                       fcpClient.clientPut()
+                               .from(new ByteArrayInputStream("Hello\n".getBytes()))
+                               .length(6)
+                               .uri("KSK@foo.txt")
+                               .execute();
+                       connectNode();
+                       readMessage("Hello", this::matchesDirectClientPut);
+               }
+
+               @Test
+               public void succeedsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
+                       Future<Optional<Key>> key = fcpClient.clientPut()
+                               .from(new ByteArrayInputStream("Hello\n".getBytes()))
+                               .length(6)
+                               .uri("KSK@foo.txt")
+                               .execute();
+                       connectNode();
+                       readMessage("Hello", this::matchesDirectClientPut);
+                       replyWithPutFailed("not-the-right-one");
+                       replyWithPutSuccessful(identifier);
+                       assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
+               }
+
+               @Test
+               public void failsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
+                       Future<Optional<Key>> key = fcpClient.clientPut()
+                               .from(new ByteArrayInputStream("Hello\n".getBytes()))
+                               .length(6)
+                               .uri("KSK@foo.txt")
+                               .execute();
+                       connectNode();
+                       readMessage("Hello", this::matchesDirectClientPut);
+                       replyWithPutSuccessful("not-the-right-one");
+                       replyWithPutFailed(identifier);
+                       assertThat(key.get().isPresent(), is(false));
+               }
+
+               @Test
+               public void renameIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
+                       fcpClient.clientPut()
+                               .named("otherName.txt")
+                               .from(new ByteArrayInputStream("Hello\n".getBytes()))
+                               .length(6)
+                               .uri("KSK@foo.txt")
+                               .execute();
+                       connectNode();
+                       readMessage("Hello", () -> allOf(
+                               hasHead("ClientPut"),
+                               hasParameters(1, 2, "TargetFilename=otherName.txt", "UploadFrom=direct", "DataLength=6",
+                                       "URI=KSK@foo.txt"),
+                               hasTail("EndMessage", "Hello")
+                       ));
+               }
+
+               @Test
+               public void redirectIsSentCorrecly() throws IOException, ExecutionException, InterruptedException {
+                       fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt").execute();
+                       connectAndAssert(() ->
+                               matchesFcpMessage("ClientPut", "UploadFrom=redirect", "URI=KSK@foo.txt", "TargetURI=KSK@bar.txt"));
+               }
+
+               @Test
+               public void withFileIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
+                       fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
+                       connectAndAssert(() ->
+                               matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt"));
+               }
+
+               public class DDA {
+
+                       private final File ddaFile;
+                       private final File fileToUpload;
+
+                       public DDA() throws IOException {
+                               ddaFile = createDdaFile();
+                               fileToUpload = new File(ddaFile.getParent(), "test.dat");
+                       }
+
+                       private Matcher<List<String>> matchesFileClientPut(File file) {
+                               return matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=" + file);
+                       }
+
+                       @Test
+                       public void completeDda() throws IOException, ExecutionException, InterruptedException {
+                               fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
+                               connectAndAssert(() -> matchesFileClientPut(fileToUpload));
+                               sendDdaRequired(identifier);
+                               readMessage(() -> matchesTestDDARequest(ddaFile));
+                               sendTestDDAReply(ddaFile.getParent(), ddaFile);
+                               readMessage(() -> matchesTestDDAResponse(ddaFile));
+                               writeTestDDAComplete(ddaFile);
+                               readMessage(() -> matchesFileClientPut(fileToUpload));
+                       }
+
+                       @Test
+                       public void ignoreOtherDda() throws IOException, ExecutionException, InterruptedException {
+                               fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
+                               connectAndAssert(() -> matchesFileClientPut(fileToUpload));
+                               sendDdaRequired(identifier);
+                               readMessage(() -> matchesTestDDARequest(ddaFile));
+                               sendTestDDAReply("/some-other-directory", ddaFile);
+                               sendTestDDAReply(ddaFile.getParent(), ddaFile);
+                               readMessage(() -> matchesTestDDAResponse(ddaFile));
+                       }
+
+                       @Test
+                       public void sendResponseIfFileUnreadable() throws IOException, ExecutionException, InterruptedException {
+                               fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
+                               connectAndAssert(() -> matchesFileClientPut(fileToUpload));
+                               sendDdaRequired(identifier);
+                               readMessage(() -> matchesTestDDARequest(ddaFile));
+                               sendTestDDAReply(ddaFile.getParent(), new File(ddaFile + ".foo"));
+                               readMessage(this::matchesFailedToReadResponse);
+                       }
+
+                       @Test
+                       public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory()
+                       throws IOException, ExecutionException, InterruptedException {
+                               fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
+                               connectNode();
+                               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
+                               String identifier = extractIdentifier(lines);
+                               fcpServer.writeLine(
+                                       "TestDDAComplete",
+                                       "Directory=/some-other-directory",
+                                       "EndMessage"
+                               );
+                               sendDdaRequired(identifier);
+                               lines = fcpServer.collectUntil(is("EndMessage"));
+                               assertThat(lines, matchesFcpMessage(
+                                       "TestDDARequest",
+                                       "Directory=" + ddaFile.getParent(),
+                                       "WantReadDirectory=true",
+                                       "WantWriteDirectory=false"
+                               ));
+                       }
+
+                       private Matcher<List<String>> matchesFailedToReadResponse() {
+                               return matchesFcpMessage(
+                                       "TestDDAResponse",
+                                       "Directory=" + ddaFile.getParent(),
+                                       "ReadContent=failed-to-read"
+                               );
+                       }
+
+                       private void writeTestDDAComplete(File tempFile) throws IOException {
+                               fcpServer.writeLine(
+                                       "TestDDAComplete",
+                                       "Directory=" + tempFile.getParent(),
+                                       "ReadDirectoryAllowed=true",
+                                       "EndMessage"
+                               );
+                       }
+
+                       private Matcher<List<String>> matchesTestDDAResponse(File tempFile) {
+                               return matchesFcpMessage(
+                                       "TestDDAResponse",
+                                       "Directory=" + tempFile.getParent(),
+                                       "ReadContent=test-content"
+                               );
+                       }
+
+                       private void sendTestDDAReply(String directory, File tempFile) throws IOException {
+                               fcpServer.writeLine(
+                                       "TestDDAReply",
+                                       "Directory=" + directory,
+                                       "ReadFilename=" + tempFile,
+                                       "EndMessage"
+                               );
+                       }
+
+                       private Matcher<List<String>> matchesTestDDARequest(File tempFile) {
+                               return matchesFcpMessage(
+                                       "TestDDARequest",
+                                       "Directory=" + tempFile.getParent(),
+                                       "WantReadDirectory=true",
+                                       "WantWriteDirectory=false"
+                               );
+                       }
+
+                       private void sendDdaRequired(String identifier) throws IOException {
+                               fcpServer.writeLine(
+                                       "ProtocolError",
+                                       "Identifier=" + identifier,
+                                       "Code=25",
+                                       "EndMessage"
+                               );
+                       }
+
+               }
+
+               private void replyWithPutSuccessful(String identifier) throws IOException {
+                       fcpServer.writeLine(
+                               "PutSuccessful",
+                               "URI=KSK@foo.txt",
+                               "Identifier=" + identifier,
+                               "EndMessage"
+                       );
+               }
+
+               private void replyWithPutFailed(String identifier) throws IOException {
+                       fcpServer.writeLine(
+                               "PutFailed",
+                               "Identifier=" + identifier,
+                               "EndMessage"
+                       );
+               }
+
+               private Matcher<List<String>> matchesDirectClientPut() {
+                       return allOf(
+                               hasHead("ClientPut"),
+                               hasParameters(1, 2, "UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"),
+                               hasTail("EndMessage", "Hello")
+                       );
+               }
+
+               private File createDdaFile() throws IOException {
+                       File tempFile = File.createTempFile("test-dda-", ".dat");
+                       tempFile.deleteOnExit();
+                       Files.write("test-content", tempFile, StandardCharsets.UTF_8);
+                       return tempFile;
+               }
+
+               @Test
+               public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier()
+               throws InterruptedException, ExecutionException, IOException {
+                       Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
+                       connectNode();
+                       List<String> lines = fcpServer.collectUntil(is("EndMessage"));
+                       String identifier = extractIdentifier(lines);
+                       fcpServer.writeLine(
+                               "ProtocolError",
+                               "Identifier=not-the-right-one",
+                               "Code=25",
+                               "EndMessage"
+                       );
+                       fcpServer.writeLine(
+                               "PutSuccessful",
+                               "Identifier=" + identifier,
+                               "URI=KSK@foo.txt",
+                               "EndMessage"
+                       );
+                       assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
+               }
+
+               @Test
+               public void clientPutAbortsOnProtocolErrorOtherThan25()
+               throws InterruptedException, ExecutionException, IOException {
+                       Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
+                       connectNode();
+                       List<String> lines = fcpServer.collectUntil(is("EndMessage"));
+                       String identifier = extractIdentifier(lines);
+                       fcpServer.writeLine(
+                               "ProtocolError",
+                               "Identifier=" + identifier,
+                               "Code=1",
+                               "EndMessage"
+                       );
+                       assertThat(key.get().isPresent(), is(false));
+               }
+
+               @Test
+               public void clientPutSendsNotificationsForGeneratedKeys()
+               throws InterruptedException, ExecutionException, IOException {
+                       List<String> generatedKeys = new CopyOnWriteArrayList<>();
+                       Future<Optional<Key>> key = fcpClient.clientPut()
+                               .onKeyGenerated(generatedKeys::add)
+                               .from(new ByteArrayInputStream("Hello\n".getBytes()))
+                               .length(6)
+                               .uri("KSK@foo.txt")
+                               .execute();
+                       connectNode();
+                       List<String> lines = fcpServer.collectUntil(is("Hello"));
+                       String identifier = extractIdentifier(lines);
+                       fcpServer.writeLine(
+                               "URIGenerated",
+                               "Identifier=" + identifier,
+                               "URI=KSK@foo.txt",
+                               "EndMessage"
+                       );
+                       replyWithPutSuccessful(identifier);
+                       assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
+                       assertThat(generatedKeys, contains("KSK@foo.txt"));
+               }
+
+       }
+
 }