X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fnet%2Fpterodactylus%2Ffcp%2Fquelaton%2FDefaultFcpClientTest.java;fp=src%2Ftest%2Fjava%2Fnet%2Fpterodactylus%2Ffcp%2Fquelaton%2FDefaultFcpClientTest.java;h=f238d5a358346a6527954c5d82064e7be8ebee3e;hb=c2522a887d02f692017b4b9c8e1d31472e41001f;hp=e526f2f23d7b10f9282617b0a8fcad0c4b1f1f9a;hpb=f3d387a780cd3da40847ba199f492f7752bef50a;p=jFCPlib.git diff --git a/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java b/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java index e526f2f..f238d5a 100644 --- a/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java +++ b/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java @@ -58,10 +58,7 @@ import org.hamcrest.Matchers; import org.hamcrest.TypeSafeDiagnosingMatcher; import org.junit.After; import org.junit.Assert; -import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; /** @@ -1333,316 +1330,325 @@ public class DefaultFcpClientTest { } - public class ClientPut { + public class PutCommands { - @Test - public void sendsCorrectCommand() throws IOException, ExecutionException, InterruptedException { - fcpClient.clientPut() - .from(new ByteArrayInputStream("Hello\n".getBytes())) - .length(6) - .uri("KSK@foo.txt") - .execute(); - connectNode(); - readMessage("Hello", this::matchesDirectClientPut); - } - - @Test - public void succeedsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException { - Future> key = fcpClient.clientPut() - .from(new ByteArrayInputStream("Hello\n".getBytes())) - .length(6) - .uri("KSK@foo.txt") - .execute(); - connectNode(); - readMessage("Hello", this::matchesDirectClientPut); - replyWithPutFailed("not-the-right-one"); - replyWithPutSuccessful(identifier); - assertThat(key.get().get().getKey(), is("KSK@foo.txt")); - } - - @Test - public void failsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException { - Future> key = fcpClient.clientPut() - .from(new ByteArrayInputStream("Hello\n".getBytes())) - .length(6) - .uri("KSK@foo.txt") - .execute(); - connectNode(); - readMessage("Hello", this::matchesDirectClientPut); - replyWithPutSuccessful("not-the-right-one"); - replyWithPutFailed(identifier); - assertThat(key.get().isPresent(), is(false)); - } - - @Test - public void renameIsSentCorrectly() throws InterruptedException, ExecutionException, IOException { - fcpClient.clientPut() - .named("otherName.txt") - .from(new ByteArrayInputStream("Hello\n".getBytes())) - .length(6) - .uri("KSK@foo.txt") - .execute(); - connectNode(); - readMessage("Hello", () -> allOf( - hasHead("ClientPut"), - hasParameters(1, 2, "TargetFilename=otherName.txt", "UploadFrom=direct", "DataLength=6", - "URI=KSK@foo.txt"), - hasTail("EndMessage", "Hello") - )); - } - - @Test - public void redirectIsSentCorrecly() throws IOException, ExecutionException, InterruptedException { - fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt").execute(); - connectAndAssert(() -> - matchesFcpMessage("ClientPut", "UploadFrom=redirect", "URI=KSK@foo.txt", "TargetURI=KSK@bar.txt")); - } - - @Test - public void withFileIsSentCorrectly() throws InterruptedException, ExecutionException, IOException { - fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute(); - connectAndAssert(() -> - matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt")); - } - - public class DDA { + public class ClientPut { - private final File ddaFile; - private final File fileToUpload; - - public DDA() throws IOException { - ddaFile = createDdaFile(); - fileToUpload = new File(ddaFile.getParent(), "test.dat"); + @Test + public void sendsCorrectCommand() throws IOException, ExecutionException, InterruptedException { + fcpClient.clientPut() + .from(new ByteArrayInputStream("Hello\n".getBytes())) + .length(6) + .uri("KSK@foo.txt") + .execute(); + connectNode(); + readMessage("Hello", this::matchesDirectClientPut); } - private Matcher> matchesFileClientPut(File file) { - return matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=" + file); + @Test + public void succeedsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException { + Future> key = fcpClient.clientPut() + .from(new ByteArrayInputStream("Hello\n".getBytes())) + .length(6) + .uri("KSK@foo.txt") + .execute(); + connectNode(); + readMessage("Hello", this::matchesDirectClientPut); + replyWithPutFailed("not-the-right-one"); + replyWithPutSuccessful(identifier); + assertThat(key.get().get().getKey(), is("KSK@foo.txt")); } @Test - public void completeDda() throws IOException, ExecutionException, InterruptedException { - fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute(); - connectAndAssert(() -> matchesFileClientPut(fileToUpload)); - sendDdaRequired(identifier); - readMessage(() -> matchesTestDDARequest(ddaFile)); - sendTestDDAReply(ddaFile.getParent(), ddaFile); - readMessage(() -> matchesTestDDAResponse(ddaFile)); - writeTestDDAComplete(ddaFile); - readMessage(() -> matchesFileClientPut(fileToUpload)); + public void failsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException { + Future> key = fcpClient.clientPut() + .from(new ByteArrayInputStream("Hello\n".getBytes())) + .length(6) + .uri("KSK@foo.txt") + .execute(); + connectNode(); + readMessage("Hello", this::matchesDirectClientPut); + replyWithPutSuccessful("not-the-right-one"); + replyWithPutFailed(identifier); + assertThat(key.get().isPresent(), is(false)); } @Test - public void ignoreOtherDda() throws IOException, ExecutionException, InterruptedException { - fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute(); - connectAndAssert(() -> matchesFileClientPut(fileToUpload)); - sendDdaRequired(identifier); - readMessage(() -> matchesTestDDARequest(ddaFile)); - sendTestDDAReply("/some-other-directory", ddaFile); - sendTestDDAReply(ddaFile.getParent(), ddaFile); - readMessage(() -> matchesTestDDAResponse(ddaFile)); + public void renameIsSentCorrectly() throws InterruptedException, ExecutionException, IOException { + fcpClient.clientPut() + .named("otherName.txt") + .from(new ByteArrayInputStream("Hello\n".getBytes())) + .length(6) + .uri("KSK@foo.txt") + .execute(); + connectNode(); + readMessage("Hello", () -> allOf( + hasHead("ClientPut"), + hasParameters(1, 2, "TargetFilename=otherName.txt", "UploadFrom=direct", "DataLength=6", + "URI=KSK@foo.txt"), + hasTail("EndMessage", "Hello") + )); } @Test - public void sendResponseIfFileUnreadable() throws IOException, ExecutionException, InterruptedException { - fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute(); - connectAndAssert(() -> matchesFileClientPut(fileToUpload)); - sendDdaRequired(identifier); - readMessage(() -> matchesTestDDARequest(ddaFile)); - sendTestDDAReply(ddaFile.getParent(), new File(ddaFile + ".foo")); - readMessage(this::matchesFailedToReadResponse); + public void redirectIsSentCorrecly() throws IOException, ExecutionException, InterruptedException { + fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt").execute(); + connectAndAssert(() -> + matchesFcpMessage("ClientPut", "UploadFrom=redirect", "URI=KSK@foo.txt", "TargetURI=KSK@bar.txt")); } @Test - public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory() - throws IOException, ExecutionException, InterruptedException { - fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute(); - connectNode(); - List lines = fcpServer.collectUntil(is("EndMessage")); - String identifier = extractIdentifier(lines); - fcpServer.writeLine( - "TestDDAComplete", - "Directory=/some-other-directory", - "EndMessage" - ); - sendDdaRequired(identifier); - lines = fcpServer.collectUntil(is("EndMessage")); - assertThat(lines, matchesFcpMessage( - "TestDDARequest", - "Directory=" + ddaFile.getParent(), - "WantReadDirectory=true", - "WantWriteDirectory=false" - )); + public void withFileIsSentCorrectly() throws InterruptedException, ExecutionException, IOException { + fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute(); + connectAndAssert(() -> + matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt")); } - private Matcher> matchesFailedToReadResponse() { - return matchesFcpMessage( - "TestDDAResponse", - "Directory=" + ddaFile.getParent(), - "ReadContent=failed-to-read" + public class DDA { + + private final File ddaFile; + private final File fileToUpload; + + public DDA() throws IOException { + ddaFile = createDdaFile(); + fileToUpload = new File(ddaFile.getParent(), "test.dat"); + } + + private Matcher> matchesFileClientPut(File file) { + return matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=" + file); + } + + @Test + public void completeDda() throws IOException, ExecutionException, InterruptedException { + fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute(); + connectAndAssert(() -> matchesFileClientPut(fileToUpload)); + sendDdaRequired(identifier); + readMessage(() -> matchesTestDDARequest(ddaFile)); + sendTestDDAReply(ddaFile.getParent(), ddaFile); + readMessage(() -> matchesTestDDAResponse(ddaFile)); + writeTestDDAComplete(ddaFile); + readMessage(() -> matchesFileClientPut(fileToUpload)); + } + + @Test + public void ignoreOtherDda() throws IOException, ExecutionException, InterruptedException { + fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute(); + connectAndAssert(() -> matchesFileClientPut(fileToUpload)); + sendDdaRequired(identifier); + readMessage(() -> matchesTestDDARequest(ddaFile)); + sendTestDDAReply("/some-other-directory", ddaFile); + sendTestDDAReply(ddaFile.getParent(), ddaFile); + readMessage(() -> matchesTestDDAResponse(ddaFile)); + } + + @Test + public void sendResponseIfFileUnreadable() + throws IOException, ExecutionException, InterruptedException { + fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute(); + connectAndAssert(() -> matchesFileClientPut(fileToUpload)); + sendDdaRequired(identifier); + readMessage(() -> matchesTestDDARequest(ddaFile)); + sendTestDDAReply(ddaFile.getParent(), new File(ddaFile + ".foo")); + readMessage(this::matchesFailedToReadResponse); + } + + @Test + public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory() + throws IOException, ExecutionException, InterruptedException { + fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute(); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + String identifier = extractIdentifier(lines); + fcpServer.writeLine( + "TestDDAComplete", + "Directory=/some-other-directory", + "EndMessage" + ); + sendDdaRequired(identifier); + lines = fcpServer.collectUntil(is("EndMessage")); + assertThat(lines, matchesFcpMessage( + "TestDDARequest", + "Directory=" + ddaFile.getParent(), + "WantReadDirectory=true", + "WantWriteDirectory=false" + )); + } + + private Matcher> matchesFailedToReadResponse() { + return matchesFcpMessage( + "TestDDAResponse", + "Directory=" + ddaFile.getParent(), + "ReadContent=failed-to-read" + ); + } + + private void writeTestDDAComplete(File tempFile) throws IOException { + fcpServer.writeLine( + "TestDDAComplete", + "Directory=" + tempFile.getParent(), + "ReadDirectoryAllowed=true", + "EndMessage" + ); + } + + private Matcher> matchesTestDDAResponse(File tempFile) { + return matchesFcpMessage( + "TestDDAResponse", + "Directory=" + tempFile.getParent(), + "ReadContent=test-content" + ); + } + + private void sendTestDDAReply(String directory, File tempFile) throws IOException { + fcpServer.writeLine( + "TestDDAReply", + "Directory=" + directory, + "ReadFilename=" + tempFile, + "EndMessage" + ); + } + + private Matcher> matchesTestDDARequest(File tempFile) { + return matchesFcpMessage( + "TestDDARequest", + "Directory=" + tempFile.getParent(), + "WantReadDirectory=true", + "WantWriteDirectory=false" + ); + } + + private void sendDdaRequired(String identifier) throws IOException { + fcpServer.writeLine( + "ProtocolError", + "Identifier=" + identifier, + "Code=25", + "EndMessage" + ); + } + + } + + private void replyWithPutSuccessful(String identifier) throws IOException { + fcpServer.writeLine( + "PutSuccessful", + "URI=KSK@foo.txt", + "Identifier=" + identifier, + "EndMessage" ); } - private void writeTestDDAComplete(File tempFile) throws IOException { + private void replyWithPutFailed(String identifier) throws IOException { fcpServer.writeLine( - "TestDDAComplete", - "Directory=" + tempFile.getParent(), - "ReadDirectoryAllowed=true", + "PutFailed", + "Identifier=" + identifier, "EndMessage" ); } - private Matcher> matchesTestDDAResponse(File tempFile) { - return matchesFcpMessage( - "TestDDAResponse", - "Directory=" + tempFile.getParent(), - "ReadContent=test-content" + private Matcher> matchesDirectClientPut(String... additionalLines) { + List lines = + new ArrayList<>(Arrays.asList("UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt")); + Arrays.asList(additionalLines).forEach(lines::add); + return allOf( + hasHead("ClientPut"), + hasParameters(1, 2, lines.toArray(new String[lines.size()])), + hasTail("EndMessage", "Hello") ); } - private void sendTestDDAReply(String directory, File tempFile) throws IOException { + private File createDdaFile() throws IOException { + File tempFile = File.createTempFile("test-dda-", ".dat"); + tempFile.deleteOnExit(); + Files.write("test-content", tempFile, StandardCharsets.UTF_8); + return tempFile; + } + + @Test + public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier() + throws InterruptedException, ExecutionException, IOException { + Future> key = + fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute(); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + String identifier = extractIdentifier(lines); fcpServer.writeLine( - "TestDDAReply", - "Directory=" + directory, - "ReadFilename=" + tempFile, + "ProtocolError", + "Identifier=not-the-right-one", + "Code=25", "EndMessage" ); - } - - private Matcher> matchesTestDDARequest(File tempFile) { - return matchesFcpMessage( - "TestDDARequest", - "Directory=" + tempFile.getParent(), - "WantReadDirectory=true", - "WantWriteDirectory=false" + fcpServer.writeLine( + "PutSuccessful", + "Identifier=" + identifier, + "URI=KSK@foo.txt", + "EndMessage" ); + assertThat(key.get().get().getKey(), is("KSK@foo.txt")); } - private void sendDdaRequired(String identifier) throws IOException { + @Test + public void clientPutAbortsOnProtocolErrorOtherThan25() + throws InterruptedException, ExecutionException, IOException { + Future> key = + fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute(); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + String identifier = extractIdentifier(lines); fcpServer.writeLine( "ProtocolError", "Identifier=" + identifier, - "Code=25", + "Code=1", "EndMessage" ); + assertThat(key.get().isPresent(), is(false)); } - } - - private void replyWithPutSuccessful(String identifier) throws IOException { - fcpServer.writeLine( - "PutSuccessful", - "URI=KSK@foo.txt", - "Identifier=" + identifier, - "EndMessage" - ); - } - - private void replyWithPutFailed(String identifier) throws IOException { - fcpServer.writeLine( - "PutFailed", - "Identifier=" + identifier, - "EndMessage" - ); - } - - private Matcher> matchesDirectClientPut(String... additionalLines) { - List lines = new ArrayList<>(Arrays.asList("UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt")); - Arrays.asList(additionalLines).forEach(lines::add); - return allOf( - hasHead("ClientPut"), - hasParameters(1, 2, lines.toArray(new String[lines.size()])), - hasTail("EndMessage", "Hello") - ); - } - - private File createDdaFile() throws IOException { - File tempFile = File.createTempFile("test-dda-", ".dat"); - tempFile.deleteOnExit(); - Files.write("test-content", tempFile, StandardCharsets.UTF_8); - return tempFile; - } - - @Test - public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier() - throws InterruptedException, ExecutionException, IOException { - Future> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute(); - connectNode(); - List lines = fcpServer.collectUntil(is("EndMessage")); - String identifier = extractIdentifier(lines); - fcpServer.writeLine( - "ProtocolError", - "Identifier=not-the-right-one", - "Code=25", - "EndMessage" - ); - fcpServer.writeLine( - "PutSuccessful", - "Identifier=" + identifier, - "URI=KSK@foo.txt", - "EndMessage" - ); - assertThat(key.get().get().getKey(), is("KSK@foo.txt")); - } - - @Test - public void clientPutAbortsOnProtocolErrorOtherThan25() - throws InterruptedException, ExecutionException, IOException { - Future> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute(); - connectNode(); - List lines = fcpServer.collectUntil(is("EndMessage")); - String identifier = extractIdentifier(lines); - fcpServer.writeLine( - "ProtocolError", - "Identifier=" + identifier, - "Code=1", - "EndMessage" - ); - assertThat(key.get().isPresent(), is(false)); - } + @Test + public void clientPutSendsNotificationsForGeneratedKeys() + throws InterruptedException, ExecutionException, IOException { + List generatedKeys = new CopyOnWriteArrayList<>(); + Future> key = fcpClient.clientPut() + .onKeyGenerated(generatedKeys::add) + .from(new ByteArrayInputStream("Hello\n".getBytes())) + .length(6) + .uri("KSK@foo.txt") + .execute(); + connectNode(); + List lines = fcpServer.collectUntil(is("Hello")); + String identifier = extractIdentifier(lines); + fcpServer.writeLine( + "URIGenerated", + "Identifier=" + identifier, + "URI=KSK@foo.txt", + "EndMessage" + ); + replyWithPutSuccessful(identifier); + assertThat(key.get().get().getKey(), is("KSK@foo.txt")); + assertThat(generatedKeys, contains("KSK@foo.txt")); + } - @Test - public void clientPutSendsNotificationsForGeneratedKeys() - throws InterruptedException, ExecutionException, IOException { - List generatedKeys = new CopyOnWriteArrayList<>(); - Future> key = fcpClient.clientPut() - .onKeyGenerated(generatedKeys::add) - .from(new ByteArrayInputStream("Hello\n".getBytes())) - .length(6) - .uri("KSK@foo.txt") - .execute(); - connectNode(); - List lines = fcpServer.collectUntil(is("Hello")); - String identifier = extractIdentifier(lines); - fcpServer.writeLine( - "URIGenerated", - "Identifier=" + identifier, - "URI=KSK@foo.txt", - "EndMessage" - ); - replyWithPutSuccessful(identifier); - assertThat(key.get().get().getKey(), is("KSK@foo.txt")); - assertThat(generatedKeys, contains("KSK@foo.txt")); - } + @Test + public void clientPutSendsNotificationOnProgress() + throws InterruptedException, ExecutionException, IOException { + List requestProgress = new ArrayList<>(); + Future> key = fcpClient.clientPut() + .onProgress(requestProgress::add) + .from(new ByteArrayInputStream("Hello\n".getBytes())) + .length(6) + .uri("KSK@foo.txt") + .execute(); + connectNode(); + readMessage("Hello", () -> matchesDirectClientPut("Verbosity=1")); + replyWithSimpleProgress(1, 2, 3, 4, 5, 6, true, 8); + replyWithSimpleProgress(11, 12, 13, 14, 15, 16, false, 18); + replyWithPutSuccessful(identifier); + assertThat(key.get().get().getKey(), is("KSK@foo.txt")); + assertThat(requestProgress, contains( + isRequestProgress(1, 2, 3, 4, 5, 6, true, 8), + isRequestProgress(11, 12, 13, 14, 15, 16, false, 18) + )); + } - @Test - public void clientPutSendsNotificationOnProgress() throws InterruptedException, ExecutionException, IOException { - List requestProgress = new ArrayList<>(); - Future> key = fcpClient.clientPut() - .onProgress(requestProgress::add) - .from(new ByteArrayInputStream("Hello\n".getBytes())) - .length(6) - .uri("KSK@foo.txt") - .execute(); - connectNode(); - readMessage("Hello", () -> matchesDirectClientPut("Verbosity=1")); - replyWithSimpleProgress(1, 2, 3, 4, 5, 6, true, 8); - replyWithSimpleProgress(11, 12, 13, 14, 15, 16, false, 18); - replyWithPutSuccessful(identifier); - assertThat(key.get().get().getKey(), is("KSK@foo.txt")); - assertThat(requestProgress, contains( - isRequestProgress(1, 2, 3, 4, 5, 6, true, 8), - isRequestProgress(11, 12, 13, 14, 15, 16, false, 18) - )); } private void replyWithSimpleProgress( @@ -1663,33 +1669,46 @@ public class DefaultFcpClientTest { ); } - } + public class ClientPutDiskDir { - public class ClientPutDiskDir { + @Test + public void commandIsSentCorrectly() throws InterruptedException, ExecutionException, IOException { + Future> key = fcpClient.clientPutDiskDir().fromDirectory(new File("")).uri("CHK@").execute(); + connectAndAssert(this::matchesClientPutDiskDir); + fcpServer.writeLine("PutSuccessful", "Identifier=" + identifier, "URI=CHK@abc", "EndMessage"); + assertThat(key.get().get().getKey(), is("CHK@abc")); + } - @Test - public void commandIsSentCorrectly() throws InterruptedException, ExecutionException, IOException { - Future> key = fcpClient.clientPutDiskDir().fromDirectory(new File("")).uri("CHK@").execute(); - connectAndAssert(this::matchesClientPutDiskDir); - fcpServer.writeLine("PutSuccessful", "Identifier=" + identifier, "URI=CHK@abc", "EndMessage"); - assertThat(key.get().get().getKey(), is("CHK@abc")); - } + @Test + public void protocolErrorAbortsCommand() throws InterruptedException, ExecutionException, IOException { + Future> key = fcpClient.clientPutDiskDir().fromDirectory(new File("")).uri("CHK@").execute(); + connectAndAssert(this::matchesClientPutDiskDir); + replyWithProtocolError(); + assertThat(key.get().isPresent(), is(false)); + } - @Test - public void protocolErrorAbortsCommand() throws InterruptedException, ExecutionException, IOException { - Future> key = fcpClient.clientPutDiskDir().fromDirectory(new File("")).uri("CHK@").execute(); - connectAndAssert(this::matchesClientPutDiskDir); - replyWithProtocolError(); - assertThat(key.get().isPresent(), is(false)); - } + @Test + public void progressIsSentToConsumerCorrectly() throws InterruptedException, ExecutionException, IOException { + List requestProgress = new ArrayList<>(); + Future> key = fcpClient.clientPutDiskDir().onProgress(requestProgress::add) + .fromDirectory(new File("")).uri("CHK@").execute(); + connectAndAssert(() -> matchesClientPutDiskDir("Verbosity=1")); + replyWithSimpleProgress(1, 2, 3, 4, 5, 6, true, 8); + replyWithSimpleProgress(11, 12, 13, 14, 15, 16, false, 18); + fcpServer.writeLine("PutSuccessful", "Identifier=" + identifier, "URI=CHK@abc", "EndMessage"); + assertThat(key.get().get().getKey(), is("CHK@abc")); + assertThat(requestProgress, contains( + isRequestProgress(1, 2, 3, 4, 5, 6, true, 8), + isRequestProgress(11, 12, 13, 14, 15, 16, false, 18) + )); + } + + private Matcher> matchesClientPutDiskDir(String... additionalLines) { + List lines = new ArrayList<>(Arrays.asList("Identifier=" + identifier, "URI=CHK@", "Filename=" + new File("").getPath())); + Arrays.asList(additionalLines).forEach(lines::add); + return matchesFcpMessage("ClientPutDiskDir", lines.toArray(new String[lines.size()])); + } - private Matcher> matchesClientPutDiskDir() { - return matchesFcpMessage( - "ClientPutDiskDir", - "Identifier=" + identifier, - "URI=CHK@", - "Filename=" + new File("").getPath() - ); } }