Send progress updates from ClientPutDiskDir
[jFCPlib.git] / src / test / java / net / pterodactylus / fcp / quelaton / DefaultFcpClientTest.java
index e526f2f..f238d5a 100644 (file)
@@ -58,10 +58,7 @@ import org.hamcrest.Matchers;
 import org.hamcrest.TypeSafeDiagnosingMatcher;
 import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 
 /**
@@ -1333,316 +1330,325 @@ public class DefaultFcpClientTest {
 
        }
 
-       public class ClientPut {
+       public class PutCommands {
 
-               @Test
-               public void sendsCorrectCommand() throws IOException, ExecutionException, InterruptedException {
-                       fcpClient.clientPut()
-                               .from(new ByteArrayInputStream("Hello\n".getBytes()))
-                               .length(6)
-                               .uri("KSK@foo.txt")
-                               .execute();
-                       connectNode();
-                       readMessage("Hello", this::matchesDirectClientPut);
-               }
-
-               @Test
-               public void succeedsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
-                       Future<Optional<Key>> key = fcpClient.clientPut()
-                               .from(new ByteArrayInputStream("Hello\n".getBytes()))
-                               .length(6)
-                               .uri("KSK@foo.txt")
-                               .execute();
-                       connectNode();
-                       readMessage("Hello", this::matchesDirectClientPut);
-                       replyWithPutFailed("not-the-right-one");
-                       replyWithPutSuccessful(identifier);
-                       assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
-               }
-
-               @Test
-               public void failsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
-                       Future<Optional<Key>> key = fcpClient.clientPut()
-                               .from(new ByteArrayInputStream("Hello\n".getBytes()))
-                               .length(6)
-                               .uri("KSK@foo.txt")
-                               .execute();
-                       connectNode();
-                       readMessage("Hello", this::matchesDirectClientPut);
-                       replyWithPutSuccessful("not-the-right-one");
-                       replyWithPutFailed(identifier);
-                       assertThat(key.get().isPresent(), is(false));
-               }
-
-               @Test
-               public void renameIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
-                       fcpClient.clientPut()
-                               .named("otherName.txt")
-                               .from(new ByteArrayInputStream("Hello\n".getBytes()))
-                               .length(6)
-                               .uri("KSK@foo.txt")
-                               .execute();
-                       connectNode();
-                       readMessage("Hello", () -> allOf(
-                               hasHead("ClientPut"),
-                               hasParameters(1, 2, "TargetFilename=otherName.txt", "UploadFrom=direct", "DataLength=6",
-                                       "URI=KSK@foo.txt"),
-                               hasTail("EndMessage", "Hello")
-                       ));
-               }
-
-               @Test
-               public void redirectIsSentCorrecly() throws IOException, ExecutionException, InterruptedException {
-                       fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt").execute();
-                       connectAndAssert(() ->
-                               matchesFcpMessage("ClientPut", "UploadFrom=redirect", "URI=KSK@foo.txt", "TargetURI=KSK@bar.txt"));
-               }
-
-               @Test
-               public void withFileIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
-                       fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
-                       connectAndAssert(() ->
-                               matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt"));
-               }
-
-               public class DDA {
+               public class ClientPut {
 
-                       private final File ddaFile;
-                       private final File fileToUpload;
-
-                       public DDA() throws IOException {
-                               ddaFile = createDdaFile();
-                               fileToUpload = new File(ddaFile.getParent(), "test.dat");
+                       @Test
+                       public void sendsCorrectCommand() throws IOException, ExecutionException, InterruptedException {
+                               fcpClient.clientPut()
+                                       .from(new ByteArrayInputStream("Hello\n".getBytes()))
+                                       .length(6)
+                                       .uri("KSK@foo.txt")
+                                       .execute();
+                               connectNode();
+                               readMessage("Hello", this::matchesDirectClientPut);
                        }
 
-                       private Matcher<List<String>> matchesFileClientPut(File file) {
-                               return matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=" + file);
+                       @Test
+                       public void succeedsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
+                               Future<Optional<Key>> key = fcpClient.clientPut()
+                                       .from(new ByteArrayInputStream("Hello\n".getBytes()))
+                                       .length(6)
+                                       .uri("KSK@foo.txt")
+                                       .execute();
+                               connectNode();
+                               readMessage("Hello", this::matchesDirectClientPut);
+                               replyWithPutFailed("not-the-right-one");
+                               replyWithPutSuccessful(identifier);
+                               assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
                        }
 
                        @Test
-                       public void completeDda() throws IOException, ExecutionException, InterruptedException {
-                               fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
-                               connectAndAssert(() -> matchesFileClientPut(fileToUpload));
-                               sendDdaRequired(identifier);
-                               readMessage(() -> matchesTestDDARequest(ddaFile));
-                               sendTestDDAReply(ddaFile.getParent(), ddaFile);
-                               readMessage(() -> matchesTestDDAResponse(ddaFile));
-                               writeTestDDAComplete(ddaFile);
-                               readMessage(() -> matchesFileClientPut(fileToUpload));
+                       public void failsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
+                               Future<Optional<Key>> key = fcpClient.clientPut()
+                                       .from(new ByteArrayInputStream("Hello\n".getBytes()))
+                                       .length(6)
+                                       .uri("KSK@foo.txt")
+                                       .execute();
+                               connectNode();
+                               readMessage("Hello", this::matchesDirectClientPut);
+                               replyWithPutSuccessful("not-the-right-one");
+                               replyWithPutFailed(identifier);
+                               assertThat(key.get().isPresent(), is(false));
                        }
 
                        @Test
-                       public void ignoreOtherDda() throws IOException, ExecutionException, InterruptedException {
-                               fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
-                               connectAndAssert(() -> matchesFileClientPut(fileToUpload));
-                               sendDdaRequired(identifier);
-                               readMessage(() -> matchesTestDDARequest(ddaFile));
-                               sendTestDDAReply("/some-other-directory", ddaFile);
-                               sendTestDDAReply(ddaFile.getParent(), ddaFile);
-                               readMessage(() -> matchesTestDDAResponse(ddaFile));
+                       public void renameIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
+                               fcpClient.clientPut()
+                                       .named("otherName.txt")
+                                       .from(new ByteArrayInputStream("Hello\n".getBytes()))
+                                       .length(6)
+                                       .uri("KSK@foo.txt")
+                                       .execute();
+                               connectNode();
+                               readMessage("Hello", () -> allOf(
+                                       hasHead("ClientPut"),
+                                       hasParameters(1, 2, "TargetFilename=otherName.txt", "UploadFrom=direct", "DataLength=6",
+                                               "URI=KSK@foo.txt"),
+                                       hasTail("EndMessage", "Hello")
+                               ));
                        }
 
                        @Test
-                       public void sendResponseIfFileUnreadable() throws IOException, ExecutionException, InterruptedException {
-                               fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
-                               connectAndAssert(() -> matchesFileClientPut(fileToUpload));
-                               sendDdaRequired(identifier);
-                               readMessage(() -> matchesTestDDARequest(ddaFile));
-                               sendTestDDAReply(ddaFile.getParent(), new File(ddaFile + ".foo"));
-                               readMessage(this::matchesFailedToReadResponse);
+                       public void redirectIsSentCorrecly() throws IOException, ExecutionException, InterruptedException {
+                               fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt").execute();
+                               connectAndAssert(() ->
+                                       matchesFcpMessage("ClientPut", "UploadFrom=redirect", "URI=KSK@foo.txt", "TargetURI=KSK@bar.txt"));
                        }
 
                        @Test
-                       public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory()
-                       throws IOException, ExecutionException, InterruptedException {
-                               fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
-                               connectNode();
-                               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-                               String identifier = extractIdentifier(lines);
-                               fcpServer.writeLine(
-                                       "TestDDAComplete",
-                                       "Directory=/some-other-directory",
-                                       "EndMessage"
-                               );
-                               sendDdaRequired(identifier);
-                               lines = fcpServer.collectUntil(is("EndMessage"));
-                               assertThat(lines, matchesFcpMessage(
-                                       "TestDDARequest",
-                                       "Directory=" + ddaFile.getParent(),
-                                       "WantReadDirectory=true",
-                                       "WantWriteDirectory=false"
-                               ));
+                       public void withFileIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
+                               fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
+                               connectAndAssert(() ->
+                                       matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt"));
                        }
 
-                       private Matcher<List<String>> matchesFailedToReadResponse() {
-                               return matchesFcpMessage(
-                                       "TestDDAResponse",
-                                       "Directory=" + ddaFile.getParent(),
-                                       "ReadContent=failed-to-read"
+                       public class DDA {
+
+                               private final File ddaFile;
+                               private final File fileToUpload;
+
+                               public DDA() throws IOException {
+                                       ddaFile = createDdaFile();
+                                       fileToUpload = new File(ddaFile.getParent(), "test.dat");
+                               }
+
+                               private Matcher<List<String>> matchesFileClientPut(File file) {
+                                       return matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=" + file);
+                               }
+
+                               @Test
+                               public void completeDda() throws IOException, ExecutionException, InterruptedException {
+                                       fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
+                                       connectAndAssert(() -> matchesFileClientPut(fileToUpload));
+                                       sendDdaRequired(identifier);
+                                       readMessage(() -> matchesTestDDARequest(ddaFile));
+                                       sendTestDDAReply(ddaFile.getParent(), ddaFile);
+                                       readMessage(() -> matchesTestDDAResponse(ddaFile));
+                                       writeTestDDAComplete(ddaFile);
+                                       readMessage(() -> matchesFileClientPut(fileToUpload));
+                               }
+
+                               @Test
+                               public void ignoreOtherDda() throws IOException, ExecutionException, InterruptedException {
+                                       fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
+                                       connectAndAssert(() -> matchesFileClientPut(fileToUpload));
+                                       sendDdaRequired(identifier);
+                                       readMessage(() -> matchesTestDDARequest(ddaFile));
+                                       sendTestDDAReply("/some-other-directory", ddaFile);
+                                       sendTestDDAReply(ddaFile.getParent(), ddaFile);
+                                       readMessage(() -> matchesTestDDAResponse(ddaFile));
+                               }
+
+                               @Test
+                               public void sendResponseIfFileUnreadable()
+                               throws IOException, ExecutionException, InterruptedException {
+                                       fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
+                                       connectAndAssert(() -> matchesFileClientPut(fileToUpload));
+                                       sendDdaRequired(identifier);
+                                       readMessage(() -> matchesTestDDARequest(ddaFile));
+                                       sendTestDDAReply(ddaFile.getParent(), new File(ddaFile + ".foo"));
+                                       readMessage(this::matchesFailedToReadResponse);
+                               }
+
+                               @Test
+                               public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory()
+                               throws IOException, ExecutionException, InterruptedException {
+                                       fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
+                                       connectNode();
+                                       List<String> lines = fcpServer.collectUntil(is("EndMessage"));
+                                       String identifier = extractIdentifier(lines);
+                                       fcpServer.writeLine(
+                                               "TestDDAComplete",
+                                               "Directory=/some-other-directory",
+                                               "EndMessage"
+                                       );
+                                       sendDdaRequired(identifier);
+                                       lines = fcpServer.collectUntil(is("EndMessage"));
+                                       assertThat(lines, matchesFcpMessage(
+                                               "TestDDARequest",
+                                               "Directory=" + ddaFile.getParent(),
+                                               "WantReadDirectory=true",
+                                               "WantWriteDirectory=false"
+                                       ));
+                               }
+
+                               private Matcher<List<String>> matchesFailedToReadResponse() {
+                                       return matchesFcpMessage(
+                                               "TestDDAResponse",
+                                               "Directory=" + ddaFile.getParent(),
+                                               "ReadContent=failed-to-read"
+                                       );
+                               }
+
+                               private void writeTestDDAComplete(File tempFile) throws IOException {
+                                       fcpServer.writeLine(
+                                               "TestDDAComplete",
+                                               "Directory=" + tempFile.getParent(),
+                                               "ReadDirectoryAllowed=true",
+                                               "EndMessage"
+                                       );
+                               }
+
+                               private Matcher<List<String>> matchesTestDDAResponse(File tempFile) {
+                                       return matchesFcpMessage(
+                                               "TestDDAResponse",
+                                               "Directory=" + tempFile.getParent(),
+                                               "ReadContent=test-content"
+                                       );
+                               }
+
+                               private void sendTestDDAReply(String directory, File tempFile) throws IOException {
+                                       fcpServer.writeLine(
+                                               "TestDDAReply",
+                                               "Directory=" + directory,
+                                               "ReadFilename=" + tempFile,
+                                               "EndMessage"
+                                       );
+                               }
+
+                               private Matcher<List<String>> matchesTestDDARequest(File tempFile) {
+                                       return matchesFcpMessage(
+                                               "TestDDARequest",
+                                               "Directory=" + tempFile.getParent(),
+                                               "WantReadDirectory=true",
+                                               "WantWriteDirectory=false"
+                                       );
+                               }
+
+                               private void sendDdaRequired(String identifier) throws IOException {
+                                       fcpServer.writeLine(
+                                               "ProtocolError",
+                                               "Identifier=" + identifier,
+                                               "Code=25",
+                                               "EndMessage"
+                                       );
+                               }
+
+                       }
+
+                       private void replyWithPutSuccessful(String identifier) throws IOException {
+                               fcpServer.writeLine(
+                                       "PutSuccessful",
+                                       "URI=KSK@foo.txt",
+                                       "Identifier=" + identifier,
+                                       "EndMessage"
                                );
                        }
 
-                       private void writeTestDDAComplete(File tempFile) throws IOException {
+                       private void replyWithPutFailed(String identifier) throws IOException {
                                fcpServer.writeLine(
-                                       "TestDDAComplete",
-                                       "Directory=" + tempFile.getParent(),
-                                       "ReadDirectoryAllowed=true",
+                                       "PutFailed",
+                                       "Identifier=" + identifier,
                                        "EndMessage"
                                );
                        }
 
-                       private Matcher<List<String>> matchesTestDDAResponse(File tempFile) {
-                               return matchesFcpMessage(
-                                       "TestDDAResponse",
-                                       "Directory=" + tempFile.getParent(),
-                                       "ReadContent=test-content"
+                       private Matcher<List<String>> matchesDirectClientPut(String... additionalLines) {
+                               List<String> lines =
+                                       new ArrayList<>(Arrays.asList("UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"));
+                               Arrays.asList(additionalLines).forEach(lines::add);
+                               return allOf(
+                                       hasHead("ClientPut"),
+                                       hasParameters(1, 2, lines.toArray(new String[lines.size()])),
+                                       hasTail("EndMessage", "Hello")
                                );
                        }
 
-                       private void sendTestDDAReply(String directory, File tempFile) throws IOException {
+                       private File createDdaFile() throws IOException {
+                               File tempFile = File.createTempFile("test-dda-", ".dat");
+                               tempFile.deleteOnExit();
+                               Files.write("test-content", tempFile, StandardCharsets.UTF_8);
+                               return tempFile;
+                       }
+
+                       @Test
+                       public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier()
+                       throws InterruptedException, ExecutionException, IOException {
+                               Future<Optional<Key>> key =
+                                       fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
+                               connectNode();
+                               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
+                               String identifier = extractIdentifier(lines);
                                fcpServer.writeLine(
-                                       "TestDDAReply",
-                                       "Directory=" + directory,
-                                       "ReadFilename=" + tempFile,
+                                       "ProtocolError",
+                                       "Identifier=not-the-right-one",
+                                       "Code=25",
                                        "EndMessage"
                                );
-                       }
-
-                       private Matcher<List<String>> matchesTestDDARequest(File tempFile) {
-                               return matchesFcpMessage(
-                                       "TestDDARequest",
-                                       "Directory=" + tempFile.getParent(),
-                                       "WantReadDirectory=true",
-                                       "WantWriteDirectory=false"
+                               fcpServer.writeLine(
+                                       "PutSuccessful",
+                                       "Identifier=" + identifier,
+                                       "URI=KSK@foo.txt",
+                                       "EndMessage"
                                );
+                               assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
                        }
 
-                       private void sendDdaRequired(String identifier) throws IOException {
+                       @Test
+                       public void clientPutAbortsOnProtocolErrorOtherThan25()
+                       throws InterruptedException, ExecutionException, IOException {
+                               Future<Optional<Key>> key =
+                                       fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
+                               connectNode();
+                               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
+                               String identifier = extractIdentifier(lines);
                                fcpServer.writeLine(
                                        "ProtocolError",
                                        "Identifier=" + identifier,
-                                       "Code=25",
+                                       "Code=1",
                                        "EndMessage"
                                );
+                               assertThat(key.get().isPresent(), is(false));
                        }
 
-               }
-
-               private void replyWithPutSuccessful(String identifier) throws IOException {
-                       fcpServer.writeLine(
-                               "PutSuccessful",
-                               "URI=KSK@foo.txt",
-                               "Identifier=" + identifier,
-                               "EndMessage"
-                       );
-               }
-
-               private void replyWithPutFailed(String identifier) throws IOException {
-                       fcpServer.writeLine(
-                               "PutFailed",
-                               "Identifier=" + identifier,
-                               "EndMessage"
-                       );
-               }
-
-               private Matcher<List<String>> matchesDirectClientPut(String... additionalLines) {
-                       List<String> lines = new ArrayList<>(Arrays.asList("UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"));
-                       Arrays.asList(additionalLines).forEach(lines::add);
-                       return allOf(
-                               hasHead("ClientPut"),
-                               hasParameters(1, 2, lines.toArray(new String[lines.size()])),
-                               hasTail("EndMessage", "Hello")
-                       );
-               }
-
-               private File createDdaFile() throws IOException {
-                       File tempFile = File.createTempFile("test-dda-", ".dat");
-                       tempFile.deleteOnExit();
-                       Files.write("test-content", tempFile, StandardCharsets.UTF_8);
-                       return tempFile;
-               }
-
-               @Test
-               public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier()
-               throws InterruptedException, ExecutionException, IOException {
-                       Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
-                       connectNode();
-                       List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-                       String identifier = extractIdentifier(lines);
-                       fcpServer.writeLine(
-                               "ProtocolError",
-                               "Identifier=not-the-right-one",
-                               "Code=25",
-                               "EndMessage"
-                       );
-                       fcpServer.writeLine(
-                               "PutSuccessful",
-                               "Identifier=" + identifier,
-                               "URI=KSK@foo.txt",
-                               "EndMessage"
-                       );
-                       assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
-               }
-
-               @Test
-               public void clientPutAbortsOnProtocolErrorOtherThan25()
-               throws InterruptedException, ExecutionException, IOException {
-                       Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
-                       connectNode();
-                       List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-                       String identifier = extractIdentifier(lines);
-                       fcpServer.writeLine(
-                               "ProtocolError",
-                               "Identifier=" + identifier,
-                               "Code=1",
-                               "EndMessage"
-                       );
-                       assertThat(key.get().isPresent(), is(false));
-               }
+                       @Test
+                       public void clientPutSendsNotificationsForGeneratedKeys()
+                       throws InterruptedException, ExecutionException, IOException {
+                               List<String> generatedKeys = new CopyOnWriteArrayList<>();
+                               Future<Optional<Key>> key = fcpClient.clientPut()
+                                       .onKeyGenerated(generatedKeys::add)
+                                       .from(new ByteArrayInputStream("Hello\n".getBytes()))
+                                       .length(6)
+                                       .uri("KSK@foo.txt")
+                                       .execute();
+                               connectNode();
+                               List<String> lines = fcpServer.collectUntil(is("Hello"));
+                               String identifier = extractIdentifier(lines);
+                               fcpServer.writeLine(
+                                       "URIGenerated",
+                                       "Identifier=" + identifier,
+                                       "URI=KSK@foo.txt",
+                                       "EndMessage"
+                               );
+                               replyWithPutSuccessful(identifier);
+                               assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
+                               assertThat(generatedKeys, contains("KSK@foo.txt"));
+                       }
 
-               @Test
-               public void clientPutSendsNotificationsForGeneratedKeys()
-               throws InterruptedException, ExecutionException, IOException {
-                       List<String> generatedKeys = new CopyOnWriteArrayList<>();
-                       Future<Optional<Key>> key = fcpClient.clientPut()
-                               .onKeyGenerated(generatedKeys::add)
-                               .from(new ByteArrayInputStream("Hello\n".getBytes()))
-                               .length(6)
-                               .uri("KSK@foo.txt")
-                               .execute();
-                       connectNode();
-                       List<String> lines = fcpServer.collectUntil(is("Hello"));
-                       String identifier = extractIdentifier(lines);
-                       fcpServer.writeLine(
-                               "URIGenerated",
-                               "Identifier=" + identifier,
-                               "URI=KSK@foo.txt",
-                               "EndMessage"
-                       );
-                       replyWithPutSuccessful(identifier);
-                       assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
-                       assertThat(generatedKeys, contains("KSK@foo.txt"));
-               }
+                       @Test
+                       public void clientPutSendsNotificationOnProgress()
+                       throws InterruptedException, ExecutionException, IOException {
+                               List<RequestProgress> requestProgress = new ArrayList<>();
+                               Future<Optional<Key>> key = fcpClient.clientPut()
+                                       .onProgress(requestProgress::add)
+                                       .from(new ByteArrayInputStream("Hello\n".getBytes()))
+                                       .length(6)
+                                       .uri("KSK@foo.txt")
+                                       .execute();
+                               connectNode();
+                               readMessage("Hello", () -> matchesDirectClientPut("Verbosity=1"));
+                               replyWithSimpleProgress(1, 2, 3, 4, 5, 6, true, 8);
+                               replyWithSimpleProgress(11, 12, 13, 14, 15, 16, false, 18);
+                               replyWithPutSuccessful(identifier);
+                               assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
+                               assertThat(requestProgress, contains(
+                                       isRequestProgress(1, 2, 3, 4, 5, 6, true, 8),
+                                       isRequestProgress(11, 12, 13, 14, 15, 16, false, 18)
+                               ));
+                       }
 
-               @Test
-               public void clientPutSendsNotificationOnProgress() throws InterruptedException, ExecutionException, IOException {
-                       List<RequestProgress> requestProgress = new ArrayList<>();
-                   Future<Optional<Key>> key = fcpClient.clientPut()
-                               .onProgress(requestProgress::add)
-                               .from(new ByteArrayInputStream("Hello\n".getBytes()))
-                               .length(6)
-                               .uri("KSK@foo.txt")
-                               .execute();
-                       connectNode();
-                       readMessage("Hello", () -> matchesDirectClientPut("Verbosity=1"));
-                       replyWithSimpleProgress(1, 2, 3, 4, 5, 6, true, 8);
-                       replyWithSimpleProgress(11, 12, 13, 14, 15, 16, false, 18);
-                       replyWithPutSuccessful(identifier);
-                       assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
-                       assertThat(requestProgress, contains(
-                               isRequestProgress(1, 2, 3, 4, 5, 6, true, 8),
-                               isRequestProgress(11, 12, 13, 14, 15, 16, false, 18)
-                       ));
                }
 
                private void replyWithSimpleProgress(
@@ -1663,33 +1669,46 @@ public class DefaultFcpClientTest {
                        );
                }
 
-       }
+               public class ClientPutDiskDir {
 
-       public class ClientPutDiskDir {
+                       @Test
+                       public void commandIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
+                               Future<Optional<Key>> key = fcpClient.clientPutDiskDir().fromDirectory(new File("")).uri("CHK@").execute();
+                               connectAndAssert(this::matchesClientPutDiskDir);
+                               fcpServer.writeLine("PutSuccessful", "Identifier=" + identifier, "URI=CHK@abc", "EndMessage");
+                               assertThat(key.get().get().getKey(), is("CHK@abc"));
+                       }
 
-               @Test
-               public void commandIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
-                       Future<Optional<Key>> key = fcpClient.clientPutDiskDir().fromDirectory(new File("")).uri("CHK@").execute();
-                       connectAndAssert(this::matchesClientPutDiskDir);
-                       fcpServer.writeLine("PutSuccessful", "Identifier=" + identifier, "URI=CHK@abc", "EndMessage");
-                       assertThat(key.get().get().getKey(), is("CHK@abc"));
-               }
+                       @Test
+                       public void protocolErrorAbortsCommand() throws InterruptedException, ExecutionException, IOException {
+                               Future<Optional<Key>> key = fcpClient.clientPutDiskDir().fromDirectory(new File("")).uri("CHK@").execute();
+                               connectAndAssert(this::matchesClientPutDiskDir);
+                               replyWithProtocolError();
+                               assertThat(key.get().isPresent(), is(false));
+                       }
 
-               @Test
-               public void protocolErrorAbortsCommand() throws InterruptedException, ExecutionException, IOException {
-                       Future<Optional<Key>> key = fcpClient.clientPutDiskDir().fromDirectory(new File("")).uri("CHK@").execute();
-                       connectAndAssert(this::matchesClientPutDiskDir);
-                       replyWithProtocolError();
-                       assertThat(key.get().isPresent(), is(false));
-               }
+                       @Test
+                       public void progressIsSentToConsumerCorrectly() throws InterruptedException, ExecutionException, IOException {
+                               List<RequestProgress> requestProgress = new ArrayList<>();
+                               Future<Optional<Key>> key = fcpClient.clientPutDiskDir().onProgress(requestProgress::add)
+                                       .fromDirectory(new File("")).uri("CHK@").execute();
+                               connectAndAssert(() -> matchesClientPutDiskDir("Verbosity=1"));
+                               replyWithSimpleProgress(1, 2, 3, 4, 5, 6, true, 8);
+                               replyWithSimpleProgress(11, 12, 13, 14, 15, 16, false, 18);
+                               fcpServer.writeLine("PutSuccessful", "Identifier=" + identifier, "URI=CHK@abc", "EndMessage");
+                               assertThat(key.get().get().getKey(), is("CHK@abc"));
+                               assertThat(requestProgress, contains(
+                                       isRequestProgress(1, 2, 3, 4, 5, 6, true, 8),
+                                       isRequestProgress(11, 12, 13, 14, 15, 16, false, 18)
+                               ));
+                       }
+
+                       private Matcher<List<String>> matchesClientPutDiskDir(String... additionalLines) {
+                               List<String> lines = new ArrayList<>(Arrays.asList("Identifier=" + identifier, "URI=CHK@", "Filename=" + new File("").getPath()));
+                               Arrays.asList(additionalLines).forEach(lines::add);
+                               return matchesFcpMessage("ClientPutDiskDir", lines.toArray(new String[lines.size()]));
+                       }
 
-               private Matcher<List<String>> matchesClientPutDiskDir() {
-                       return matchesFcpMessage(
-                               "ClientPutDiskDir",
-                               "Identifier=" + identifier,
-                               "URI=CHK@",
-                               "Filename=" + new File("").getPath()
-                       );
                }
 
        }