Send progress updates from ClientPutDiskDir
authorDavid ‘Bombe’ Roden <bombe@freenetproject.org>
Thu, 15 Oct 2015 21:47:02 +0000 (23:47 +0200)
committerDavid ‘Bombe’ Roden <bombe@freenetproject.org>
Thu, 15 Oct 2015 21:47:07 +0000 (23:47 +0200)
src/main/java/net/pterodactylus/fcp/quelaton/ClientPutDiskDirCommand.java
src/main/java/net/pterodactylus/fcp/quelaton/ClientPutDiskDirCommandImpl.java
src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java

index e3c5344..dad6750 100644 (file)
@@ -2,8 +2,10 @@ package net.pterodactylus.fcp.quelaton;
 
 import java.io.File;
 import java.util.Optional;
+import java.util.function.Consumer;
 
 import net.pterodactylus.fcp.Key;
+import net.pterodactylus.fcp.RequestProgress;
 
 /**
  * FCP command that inserts a directory from the filesystem local to the node into Freenet.
@@ -12,6 +14,8 @@ import net.pterodactylus.fcp.Key;
  */
 public interface ClientPutDiskDirCommand {
 
+       ClientPutDiskDirCommand onProgress(Consumer<RequestProgress> requestProgressConsumer);
+
        WithUri fromDirectory(File directory);
 
        interface WithUri {
index 3a21922..2cf10c7 100644 (file)
@@ -2,17 +2,23 @@ package net.pterodactylus.fcp.quelaton;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 import net.pterodactylus.fcp.ClientPutDiskDir;
 import net.pterodactylus.fcp.Key;
 import net.pterodactylus.fcp.ProtocolError;
 import net.pterodactylus.fcp.PutSuccessful;
+import net.pterodactylus.fcp.RequestProgress;
+import net.pterodactylus.fcp.SimpleProgress;
+import net.pterodactylus.fcp.Verbosity;
 
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -29,6 +35,7 @@ public class ClientPutDiskDirCommandImpl implements ClientPutDiskDirCommand {
        private final Supplier<String> identifierGenerator;
        private final AtomicReference<String> directory = new AtomicReference<>();
        private final AtomicReference<String> uri = new AtomicReference<>();
+       private final List<Consumer<RequestProgress>> requestProgressConsumers = new CopyOnWriteArrayList<>();
 
        public ClientPutDiskDirCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier, Supplier<String> identifierGenerator) {
                this.threadPool = MoreExecutors.listeningDecorator(threadPool);
@@ -37,6 +44,12 @@ public class ClientPutDiskDirCommandImpl implements ClientPutDiskDirCommand {
        }
 
        @Override
+       public ClientPutDiskDirCommand onProgress(Consumer<RequestProgress> requestProgressConsumer) {
+               requestProgressConsumers.add(Objects.requireNonNull(requestProgressConsumer));
+               return this;
+       }
+
+       @Override
        public WithUri fromDirectory(File directory) {
                this.directory.set(Objects.requireNonNull(directory).getPath());
                return this::uri;
@@ -49,6 +62,9 @@ public class ClientPutDiskDirCommandImpl implements ClientPutDiskDirCommand {
 
        private Optional<Key> execute() throws IOException, ExecutionException, InterruptedException {
                ClientPutDiskDir clientPutDiskDir = new ClientPutDiskDir(uri.get(), identifierGenerator.get(), directory.get());
+               if (!requestProgressConsumers.isEmpty()) {
+                       clientPutDiskDir.setVerbosity(Verbosity.PROGRESS);
+               }
                try (ClientPutDiskDirDialog clientPutDiskDirDialog = new ClientPutDiskDirDialog()) {
                        return clientPutDiskDirDialog.send(clientPutDiskDir).get();
                }
@@ -66,6 +82,21 @@ public class ClientPutDiskDirCommandImpl implements ClientPutDiskDirCommand {
                }
 
                @Override
+               protected void consumeSimpleProgress(SimpleProgress simpleProgress) {
+                       RequestProgress requestProgress = new RequestProgress(
+                               simpleProgress.getTotal(),
+                               simpleProgress.getRequired(),
+                               simpleProgress.getFailed(),
+                               simpleProgress.getFatallyFailed(),
+                               simpleProgress.getLastProgress(),
+                               simpleProgress.getSucceeded(),
+                               simpleProgress.isFinalizedTotal(),
+                               simpleProgress.getMinSuccessFetchBlocks()
+                       );
+                       requestProgressConsumers.stream().forEach(consumer -> consumer.accept(requestProgress));
+               }
+
+               @Override
                protected void consumeProtocolError(ProtocolError protocolError) {
                        finish();
                }
index e526f2f..f238d5a 100644 (file)
@@ -58,10 +58,7 @@ import org.hamcrest.Matchers;
 import org.hamcrest.TypeSafeDiagnosingMatcher;
 import org.junit.After;
 import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 
 /**
@@ -1333,316 +1330,325 @@ public class DefaultFcpClientTest {
 
        }
 
-       public class ClientPut {
+       public class PutCommands {
 
-               @Test
-               public void sendsCorrectCommand() throws IOException, ExecutionException, InterruptedException {
-                       fcpClient.clientPut()
-                               .from(new ByteArrayInputStream("Hello\n".getBytes()))
-                               .length(6)
-                               .uri("KSK@foo.txt")
-                               .execute();
-                       connectNode();
-                       readMessage("Hello", this::matchesDirectClientPut);
-               }
-
-               @Test
-               public void succeedsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
-                       Future<Optional<Key>> key = fcpClient.clientPut()
-                               .from(new ByteArrayInputStream("Hello\n".getBytes()))
-                               .length(6)
-                               .uri("KSK@foo.txt")
-                               .execute();
-                       connectNode();
-                       readMessage("Hello", this::matchesDirectClientPut);
-                       replyWithPutFailed("not-the-right-one");
-                       replyWithPutSuccessful(identifier);
-                       assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
-               }
-
-               @Test
-               public void failsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
-                       Future<Optional<Key>> key = fcpClient.clientPut()
-                               .from(new ByteArrayInputStream("Hello\n".getBytes()))
-                               .length(6)
-                               .uri("KSK@foo.txt")
-                               .execute();
-                       connectNode();
-                       readMessage("Hello", this::matchesDirectClientPut);
-                       replyWithPutSuccessful("not-the-right-one");
-                       replyWithPutFailed(identifier);
-                       assertThat(key.get().isPresent(), is(false));
-               }
-
-               @Test
-               public void renameIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
-                       fcpClient.clientPut()
-                               .named("otherName.txt")
-                               .from(new ByteArrayInputStream("Hello\n".getBytes()))
-                               .length(6)
-                               .uri("KSK@foo.txt")
-                               .execute();
-                       connectNode();
-                       readMessage("Hello", () -> allOf(
-                               hasHead("ClientPut"),
-                               hasParameters(1, 2, "TargetFilename=otherName.txt", "UploadFrom=direct", "DataLength=6",
-                                       "URI=KSK@foo.txt"),
-                               hasTail("EndMessage", "Hello")
-                       ));
-               }
-
-               @Test
-               public void redirectIsSentCorrecly() throws IOException, ExecutionException, InterruptedException {
-                       fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt").execute();
-                       connectAndAssert(() ->
-                               matchesFcpMessage("ClientPut", "UploadFrom=redirect", "URI=KSK@foo.txt", "TargetURI=KSK@bar.txt"));
-               }
-
-               @Test
-               public void withFileIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
-                       fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
-                       connectAndAssert(() ->
-                               matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt"));
-               }
-
-               public class DDA {
+               public class ClientPut {
 
-                       private final File ddaFile;
-                       private final File fileToUpload;
-
-                       public DDA() throws IOException {
-                               ddaFile = createDdaFile();
-                               fileToUpload = new File(ddaFile.getParent(), "test.dat");
+                       @Test
+                       public void sendsCorrectCommand() throws IOException, ExecutionException, InterruptedException {
+                               fcpClient.clientPut()
+                                       .from(new ByteArrayInputStream("Hello\n".getBytes()))
+                                       .length(6)
+                                       .uri("KSK@foo.txt")
+                                       .execute();
+                               connectNode();
+                               readMessage("Hello", this::matchesDirectClientPut);
                        }
 
-                       private Matcher<List<String>> matchesFileClientPut(File file) {
-                               return matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=" + file);
+                       @Test
+                       public void succeedsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
+                               Future<Optional<Key>> key = fcpClient.clientPut()
+                                       .from(new ByteArrayInputStream("Hello\n".getBytes()))
+                                       .length(6)
+                                       .uri("KSK@foo.txt")
+                                       .execute();
+                               connectNode();
+                               readMessage("Hello", this::matchesDirectClientPut);
+                               replyWithPutFailed("not-the-right-one");
+                               replyWithPutSuccessful(identifier);
+                               assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
                        }
 
                        @Test
-                       public void completeDda() throws IOException, ExecutionException, InterruptedException {
-                               fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
-                               connectAndAssert(() -> matchesFileClientPut(fileToUpload));
-                               sendDdaRequired(identifier);
-                               readMessage(() -> matchesTestDDARequest(ddaFile));
-                               sendTestDDAReply(ddaFile.getParent(), ddaFile);
-                               readMessage(() -> matchesTestDDAResponse(ddaFile));
-                               writeTestDDAComplete(ddaFile);
-                               readMessage(() -> matchesFileClientPut(fileToUpload));
+                       public void failsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
+                               Future<Optional<Key>> key = fcpClient.clientPut()
+                                       .from(new ByteArrayInputStream("Hello\n".getBytes()))
+                                       .length(6)
+                                       .uri("KSK@foo.txt")
+                                       .execute();
+                               connectNode();
+                               readMessage("Hello", this::matchesDirectClientPut);
+                               replyWithPutSuccessful("not-the-right-one");
+                               replyWithPutFailed(identifier);
+                               assertThat(key.get().isPresent(), is(false));
                        }
 
                        @Test
-                       public void ignoreOtherDda() throws IOException, ExecutionException, InterruptedException {
-                               fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
-                               connectAndAssert(() -> matchesFileClientPut(fileToUpload));
-                               sendDdaRequired(identifier);
-                               readMessage(() -> matchesTestDDARequest(ddaFile));
-                               sendTestDDAReply("/some-other-directory", ddaFile);
-                               sendTestDDAReply(ddaFile.getParent(), ddaFile);
-                               readMessage(() -> matchesTestDDAResponse(ddaFile));
+                       public void renameIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
+                               fcpClient.clientPut()
+                                       .named("otherName.txt")
+                                       .from(new ByteArrayInputStream("Hello\n".getBytes()))
+                                       .length(6)
+                                       .uri("KSK@foo.txt")
+                                       .execute();
+                               connectNode();
+                               readMessage("Hello", () -> allOf(
+                                       hasHead("ClientPut"),
+                                       hasParameters(1, 2, "TargetFilename=otherName.txt", "UploadFrom=direct", "DataLength=6",
+                                               "URI=KSK@foo.txt"),
+                                       hasTail("EndMessage", "Hello")
+                               ));
                        }
 
                        @Test
-                       public void sendResponseIfFileUnreadable() throws IOException, ExecutionException, InterruptedException {
-                               fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
-                               connectAndAssert(() -> matchesFileClientPut(fileToUpload));
-                               sendDdaRequired(identifier);
-                               readMessage(() -> matchesTestDDARequest(ddaFile));
-                               sendTestDDAReply(ddaFile.getParent(), new File(ddaFile + ".foo"));
-                               readMessage(this::matchesFailedToReadResponse);
+                       public void redirectIsSentCorrecly() throws IOException, ExecutionException, InterruptedException {
+                               fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt").execute();
+                               connectAndAssert(() ->
+                                       matchesFcpMessage("ClientPut", "UploadFrom=redirect", "URI=KSK@foo.txt", "TargetURI=KSK@bar.txt"));
                        }
 
                        @Test
-                       public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory()
-                       throws IOException, ExecutionException, InterruptedException {
-                               fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
-                               connectNode();
-                               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-                               String identifier = extractIdentifier(lines);
-                               fcpServer.writeLine(
-                                       "TestDDAComplete",
-                                       "Directory=/some-other-directory",
-                                       "EndMessage"
-                               );
-                               sendDdaRequired(identifier);
-                               lines = fcpServer.collectUntil(is("EndMessage"));
-                               assertThat(lines, matchesFcpMessage(
-                                       "TestDDARequest",
-                                       "Directory=" + ddaFile.getParent(),
-                                       "WantReadDirectory=true",
-                                       "WantWriteDirectory=false"
-                               ));
+                       public void withFileIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
+                               fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
+                               connectAndAssert(() ->
+                                       matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt"));
                        }
 
-                       private Matcher<List<String>> matchesFailedToReadResponse() {
-                               return matchesFcpMessage(
-                                       "TestDDAResponse",
-                                       "Directory=" + ddaFile.getParent(),
-                                       "ReadContent=failed-to-read"
+                       public class DDA {
+
+                               private final File ddaFile;
+                               private final File fileToUpload;
+
+                               public DDA() throws IOException {
+                                       ddaFile = createDdaFile();
+                                       fileToUpload = new File(ddaFile.getParent(), "test.dat");
+                               }
+
+                               private Matcher<List<String>> matchesFileClientPut(File file) {
+                                       return matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=" + file);
+                               }
+
+                               @Test
+                               public void completeDda() throws IOException, ExecutionException, InterruptedException {
+                                       fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
+                                       connectAndAssert(() -> matchesFileClientPut(fileToUpload));
+                                       sendDdaRequired(identifier);
+                                       readMessage(() -> matchesTestDDARequest(ddaFile));
+                                       sendTestDDAReply(ddaFile.getParent(), ddaFile);
+                                       readMessage(() -> matchesTestDDAResponse(ddaFile));
+                                       writeTestDDAComplete(ddaFile);
+                                       readMessage(() -> matchesFileClientPut(fileToUpload));
+                               }
+
+                               @Test
+                               public void ignoreOtherDda() throws IOException, ExecutionException, InterruptedException {
+                                       fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
+                                       connectAndAssert(() -> matchesFileClientPut(fileToUpload));
+                                       sendDdaRequired(identifier);
+                                       readMessage(() -> matchesTestDDARequest(ddaFile));
+                                       sendTestDDAReply("/some-other-directory", ddaFile);
+                                       sendTestDDAReply(ddaFile.getParent(), ddaFile);
+                                       readMessage(() -> matchesTestDDAResponse(ddaFile));
+                               }
+
+                               @Test
+                               public void sendResponseIfFileUnreadable()
+                               throws IOException, ExecutionException, InterruptedException {
+                                       fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
+                                       connectAndAssert(() -> matchesFileClientPut(fileToUpload));
+                                       sendDdaRequired(identifier);
+                                       readMessage(() -> matchesTestDDARequest(ddaFile));
+                                       sendTestDDAReply(ddaFile.getParent(), new File(ddaFile + ".foo"));
+                                       readMessage(this::matchesFailedToReadResponse);
+                               }
+
+                               @Test
+                               public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory()
+                               throws IOException, ExecutionException, InterruptedException {
+                                       fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
+                                       connectNode();
+                                       List<String> lines = fcpServer.collectUntil(is("EndMessage"));
+                                       String identifier = extractIdentifier(lines);
+                                       fcpServer.writeLine(
+                                               "TestDDAComplete",
+                                               "Directory=/some-other-directory",
+                                               "EndMessage"
+                                       );
+                                       sendDdaRequired(identifier);
+                                       lines = fcpServer.collectUntil(is("EndMessage"));
+                                       assertThat(lines, matchesFcpMessage(
+                                               "TestDDARequest",
+                                               "Directory=" + ddaFile.getParent(),
+                                               "WantReadDirectory=true",
+                                               "WantWriteDirectory=false"
+                                       ));
+                               }
+
+                               private Matcher<List<String>> matchesFailedToReadResponse() {
+                                       return matchesFcpMessage(
+                                               "TestDDAResponse",
+                                               "Directory=" + ddaFile.getParent(),
+                                               "ReadContent=failed-to-read"
+                                       );
+                               }
+
+                               private void writeTestDDAComplete(File tempFile) throws IOException {
+                                       fcpServer.writeLine(
+                                               "TestDDAComplete",
+                                               "Directory=" + tempFile.getParent(),
+                                               "ReadDirectoryAllowed=true",
+                                               "EndMessage"
+                                       );
+                               }
+
+                               private Matcher<List<String>> matchesTestDDAResponse(File tempFile) {
+                                       return matchesFcpMessage(
+                                               "TestDDAResponse",
+                                               "Directory=" + tempFile.getParent(),
+                                               "ReadContent=test-content"
+                                       );
+                               }
+
+                               private void sendTestDDAReply(String directory, File tempFile) throws IOException {
+                                       fcpServer.writeLine(
+                                               "TestDDAReply",
+                                               "Directory=" + directory,
+                                               "ReadFilename=" + tempFile,
+                                               "EndMessage"
+                                       );
+                               }
+
+                               private Matcher<List<String>> matchesTestDDARequest(File tempFile) {
+                                       return matchesFcpMessage(
+                                               "TestDDARequest",
+                                               "Directory=" + tempFile.getParent(),
+                                               "WantReadDirectory=true",
+                                               "WantWriteDirectory=false"
+                                       );
+                               }
+
+                               private void sendDdaRequired(String identifier) throws IOException {
+                                       fcpServer.writeLine(
+                                               "ProtocolError",
+                                               "Identifier=" + identifier,
+                                               "Code=25",
+                                               "EndMessage"
+                                       );
+                               }
+
+                       }
+
+                       private void replyWithPutSuccessful(String identifier) throws IOException {
+                               fcpServer.writeLine(
+                                       "PutSuccessful",
+                                       "URI=KSK@foo.txt",
+                                       "Identifier=" + identifier,
+                                       "EndMessage"
                                );
                        }
 
-                       private void writeTestDDAComplete(File tempFile) throws IOException {
+                       private void replyWithPutFailed(String identifier) throws IOException {
                                fcpServer.writeLine(
-                                       "TestDDAComplete",
-                                       "Directory=" + tempFile.getParent(),
-                                       "ReadDirectoryAllowed=true",
+                                       "PutFailed",
+                                       "Identifier=" + identifier,
                                        "EndMessage"
                                );
                        }
 
-                       private Matcher<List<String>> matchesTestDDAResponse(File tempFile) {
-                               return matchesFcpMessage(
-                                       "TestDDAResponse",
-                                       "Directory=" + tempFile.getParent(),
-                                       "ReadContent=test-content"
+                       private Matcher<List<String>> matchesDirectClientPut(String... additionalLines) {
+                               List<String> lines =
+                                       new ArrayList<>(Arrays.asList("UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"));
+                               Arrays.asList(additionalLines).forEach(lines::add);
+                               return allOf(
+                                       hasHead("ClientPut"),
+                                       hasParameters(1, 2, lines.toArray(new String[lines.size()])),
+                                       hasTail("EndMessage", "Hello")
                                );
                        }
 
-                       private void sendTestDDAReply(String directory, File tempFile) throws IOException {
+                       private File createDdaFile() throws IOException {
+                               File tempFile = File.createTempFile("test-dda-", ".dat");
+                               tempFile.deleteOnExit();
+                               Files.write("test-content", tempFile, StandardCharsets.UTF_8);
+                               return tempFile;
+                       }
+
+                       @Test
+                       public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier()
+                       throws InterruptedException, ExecutionException, IOException {
+                               Future<Optional<Key>> key =
+                                       fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
+                               connectNode();
+                               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
+                               String identifier = extractIdentifier(lines);
                                fcpServer.writeLine(
-                                       "TestDDAReply",
-                                       "Directory=" + directory,
-                                       "ReadFilename=" + tempFile,
+                                       "ProtocolError",
+                                       "Identifier=not-the-right-one",
+                                       "Code=25",
                                        "EndMessage"
                                );
-                       }
-
-                       private Matcher<List<String>> matchesTestDDARequest(File tempFile) {
-                               return matchesFcpMessage(
-                                       "TestDDARequest",
-                                       "Directory=" + tempFile.getParent(),
-                                       "WantReadDirectory=true",
-                                       "WantWriteDirectory=false"
+                               fcpServer.writeLine(
+                                       "PutSuccessful",
+                                       "Identifier=" + identifier,
+                                       "URI=KSK@foo.txt",
+                                       "EndMessage"
                                );
+                               assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
                        }
 
-                       private void sendDdaRequired(String identifier) throws IOException {
+                       @Test
+                       public void clientPutAbortsOnProtocolErrorOtherThan25()
+                       throws InterruptedException, ExecutionException, IOException {
+                               Future<Optional<Key>> key =
+                                       fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
+                               connectNode();
+                               List<String> lines = fcpServer.collectUntil(is("EndMessage"));
+                               String identifier = extractIdentifier(lines);
                                fcpServer.writeLine(
                                        "ProtocolError",
                                        "Identifier=" + identifier,
-                                       "Code=25",
+                                       "Code=1",
                                        "EndMessage"
                                );
+                               assertThat(key.get().isPresent(), is(false));
                        }
 
-               }
-
-               private void replyWithPutSuccessful(String identifier) throws IOException {
-                       fcpServer.writeLine(
-                               "PutSuccessful",
-                               "URI=KSK@foo.txt",
-                               "Identifier=" + identifier,
-                               "EndMessage"
-                       );
-               }
-
-               private void replyWithPutFailed(String identifier) throws IOException {
-                       fcpServer.writeLine(
-                               "PutFailed",
-                               "Identifier=" + identifier,
-                               "EndMessage"
-                       );
-               }
-
-               private Matcher<List<String>> matchesDirectClientPut(String... additionalLines) {
-                       List<String> lines = new ArrayList<>(Arrays.asList("UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"));
-                       Arrays.asList(additionalLines).forEach(lines::add);
-                       return allOf(
-                               hasHead("ClientPut"),
-                               hasParameters(1, 2, lines.toArray(new String[lines.size()])),
-                               hasTail("EndMessage", "Hello")
-                       );
-               }
-
-               private File createDdaFile() throws IOException {
-                       File tempFile = File.createTempFile("test-dda-", ".dat");
-                       tempFile.deleteOnExit();
-                       Files.write("test-content", tempFile, StandardCharsets.UTF_8);
-                       return tempFile;
-               }
-
-               @Test
-               public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier()
-               throws InterruptedException, ExecutionException, IOException {
-                       Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
-                       connectNode();
-                       List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-                       String identifier = extractIdentifier(lines);
-                       fcpServer.writeLine(
-                               "ProtocolError",
-                               "Identifier=not-the-right-one",
-                               "Code=25",
-                               "EndMessage"
-                       );
-                       fcpServer.writeLine(
-                               "PutSuccessful",
-                               "Identifier=" + identifier,
-                               "URI=KSK@foo.txt",
-                               "EndMessage"
-                       );
-                       assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
-               }
-
-               @Test
-               public void clientPutAbortsOnProtocolErrorOtherThan25()
-               throws InterruptedException, ExecutionException, IOException {
-                       Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
-                       connectNode();
-                       List<String> lines = fcpServer.collectUntil(is("EndMessage"));
-                       String identifier = extractIdentifier(lines);
-                       fcpServer.writeLine(
-                               "ProtocolError",
-                               "Identifier=" + identifier,
-                               "Code=1",
-                               "EndMessage"
-                       );
-                       assertThat(key.get().isPresent(), is(false));
-               }
+                       @Test
+                       public void clientPutSendsNotificationsForGeneratedKeys()
+                       throws InterruptedException, ExecutionException, IOException {
+                               List<String> generatedKeys = new CopyOnWriteArrayList<>();
+                               Future<Optional<Key>> key = fcpClient.clientPut()
+                                       .onKeyGenerated(generatedKeys::add)
+                                       .from(new ByteArrayInputStream("Hello\n".getBytes()))
+                                       .length(6)
+                                       .uri("KSK@foo.txt")
+                                       .execute();
+                               connectNode();
+                               List<String> lines = fcpServer.collectUntil(is("Hello"));
+                               String identifier = extractIdentifier(lines);
+                               fcpServer.writeLine(
+                                       "URIGenerated",
+                                       "Identifier=" + identifier,
+                                       "URI=KSK@foo.txt",
+                                       "EndMessage"
+                               );
+                               replyWithPutSuccessful(identifier);
+                               assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
+                               assertThat(generatedKeys, contains("KSK@foo.txt"));
+                       }
 
-               @Test
-               public void clientPutSendsNotificationsForGeneratedKeys()
-               throws InterruptedException, ExecutionException, IOException {
-                       List<String> generatedKeys = new CopyOnWriteArrayList<>();
-                       Future<Optional<Key>> key = fcpClient.clientPut()
-                               .onKeyGenerated(generatedKeys::add)
-                               .from(new ByteArrayInputStream("Hello\n".getBytes()))
-                               .length(6)
-                               .uri("KSK@foo.txt")
-                               .execute();
-                       connectNode();
-                       List<String> lines = fcpServer.collectUntil(is("Hello"));
-                       String identifier = extractIdentifier(lines);
-                       fcpServer.writeLine(
-                               "URIGenerated",
-                               "Identifier=" + identifier,
-                               "URI=KSK@foo.txt",
-                               "EndMessage"
-                       );
-                       replyWithPutSuccessful(identifier);
-                       assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
-                       assertThat(generatedKeys, contains("KSK@foo.txt"));
-               }
+                       @Test
+                       public void clientPutSendsNotificationOnProgress()
+                       throws InterruptedException, ExecutionException, IOException {
+                               List<RequestProgress> requestProgress = new ArrayList<>();
+                               Future<Optional<Key>> key = fcpClient.clientPut()
+                                       .onProgress(requestProgress::add)
+                                       .from(new ByteArrayInputStream("Hello\n".getBytes()))
+                                       .length(6)
+                                       .uri("KSK@foo.txt")
+                                       .execute();
+                               connectNode();
+                               readMessage("Hello", () -> matchesDirectClientPut("Verbosity=1"));
+                               replyWithSimpleProgress(1, 2, 3, 4, 5, 6, true, 8);
+                               replyWithSimpleProgress(11, 12, 13, 14, 15, 16, false, 18);
+                               replyWithPutSuccessful(identifier);
+                               assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
+                               assertThat(requestProgress, contains(
+                                       isRequestProgress(1, 2, 3, 4, 5, 6, true, 8),
+                                       isRequestProgress(11, 12, 13, 14, 15, 16, false, 18)
+                               ));
+                       }
 
-               @Test
-               public void clientPutSendsNotificationOnProgress() throws InterruptedException, ExecutionException, IOException {
-                       List<RequestProgress> requestProgress = new ArrayList<>();
-                   Future<Optional<Key>> key = fcpClient.clientPut()
-                               .onProgress(requestProgress::add)
-                               .from(new ByteArrayInputStream("Hello\n".getBytes()))
-                               .length(6)
-                               .uri("KSK@foo.txt")
-                               .execute();
-                       connectNode();
-                       readMessage("Hello", () -> matchesDirectClientPut("Verbosity=1"));
-                       replyWithSimpleProgress(1, 2, 3, 4, 5, 6, true, 8);
-                       replyWithSimpleProgress(11, 12, 13, 14, 15, 16, false, 18);
-                       replyWithPutSuccessful(identifier);
-                       assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
-                       assertThat(requestProgress, contains(
-                               isRequestProgress(1, 2, 3, 4, 5, 6, true, 8),
-                               isRequestProgress(11, 12, 13, 14, 15, 16, false, 18)
-                       ));
                }
 
                private void replyWithSimpleProgress(
@@ -1663,33 +1669,46 @@ public class DefaultFcpClientTest {
                        );
                }
 
-       }
+               public class ClientPutDiskDir {
 
-       public class ClientPutDiskDir {
+                       @Test
+                       public void commandIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
+                               Future<Optional<Key>> key = fcpClient.clientPutDiskDir().fromDirectory(new File("")).uri("CHK@").execute();
+                               connectAndAssert(this::matchesClientPutDiskDir);
+                               fcpServer.writeLine("PutSuccessful", "Identifier=" + identifier, "URI=CHK@abc", "EndMessage");
+                               assertThat(key.get().get().getKey(), is("CHK@abc"));
+                       }
 
-               @Test
-               public void commandIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
-                       Future<Optional<Key>> key = fcpClient.clientPutDiskDir().fromDirectory(new File("")).uri("CHK@").execute();
-                       connectAndAssert(this::matchesClientPutDiskDir);
-                       fcpServer.writeLine("PutSuccessful", "Identifier=" + identifier, "URI=CHK@abc", "EndMessage");
-                       assertThat(key.get().get().getKey(), is("CHK@abc"));
-               }
+                       @Test
+                       public void protocolErrorAbortsCommand() throws InterruptedException, ExecutionException, IOException {
+                               Future<Optional<Key>> key = fcpClient.clientPutDiskDir().fromDirectory(new File("")).uri("CHK@").execute();
+                               connectAndAssert(this::matchesClientPutDiskDir);
+                               replyWithProtocolError();
+                               assertThat(key.get().isPresent(), is(false));
+                       }
 
-               @Test
-               public void protocolErrorAbortsCommand() throws InterruptedException, ExecutionException, IOException {
-                       Future<Optional<Key>> key = fcpClient.clientPutDiskDir().fromDirectory(new File("")).uri("CHK@").execute();
-                       connectAndAssert(this::matchesClientPutDiskDir);
-                       replyWithProtocolError();
-                       assertThat(key.get().isPresent(), is(false));
-               }
+                       @Test
+                       public void progressIsSentToConsumerCorrectly() throws InterruptedException, ExecutionException, IOException {
+                               List<RequestProgress> requestProgress = new ArrayList<>();
+                               Future<Optional<Key>> key = fcpClient.clientPutDiskDir().onProgress(requestProgress::add)
+                                       .fromDirectory(new File("")).uri("CHK@").execute();
+                               connectAndAssert(() -> matchesClientPutDiskDir("Verbosity=1"));
+                               replyWithSimpleProgress(1, 2, 3, 4, 5, 6, true, 8);
+                               replyWithSimpleProgress(11, 12, 13, 14, 15, 16, false, 18);
+                               fcpServer.writeLine("PutSuccessful", "Identifier=" + identifier, "URI=CHK@abc", "EndMessage");
+                               assertThat(key.get().get().getKey(), is("CHK@abc"));
+                               assertThat(requestProgress, contains(
+                                       isRequestProgress(1, 2, 3, 4, 5, 6, true, 8),
+                                       isRequestProgress(11, 12, 13, 14, 15, 16, false, 18)
+                               ));
+                       }
+
+                       private Matcher<List<String>> matchesClientPutDiskDir(String... additionalLines) {
+                               List<String> lines = new ArrayList<>(Arrays.asList("Identifier=" + identifier, "URI=CHK@", "Filename=" + new File("").getPath()));
+                               Arrays.asList(additionalLines).forEach(lines::add);
+                               return matchesFcpMessage("ClientPutDiskDir", lines.toArray(new String[lines.size()]));
+                       }
 
-               private Matcher<List<String>> matchesClientPutDiskDir() {
-                       return matchesFcpMessage(
-                               "ClientPutDiskDir",
-                               "Identifier=" + identifier,
-                               "URI=CHK@",
-                               "Filename=" + new File("").getPath()
-                       );
                }
 
        }