From: David ‘Bombe’ Roden Date: Thu, 15 Oct 2015 21:47:02 +0000 (+0200) Subject: Send progress updates from ClientPutDiskDir X-Git-Url: https://git.pterodactylus.net/?p=jFCPlib.git;a=commitdiff_plain;h=c2522a887d02f692017b4b9c8e1d31472e41001f Send progress updates from ClientPutDiskDir --- diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/ClientPutDiskDirCommand.java b/src/main/java/net/pterodactylus/fcp/quelaton/ClientPutDiskDirCommand.java index e3c5344..dad6750 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/ClientPutDiskDirCommand.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/ClientPutDiskDirCommand.java @@ -2,8 +2,10 @@ package net.pterodactylus.fcp.quelaton; import java.io.File; import java.util.Optional; +import java.util.function.Consumer; import net.pterodactylus.fcp.Key; +import net.pterodactylus.fcp.RequestProgress; /** * FCP command that inserts a directory from the filesystem local to the node into Freenet. @@ -12,6 +14,8 @@ import net.pterodactylus.fcp.Key; */ public interface ClientPutDiskDirCommand { + ClientPutDiskDirCommand onProgress(Consumer requestProgressConsumer); + WithUri fromDirectory(File directory); interface WithUri { diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/ClientPutDiskDirCommandImpl.java b/src/main/java/net/pterodactylus/fcp/quelaton/ClientPutDiskDirCommandImpl.java index 3a21922..2cf10c7 100644 --- a/src/main/java/net/pterodactylus/fcp/quelaton/ClientPutDiskDirCommandImpl.java +++ b/src/main/java/net/pterodactylus/fcp/quelaton/ClientPutDiskDirCommandImpl.java @@ -2,17 +2,23 @@ package net.pterodactylus.fcp.quelaton; import java.io.File; import java.io.IOException; +import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.function.Supplier; import net.pterodactylus.fcp.ClientPutDiskDir; import net.pterodactylus.fcp.Key; import net.pterodactylus.fcp.ProtocolError; import net.pterodactylus.fcp.PutSuccessful; +import net.pterodactylus.fcp.RequestProgress; +import net.pterodactylus.fcp.SimpleProgress; +import net.pterodactylus.fcp.Verbosity; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -29,6 +35,7 @@ public class ClientPutDiskDirCommandImpl implements ClientPutDiskDirCommand { private final Supplier identifierGenerator; private final AtomicReference directory = new AtomicReference<>(); private final AtomicReference uri = new AtomicReference<>(); + private final List> requestProgressConsumers = new CopyOnWriteArrayList<>(); public ClientPutDiskDirCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier, Supplier identifierGenerator) { this.threadPool = MoreExecutors.listeningDecorator(threadPool); @@ -37,6 +44,12 @@ public class ClientPutDiskDirCommandImpl implements ClientPutDiskDirCommand { } @Override + public ClientPutDiskDirCommand onProgress(Consumer requestProgressConsumer) { + requestProgressConsumers.add(Objects.requireNonNull(requestProgressConsumer)); + return this; + } + + @Override public WithUri fromDirectory(File directory) { this.directory.set(Objects.requireNonNull(directory).getPath()); return this::uri; @@ -49,6 +62,9 @@ public class ClientPutDiskDirCommandImpl implements ClientPutDiskDirCommand { private Optional execute() throws IOException, ExecutionException, InterruptedException { ClientPutDiskDir clientPutDiskDir = new ClientPutDiskDir(uri.get(), identifierGenerator.get(), directory.get()); + if (!requestProgressConsumers.isEmpty()) { + clientPutDiskDir.setVerbosity(Verbosity.PROGRESS); + } try (ClientPutDiskDirDialog clientPutDiskDirDialog = new ClientPutDiskDirDialog()) { return clientPutDiskDirDialog.send(clientPutDiskDir).get(); } @@ -66,6 +82,21 @@ public class ClientPutDiskDirCommandImpl implements ClientPutDiskDirCommand { } @Override + protected void consumeSimpleProgress(SimpleProgress simpleProgress) { + RequestProgress requestProgress = new RequestProgress( + simpleProgress.getTotal(), + simpleProgress.getRequired(), + simpleProgress.getFailed(), + simpleProgress.getFatallyFailed(), + simpleProgress.getLastProgress(), + simpleProgress.getSucceeded(), + simpleProgress.isFinalizedTotal(), + simpleProgress.getMinSuccessFetchBlocks() + ); + requestProgressConsumers.stream().forEach(consumer -> consumer.accept(requestProgress)); + } + + @Override protected void consumeProtocolError(ProtocolError protocolError) { finish(); } diff --git a/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java b/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java index e526f2f..f238d5a 100644 --- a/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java +++ b/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java @@ -58,10 +58,7 @@ import org.hamcrest.Matchers; import org.hamcrest.TypeSafeDiagnosingMatcher; import org.junit.After; import org.junit.Assert; -import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; /** @@ -1333,316 +1330,325 @@ public class DefaultFcpClientTest { } - public class ClientPut { + public class PutCommands { - @Test - public void sendsCorrectCommand() throws IOException, ExecutionException, InterruptedException { - fcpClient.clientPut() - .from(new ByteArrayInputStream("Hello\n".getBytes())) - .length(6) - .uri("KSK@foo.txt") - .execute(); - connectNode(); - readMessage("Hello", this::matchesDirectClientPut); - } - - @Test - public void succeedsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException { - Future> key = fcpClient.clientPut() - .from(new ByteArrayInputStream("Hello\n".getBytes())) - .length(6) - .uri("KSK@foo.txt") - .execute(); - connectNode(); - readMessage("Hello", this::matchesDirectClientPut); - replyWithPutFailed("not-the-right-one"); - replyWithPutSuccessful(identifier); - assertThat(key.get().get().getKey(), is("KSK@foo.txt")); - } - - @Test - public void failsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException { - Future> key = fcpClient.clientPut() - .from(new ByteArrayInputStream("Hello\n".getBytes())) - .length(6) - .uri("KSK@foo.txt") - .execute(); - connectNode(); - readMessage("Hello", this::matchesDirectClientPut); - replyWithPutSuccessful("not-the-right-one"); - replyWithPutFailed(identifier); - assertThat(key.get().isPresent(), is(false)); - } - - @Test - public void renameIsSentCorrectly() throws InterruptedException, ExecutionException, IOException { - fcpClient.clientPut() - .named("otherName.txt") - .from(new ByteArrayInputStream("Hello\n".getBytes())) - .length(6) - .uri("KSK@foo.txt") - .execute(); - connectNode(); - readMessage("Hello", () -> allOf( - hasHead("ClientPut"), - hasParameters(1, 2, "TargetFilename=otherName.txt", "UploadFrom=direct", "DataLength=6", - "URI=KSK@foo.txt"), - hasTail("EndMessage", "Hello") - )); - } - - @Test - public void redirectIsSentCorrecly() throws IOException, ExecutionException, InterruptedException { - fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt").execute(); - connectAndAssert(() -> - matchesFcpMessage("ClientPut", "UploadFrom=redirect", "URI=KSK@foo.txt", "TargetURI=KSK@bar.txt")); - } - - @Test - public void withFileIsSentCorrectly() throws InterruptedException, ExecutionException, IOException { - fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute(); - connectAndAssert(() -> - matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt")); - } - - public class DDA { + public class ClientPut { - private final File ddaFile; - private final File fileToUpload; - - public DDA() throws IOException { - ddaFile = createDdaFile(); - fileToUpload = new File(ddaFile.getParent(), "test.dat"); + @Test + public void sendsCorrectCommand() throws IOException, ExecutionException, InterruptedException { + fcpClient.clientPut() + .from(new ByteArrayInputStream("Hello\n".getBytes())) + .length(6) + .uri("KSK@foo.txt") + .execute(); + connectNode(); + readMessage("Hello", this::matchesDirectClientPut); } - private Matcher> matchesFileClientPut(File file) { - return matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=" + file); + @Test + public void succeedsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException { + Future> key = fcpClient.clientPut() + .from(new ByteArrayInputStream("Hello\n".getBytes())) + .length(6) + .uri("KSK@foo.txt") + .execute(); + connectNode(); + readMessage("Hello", this::matchesDirectClientPut); + replyWithPutFailed("not-the-right-one"); + replyWithPutSuccessful(identifier); + assertThat(key.get().get().getKey(), is("KSK@foo.txt")); } @Test - public void completeDda() throws IOException, ExecutionException, InterruptedException { - fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute(); - connectAndAssert(() -> matchesFileClientPut(fileToUpload)); - sendDdaRequired(identifier); - readMessage(() -> matchesTestDDARequest(ddaFile)); - sendTestDDAReply(ddaFile.getParent(), ddaFile); - readMessage(() -> matchesTestDDAResponse(ddaFile)); - writeTestDDAComplete(ddaFile); - readMessage(() -> matchesFileClientPut(fileToUpload)); + public void failsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException { + Future> key = fcpClient.clientPut() + .from(new ByteArrayInputStream("Hello\n".getBytes())) + .length(6) + .uri("KSK@foo.txt") + .execute(); + connectNode(); + readMessage("Hello", this::matchesDirectClientPut); + replyWithPutSuccessful("not-the-right-one"); + replyWithPutFailed(identifier); + assertThat(key.get().isPresent(), is(false)); } @Test - public void ignoreOtherDda() throws IOException, ExecutionException, InterruptedException { - fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute(); - connectAndAssert(() -> matchesFileClientPut(fileToUpload)); - sendDdaRequired(identifier); - readMessage(() -> matchesTestDDARequest(ddaFile)); - sendTestDDAReply("/some-other-directory", ddaFile); - sendTestDDAReply(ddaFile.getParent(), ddaFile); - readMessage(() -> matchesTestDDAResponse(ddaFile)); + public void renameIsSentCorrectly() throws InterruptedException, ExecutionException, IOException { + fcpClient.clientPut() + .named("otherName.txt") + .from(new ByteArrayInputStream("Hello\n".getBytes())) + .length(6) + .uri("KSK@foo.txt") + .execute(); + connectNode(); + readMessage("Hello", () -> allOf( + hasHead("ClientPut"), + hasParameters(1, 2, "TargetFilename=otherName.txt", "UploadFrom=direct", "DataLength=6", + "URI=KSK@foo.txt"), + hasTail("EndMessage", "Hello") + )); } @Test - public void sendResponseIfFileUnreadable() throws IOException, ExecutionException, InterruptedException { - fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute(); - connectAndAssert(() -> matchesFileClientPut(fileToUpload)); - sendDdaRequired(identifier); - readMessage(() -> matchesTestDDARequest(ddaFile)); - sendTestDDAReply(ddaFile.getParent(), new File(ddaFile + ".foo")); - readMessage(this::matchesFailedToReadResponse); + public void redirectIsSentCorrecly() throws IOException, ExecutionException, InterruptedException { + fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt").execute(); + connectAndAssert(() -> + matchesFcpMessage("ClientPut", "UploadFrom=redirect", "URI=KSK@foo.txt", "TargetURI=KSK@bar.txt")); } @Test - public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory() - throws IOException, ExecutionException, InterruptedException { - fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute(); - connectNode(); - List lines = fcpServer.collectUntil(is("EndMessage")); - String identifier = extractIdentifier(lines); - fcpServer.writeLine( - "TestDDAComplete", - "Directory=/some-other-directory", - "EndMessage" - ); - sendDdaRequired(identifier); - lines = fcpServer.collectUntil(is("EndMessage")); - assertThat(lines, matchesFcpMessage( - "TestDDARequest", - "Directory=" + ddaFile.getParent(), - "WantReadDirectory=true", - "WantWriteDirectory=false" - )); + public void withFileIsSentCorrectly() throws InterruptedException, ExecutionException, IOException { + fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute(); + connectAndAssert(() -> + matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt")); } - private Matcher> matchesFailedToReadResponse() { - return matchesFcpMessage( - "TestDDAResponse", - "Directory=" + ddaFile.getParent(), - "ReadContent=failed-to-read" + public class DDA { + + private final File ddaFile; + private final File fileToUpload; + + public DDA() throws IOException { + ddaFile = createDdaFile(); + fileToUpload = new File(ddaFile.getParent(), "test.dat"); + } + + private Matcher> matchesFileClientPut(File file) { + return matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=" + file); + } + + @Test + public void completeDda() throws IOException, ExecutionException, InterruptedException { + fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute(); + connectAndAssert(() -> matchesFileClientPut(fileToUpload)); + sendDdaRequired(identifier); + readMessage(() -> matchesTestDDARequest(ddaFile)); + sendTestDDAReply(ddaFile.getParent(), ddaFile); + readMessage(() -> matchesTestDDAResponse(ddaFile)); + writeTestDDAComplete(ddaFile); + readMessage(() -> matchesFileClientPut(fileToUpload)); + } + + @Test + public void ignoreOtherDda() throws IOException, ExecutionException, InterruptedException { + fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute(); + connectAndAssert(() -> matchesFileClientPut(fileToUpload)); + sendDdaRequired(identifier); + readMessage(() -> matchesTestDDARequest(ddaFile)); + sendTestDDAReply("/some-other-directory", ddaFile); + sendTestDDAReply(ddaFile.getParent(), ddaFile); + readMessage(() -> matchesTestDDAResponse(ddaFile)); + } + + @Test + public void sendResponseIfFileUnreadable() + throws IOException, ExecutionException, InterruptedException { + fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute(); + connectAndAssert(() -> matchesFileClientPut(fileToUpload)); + sendDdaRequired(identifier); + readMessage(() -> matchesTestDDARequest(ddaFile)); + sendTestDDAReply(ddaFile.getParent(), new File(ddaFile + ".foo")); + readMessage(this::matchesFailedToReadResponse); + } + + @Test + public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory() + throws IOException, ExecutionException, InterruptedException { + fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute(); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + String identifier = extractIdentifier(lines); + fcpServer.writeLine( + "TestDDAComplete", + "Directory=/some-other-directory", + "EndMessage" + ); + sendDdaRequired(identifier); + lines = fcpServer.collectUntil(is("EndMessage")); + assertThat(lines, matchesFcpMessage( + "TestDDARequest", + "Directory=" + ddaFile.getParent(), + "WantReadDirectory=true", + "WantWriteDirectory=false" + )); + } + + private Matcher> matchesFailedToReadResponse() { + return matchesFcpMessage( + "TestDDAResponse", + "Directory=" + ddaFile.getParent(), + "ReadContent=failed-to-read" + ); + } + + private void writeTestDDAComplete(File tempFile) throws IOException { + fcpServer.writeLine( + "TestDDAComplete", + "Directory=" + tempFile.getParent(), + "ReadDirectoryAllowed=true", + "EndMessage" + ); + } + + private Matcher> matchesTestDDAResponse(File tempFile) { + return matchesFcpMessage( + "TestDDAResponse", + "Directory=" + tempFile.getParent(), + "ReadContent=test-content" + ); + } + + private void sendTestDDAReply(String directory, File tempFile) throws IOException { + fcpServer.writeLine( + "TestDDAReply", + "Directory=" + directory, + "ReadFilename=" + tempFile, + "EndMessage" + ); + } + + private Matcher> matchesTestDDARequest(File tempFile) { + return matchesFcpMessage( + "TestDDARequest", + "Directory=" + tempFile.getParent(), + "WantReadDirectory=true", + "WantWriteDirectory=false" + ); + } + + private void sendDdaRequired(String identifier) throws IOException { + fcpServer.writeLine( + "ProtocolError", + "Identifier=" + identifier, + "Code=25", + "EndMessage" + ); + } + + } + + private void replyWithPutSuccessful(String identifier) throws IOException { + fcpServer.writeLine( + "PutSuccessful", + "URI=KSK@foo.txt", + "Identifier=" + identifier, + "EndMessage" ); } - private void writeTestDDAComplete(File tempFile) throws IOException { + private void replyWithPutFailed(String identifier) throws IOException { fcpServer.writeLine( - "TestDDAComplete", - "Directory=" + tempFile.getParent(), - "ReadDirectoryAllowed=true", + "PutFailed", + "Identifier=" + identifier, "EndMessage" ); } - private Matcher> matchesTestDDAResponse(File tempFile) { - return matchesFcpMessage( - "TestDDAResponse", - "Directory=" + tempFile.getParent(), - "ReadContent=test-content" + private Matcher> matchesDirectClientPut(String... additionalLines) { + List lines = + new ArrayList<>(Arrays.asList("UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt")); + Arrays.asList(additionalLines).forEach(lines::add); + return allOf( + hasHead("ClientPut"), + hasParameters(1, 2, lines.toArray(new String[lines.size()])), + hasTail("EndMessage", "Hello") ); } - private void sendTestDDAReply(String directory, File tempFile) throws IOException { + private File createDdaFile() throws IOException { + File tempFile = File.createTempFile("test-dda-", ".dat"); + tempFile.deleteOnExit(); + Files.write("test-content", tempFile, StandardCharsets.UTF_8); + return tempFile; + } + + @Test + public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier() + throws InterruptedException, ExecutionException, IOException { + Future> key = + fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute(); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + String identifier = extractIdentifier(lines); fcpServer.writeLine( - "TestDDAReply", - "Directory=" + directory, - "ReadFilename=" + tempFile, + "ProtocolError", + "Identifier=not-the-right-one", + "Code=25", "EndMessage" ); - } - - private Matcher> matchesTestDDARequest(File tempFile) { - return matchesFcpMessage( - "TestDDARequest", - "Directory=" + tempFile.getParent(), - "WantReadDirectory=true", - "WantWriteDirectory=false" + fcpServer.writeLine( + "PutSuccessful", + "Identifier=" + identifier, + "URI=KSK@foo.txt", + "EndMessage" ); + assertThat(key.get().get().getKey(), is("KSK@foo.txt")); } - private void sendDdaRequired(String identifier) throws IOException { + @Test + public void clientPutAbortsOnProtocolErrorOtherThan25() + throws InterruptedException, ExecutionException, IOException { + Future> key = + fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute(); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + String identifier = extractIdentifier(lines); fcpServer.writeLine( "ProtocolError", "Identifier=" + identifier, - "Code=25", + "Code=1", "EndMessage" ); + assertThat(key.get().isPresent(), is(false)); } - } - - private void replyWithPutSuccessful(String identifier) throws IOException { - fcpServer.writeLine( - "PutSuccessful", - "URI=KSK@foo.txt", - "Identifier=" + identifier, - "EndMessage" - ); - } - - private void replyWithPutFailed(String identifier) throws IOException { - fcpServer.writeLine( - "PutFailed", - "Identifier=" + identifier, - "EndMessage" - ); - } - - private Matcher> matchesDirectClientPut(String... additionalLines) { - List lines = new ArrayList<>(Arrays.asList("UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt")); - Arrays.asList(additionalLines).forEach(lines::add); - return allOf( - hasHead("ClientPut"), - hasParameters(1, 2, lines.toArray(new String[lines.size()])), - hasTail("EndMessage", "Hello") - ); - } - - private File createDdaFile() throws IOException { - File tempFile = File.createTempFile("test-dda-", ".dat"); - tempFile.deleteOnExit(); - Files.write("test-content", tempFile, StandardCharsets.UTF_8); - return tempFile; - } - - @Test - public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier() - throws InterruptedException, ExecutionException, IOException { - Future> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute(); - connectNode(); - List lines = fcpServer.collectUntil(is("EndMessage")); - String identifier = extractIdentifier(lines); - fcpServer.writeLine( - "ProtocolError", - "Identifier=not-the-right-one", - "Code=25", - "EndMessage" - ); - fcpServer.writeLine( - "PutSuccessful", - "Identifier=" + identifier, - "URI=KSK@foo.txt", - "EndMessage" - ); - assertThat(key.get().get().getKey(), is("KSK@foo.txt")); - } - - @Test - public void clientPutAbortsOnProtocolErrorOtherThan25() - throws InterruptedException, ExecutionException, IOException { - Future> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute(); - connectNode(); - List lines = fcpServer.collectUntil(is("EndMessage")); - String identifier = extractIdentifier(lines); - fcpServer.writeLine( - "ProtocolError", - "Identifier=" + identifier, - "Code=1", - "EndMessage" - ); - assertThat(key.get().isPresent(), is(false)); - } + @Test + public void clientPutSendsNotificationsForGeneratedKeys() + throws InterruptedException, ExecutionException, IOException { + List generatedKeys = new CopyOnWriteArrayList<>(); + Future> key = fcpClient.clientPut() + .onKeyGenerated(generatedKeys::add) + .from(new ByteArrayInputStream("Hello\n".getBytes())) + .length(6) + .uri("KSK@foo.txt") + .execute(); + connectNode(); + List lines = fcpServer.collectUntil(is("Hello")); + String identifier = extractIdentifier(lines); + fcpServer.writeLine( + "URIGenerated", + "Identifier=" + identifier, + "URI=KSK@foo.txt", + "EndMessage" + ); + replyWithPutSuccessful(identifier); + assertThat(key.get().get().getKey(), is("KSK@foo.txt")); + assertThat(generatedKeys, contains("KSK@foo.txt")); + } - @Test - public void clientPutSendsNotificationsForGeneratedKeys() - throws InterruptedException, ExecutionException, IOException { - List generatedKeys = new CopyOnWriteArrayList<>(); - Future> key = fcpClient.clientPut() - .onKeyGenerated(generatedKeys::add) - .from(new ByteArrayInputStream("Hello\n".getBytes())) - .length(6) - .uri("KSK@foo.txt") - .execute(); - connectNode(); - List lines = fcpServer.collectUntil(is("Hello")); - String identifier = extractIdentifier(lines); - fcpServer.writeLine( - "URIGenerated", - "Identifier=" + identifier, - "URI=KSK@foo.txt", - "EndMessage" - ); - replyWithPutSuccessful(identifier); - assertThat(key.get().get().getKey(), is("KSK@foo.txt")); - assertThat(generatedKeys, contains("KSK@foo.txt")); - } + @Test + public void clientPutSendsNotificationOnProgress() + throws InterruptedException, ExecutionException, IOException { + List requestProgress = new ArrayList<>(); + Future> key = fcpClient.clientPut() + .onProgress(requestProgress::add) + .from(new ByteArrayInputStream("Hello\n".getBytes())) + .length(6) + .uri("KSK@foo.txt") + .execute(); + connectNode(); + readMessage("Hello", () -> matchesDirectClientPut("Verbosity=1")); + replyWithSimpleProgress(1, 2, 3, 4, 5, 6, true, 8); + replyWithSimpleProgress(11, 12, 13, 14, 15, 16, false, 18); + replyWithPutSuccessful(identifier); + assertThat(key.get().get().getKey(), is("KSK@foo.txt")); + assertThat(requestProgress, contains( + isRequestProgress(1, 2, 3, 4, 5, 6, true, 8), + isRequestProgress(11, 12, 13, 14, 15, 16, false, 18) + )); + } - @Test - public void clientPutSendsNotificationOnProgress() throws InterruptedException, ExecutionException, IOException { - List requestProgress = new ArrayList<>(); - Future> key = fcpClient.clientPut() - .onProgress(requestProgress::add) - .from(new ByteArrayInputStream("Hello\n".getBytes())) - .length(6) - .uri("KSK@foo.txt") - .execute(); - connectNode(); - readMessage("Hello", () -> matchesDirectClientPut("Verbosity=1")); - replyWithSimpleProgress(1, 2, 3, 4, 5, 6, true, 8); - replyWithSimpleProgress(11, 12, 13, 14, 15, 16, false, 18); - replyWithPutSuccessful(identifier); - assertThat(key.get().get().getKey(), is("KSK@foo.txt")); - assertThat(requestProgress, contains( - isRequestProgress(1, 2, 3, 4, 5, 6, true, 8), - isRequestProgress(11, 12, 13, 14, 15, 16, false, 18) - )); } private void replyWithSimpleProgress( @@ -1663,33 +1669,46 @@ public class DefaultFcpClientTest { ); } - } + public class ClientPutDiskDir { - public class ClientPutDiskDir { + @Test + public void commandIsSentCorrectly() throws InterruptedException, ExecutionException, IOException { + Future> key = fcpClient.clientPutDiskDir().fromDirectory(new File("")).uri("CHK@").execute(); + connectAndAssert(this::matchesClientPutDiskDir); + fcpServer.writeLine("PutSuccessful", "Identifier=" + identifier, "URI=CHK@abc", "EndMessage"); + assertThat(key.get().get().getKey(), is("CHK@abc")); + } - @Test - public void commandIsSentCorrectly() throws InterruptedException, ExecutionException, IOException { - Future> key = fcpClient.clientPutDiskDir().fromDirectory(new File("")).uri("CHK@").execute(); - connectAndAssert(this::matchesClientPutDiskDir); - fcpServer.writeLine("PutSuccessful", "Identifier=" + identifier, "URI=CHK@abc", "EndMessage"); - assertThat(key.get().get().getKey(), is("CHK@abc")); - } + @Test + public void protocolErrorAbortsCommand() throws InterruptedException, ExecutionException, IOException { + Future> key = fcpClient.clientPutDiskDir().fromDirectory(new File("")).uri("CHK@").execute(); + connectAndAssert(this::matchesClientPutDiskDir); + replyWithProtocolError(); + assertThat(key.get().isPresent(), is(false)); + } - @Test - public void protocolErrorAbortsCommand() throws InterruptedException, ExecutionException, IOException { - Future> key = fcpClient.clientPutDiskDir().fromDirectory(new File("")).uri("CHK@").execute(); - connectAndAssert(this::matchesClientPutDiskDir); - replyWithProtocolError(); - assertThat(key.get().isPresent(), is(false)); - } + @Test + public void progressIsSentToConsumerCorrectly() throws InterruptedException, ExecutionException, IOException { + List requestProgress = new ArrayList<>(); + Future> key = fcpClient.clientPutDiskDir().onProgress(requestProgress::add) + .fromDirectory(new File("")).uri("CHK@").execute(); + connectAndAssert(() -> matchesClientPutDiskDir("Verbosity=1")); + replyWithSimpleProgress(1, 2, 3, 4, 5, 6, true, 8); + replyWithSimpleProgress(11, 12, 13, 14, 15, 16, false, 18); + fcpServer.writeLine("PutSuccessful", "Identifier=" + identifier, "URI=CHK@abc", "EndMessage"); + assertThat(key.get().get().getKey(), is("CHK@abc")); + assertThat(requestProgress, contains( + isRequestProgress(1, 2, 3, 4, 5, 6, true, 8), + isRequestProgress(11, 12, 13, 14, 15, 16, false, 18) + )); + } + + private Matcher> matchesClientPutDiskDir(String... additionalLines) { + List lines = new ArrayList<>(Arrays.asList("Identifier=" + identifier, "URI=CHK@", "Filename=" + new File("").getPath())); + Arrays.asList(additionalLines).forEach(lines::add); + return matchesFcpMessage("ClientPutDiskDir", lines.toArray(new String[lines.size()])); + } - private Matcher> matchesClientPutDiskDir() { - return matchesFcpMessage( - "ClientPutDiskDir", - "Identifier=" + identifier, - "URI=CHK@", - "Filename=" + new File("").getPath() - ); } }