X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;ds=inline;f=src%2Ftest%2Fjava%2Fnet%2Fpterodactylus%2Ffcp%2Fquelaton%2FDefaultFcpClientTest.java;h=428ee045a1652030a87f632c8f23f5bcbfb44547;hb=a09c7dc8182d6238b8e635d0da96ed19b3ca87bd;hp=da454df3b145cddaa14697695cad0eb12d7b33d1;hpb=125c4277dad06cbd413f4d2b4d41a16a65c7b286;p=jFCPlib.git diff --git a/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java b/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java index da454df..428ee04 100644 --- a/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java +++ b/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java @@ -1,38 +1,69 @@ package net.pterodactylus.fcp.quelaton; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.startsWith; import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; +import java.net.URL; import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Optional; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import net.pterodactylus.fcp.ARK; +import net.pterodactylus.fcp.ConfigData; +import net.pterodactylus.fcp.DSAGroup; import net.pterodactylus.fcp.FcpKeyPair; import net.pterodactylus.fcp.Key; +import net.pterodactylus.fcp.NodeData; +import net.pterodactylus.fcp.NodeRef; +import net.pterodactylus.fcp.Peer; +import net.pterodactylus.fcp.PeerNote; +import net.pterodactylus.fcp.PluginInfo; import net.pterodactylus.fcp.Priority; import net.pterodactylus.fcp.fake.FakeTcpServer; import net.pterodactylus.fcp.quelaton.ClientGetCommand.Data; import com.google.common.io.ByteStreams; import com.google.common.io.Files; +import com.nitorcreations.junit.runners.NestedRunner; import org.hamcrest.Description; import org.hamcrest.Matcher; +import org.hamcrest.Matchers; import org.hamcrest.TypeSafeDiagnosingMatcher; import org.junit.After; +import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; /** * Unit test for {@link DefaultFcpClient}. * * @author David ‘Bombe’ Roden */ +@RunWith(NestedRunner.class) public class DefaultFcpClientTest { private static final String INSERT_URI = @@ -40,7 +71,7 @@ public class DefaultFcpClientTest { private static final String REQUEST_URI = "SSK@wtbgd2loNcJCXvtQVOftl2tuWBomDQHfqS6ytpPRhfw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQACAAE/"; - private static int threadCounter = 0; + private int threadCounter = 0; private final ExecutorService threadPool = Executors.newCachedThreadPool(r -> new Thread(r, "Test-Thread-" + threadCounter++)); private final FakeTcpServer fcpServer; @@ -48,27 +79,13 @@ public class DefaultFcpClientTest { public DefaultFcpClientTest() throws IOException { fcpServer = new FakeTcpServer(threadPool); - fcpClient = new DefaultFcpClient(threadPool, "localhost", fcpServer.getPort(), () -> "Test", () -> "2.0"); + fcpClient = new DefaultFcpClient(threadPool, "localhost", fcpServer.getPort(), () -> "Test"); } @After public void tearDown() throws IOException { fcpServer.close(); - } - - @Test - public void defaultFcpClientCanGenerateKeypair() throws ExecutionException, InterruptedException, IOException { - Future keyPairFuture = fcpClient.generateKeypair().execute(); - connectNode(); - fcpServer.collectUntil(is("EndMessage")); - fcpServer.writeLine("SSKKeypair", - "InsertURI=" + INSERT_URI + "", - "RequestURI=" + REQUEST_URI + "", - "Identifier=My Identifier from GenerateSSK", - "EndMessage"); - FcpKeyPair keyPair = keyPairFuture.get(); - assertThat(keyPair.getPublicKey(), is(REQUEST_URI)); - assertThat(keyPair.getPrivateKey(), is(INSERT_URI)); + threadPool.shutdown(); } private void connectNode() throws InterruptedException, ExecutionException, IOException { @@ -90,30 +107,6 @@ public class DefaultFcpClientTest { ); } - @Test - public void clientGetCanDownloadData() throws InterruptedException, ExecutionException, IOException { - Future> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt"); - connectNode(); - List lines = fcpServer.collectUntil(is("EndMessage")); - assertThat(lines, matchesFcpMessage("ClientGet", "ReturnType=direct", "URI=KSK@foo.txt")); - String identifier = extractIdentifier(lines); - fcpServer.writeLine( - "AllData", - "Identifier=" + identifier, - "DataLength=6", - "StartupTime=1435610539000", - "CompletionTime=1435610540000", - "Metadata.ContentType=text/plain;charset=utf-8", - "Data", - "Hello" - ); - Optional data = dataFuture.get(); - assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8")); - assertThat(data.get().size(), is(6L)); - assertThat(ByteStreams.toByteArray(data.get().getInputStream()), - is("Hello\n".getBytes(StandardCharsets.UTF_8))); - } - private String extractIdentifier(List lines) { return lines.stream() .filter(s -> s.startsWith("Identifier=")) @@ -123,157 +116,105 @@ public class DefaultFcpClientTest { } @Test - public void clientGetDownloadsDataForCorrectIdentifier() - throws InterruptedException, ExecutionException, IOException { - Future> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt"); + public void defaultFcpClientReusesConnection() throws InterruptedException, ExecutionException, IOException { + Future keyPair = fcpClient.generateKeypair().execute(); connectNode(); List lines = fcpServer.collectUntil(is("EndMessage")); - assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt")); String identifier = extractIdentifier(lines); fcpServer.writeLine( - "AllData", - "Identifier=not-test", - "DataLength=12", - "StartupTime=1435610539000", - "CompletionTime=1435610540000", - "Metadata.ContentType=text/plain;charset=latin-9", - "Data", - "Hello World" - ); - fcpServer.writeLine( - "AllData", + "SSKKeypair", + "InsertURI=" + INSERT_URI + "", + "RequestURI=" + REQUEST_URI + "", "Identifier=" + identifier, - "DataLength=6", - "StartupTime=1435610539000", - "CompletionTime=1435610540000", - "Metadata.ContentType=text/plain;charset=utf-8", - "Data", - "Hello" + "EndMessage" ); - Optional data = dataFuture.get(); - assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8")); - assertThat(data.get().size(), is(6L)); - assertThat(ByteStreams.toByteArray(data.get().getInputStream()), - is("Hello\n".getBytes(StandardCharsets.UTF_8))); - } - - @Test - public void clientGetRecognizesGetFailed() throws InterruptedException, ExecutionException, IOException { - Future> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt"); - connectNode(); - List lines = fcpServer.collectUntil(is("EndMessage")); - assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt")); - String identifier = extractIdentifier(lines); + keyPair.get(); + keyPair = fcpClient.generateKeypair().execute(); + lines = fcpServer.collectUntil(is("EndMessage")); + identifier = extractIdentifier(lines); fcpServer.writeLine( - "GetFailed", + "SSKKeypair", + "InsertURI=" + INSERT_URI + "", + "RequestURI=" + REQUEST_URI + "", "Identifier=" + identifier, - "Code=3", "EndMessage" ); - Optional data = dataFuture.get(); - assertThat(data.isPresent(), is(false)); + keyPair.get(); } @Test - public void clientGetRecognizesGetFailedForCorrectIdentifier() + public void defaultFcpClientCanReconnectAfterConnectionHasBeenClosed() throws InterruptedException, ExecutionException, IOException { - Future> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt"); + Future keyPair = fcpClient.generateKeypair().execute(); + connectNode(); + fcpServer.collectUntil(is("EndMessage")); + fcpServer.close(); + try { + keyPair.get(); + Assert.fail(); + } catch (ExecutionException e) { + } + keyPair = fcpClient.generateKeypair().execute(); connectNode(); List lines = fcpServer.collectUntil(is("EndMessage")); - assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt")); String identifier = extractIdentifier(lines); fcpServer.writeLine( - "GetFailed", - "Identifier=not-test", - "Code=3", - "EndMessage" - ); - fcpServer.writeLine( - "GetFailed", + "SSKKeypair", + "InsertURI=" + INSERT_URI + "", + "RequestURI=" + REQUEST_URI + "", "Identifier=" + identifier, - "Code=3", "EndMessage" ); - Optional data = dataFuture.get(); - assertThat(data.isPresent(), is(false)); - } - - @Test - public void clientGetRecognizesConnectionClosed() throws InterruptedException, ExecutionException, IOException { - Future> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt"); - connectNode(); - List lines = fcpServer.collectUntil(is("EndMessage")); - assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt")); - fcpServer.close(); - Optional data = dataFuture.get(); - assertThat(data.isPresent(), is(false)); - } - - @Test - public void clientGetWithIgnoreDataStoreSettingSendsCorrectCommands() - throws InterruptedException, ExecutionException, IOException { - fcpClient.clientGet().ignoreDataStore().uri("KSK@foo.txt"); - connectNode(); - List lines = fcpServer.collectUntil(is("EndMessage")); - assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "IgnoreDS=true")); + keyPair.get(); } - @Test - public void clientGetWithDataStoreOnlySettingSendsCorrectCommands() - throws InterruptedException, ExecutionException, IOException { - fcpClient.clientGet().dataStoreOnly().uri("KSK@foo.txt"); - connectNode(); - List lines = fcpServer.collectUntil(is("EndMessage")); - assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "DSonly=true")); + private Matcher> matchesFcpMessage(String name, String... requiredLines) { + return matchesFcpMessageWithTerminator(name, "EndMessage", requiredLines); } - @Test - public void clientGetWithMaxSizeSettingSendsCorrectCommands() - throws InterruptedException, ExecutionException, IOException { - fcpClient.clientGet().maxSize(1048576).uri("KSK@foo.txt"); - connectNode(); - List lines = fcpServer.collectUntil(is("EndMessage")); - assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "MaxSize=1048576")); + private Matcher> matchesDataMessage(String name, String... requiredLines) { + return matchesFcpMessageWithTerminator(name, "Data", requiredLines); } - @Test - public void clientGetWithPrioritySettingSendsCorrectCommands() - throws InterruptedException, ExecutionException, IOException { - fcpClient.clientGet().priority(Priority.interactive).uri("KSK@foo.txt"); - connectNode(); - List lines = fcpServer.collectUntil(is("EndMessage")); - assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "PriorityClass=1")); - } + private Matcher> hasHead(String firstElement) { + return new TypeSafeDiagnosingMatcher>() { + @Override + protected boolean matchesSafely(Iterable iterable, Description mismatchDescription) { + if (!iterable.iterator().hasNext()) { + mismatchDescription.appendText("is empty"); + return false; + } + String element = iterable.iterator().next(); + if (!element.equals(firstElement)) { + mismatchDescription.appendText("starts with ").appendValue(element); + return false; + } + return true; + } - @Test - public void clientGetWithRealTimeSettingSendsCorrectCommands() - throws InterruptedException, ExecutionException, IOException { - fcpClient.clientGet().realTime().uri("KSK@foo.txt"); - connectNode(); - List lines = fcpServer.collectUntil(is("EndMessage")); - assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "RealTimeFlag=true")); + @Override + public void describeTo(Description description) { + description.appendText("starts with ").appendValue(firstElement); + } + }; } - @Test - public void clientGetWithGlobalSettingSendsCorrectCommands() - throws InterruptedException, ExecutionException, IOException { - fcpClient.clientGet().global().uri("KSK@foo.txt"); - connectNode(); - List lines = fcpServer.collectUntil(is("EndMessage")); - assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "Global=true")); + private Matcher> matchesFcpMessageWithTerminator( + String name, String terminator, String... requiredLines) { + return allOf(hasHead(name), hasParameters(1, 1, requiredLines), hasTail(terminator)); } - private Matcher> matchesFcpMessage(String name, String... requiredLines) { + private Matcher> hasParameters(int ignoreStart, int ignoreEnd, String... lines) { return new TypeSafeDiagnosingMatcher>() { @Override protected boolean matchesSafely(List item, Description mismatchDescription) { - if (!item.get(0).equals(name)) { - mismatchDescription.appendText("FCP message is named ").appendValue(item.get(0)); + if (item.size() < (ignoreStart + ignoreEnd)) { + mismatchDescription.appendText("has only ").appendValue(item.size()).appendText(" elements"); return false; } - for (String requiredLine : requiredLines) { - if (item.indexOf(requiredLine) < 1) { - mismatchDescription.appendText("FCP message does not contain ").appendValue(requiredLine); + for (String line : lines) { + if ((item.indexOf(line) < ignoreStart) || (item.indexOf(line) >= (item.size() - ignoreEnd))) { + mismatchDescription.appendText("does not contains ").appendValue(line); return false; } } @@ -282,8 +223,32 @@ public class DefaultFcpClientTest { @Override public void describeTo(Description description) { - description.appendText("FCP message named ").appendValue(name); - description.appendValueList(", containing the lines ", ", ", "", requiredLines); + description.appendText("contains ").appendValueList("(", ", ", ")", lines); + description.appendText(", ignoring the first ").appendValue(ignoreStart); + description.appendText(" and the last ").appendValue(ignoreEnd); + } + }; + } + + private Matcher> hasTail(String... lastElements) { + return new TypeSafeDiagnosingMatcher>() { + @Override + protected boolean matchesSafely(List list, Description mismatchDescription) { + if (list.size() < lastElements.length) { + mismatchDescription.appendText("is too small"); + return false; + } + List tail = list.subList(list.size() - lastElements.length, list.size()); + if (!tail.equals(Arrays.asList(lastElements))) { + mismatchDescription.appendText("ends with ").appendValueList("(", ", ", ")", tail); + return false; + } + return true; + } + + @Override + public void describeTo(Description description) { + description.appendText("ends with ").appendValueList("(", ", ", ")", lastElements); } }; } @@ -294,10 +259,15 @@ public class DefaultFcpClientTest { fcpClient.clientPut() .from(new ByteArrayInputStream("Hello\n".getBytes())) .length(6) - .key(new Key("KSK@foo.txt")); + .uri("KSK@foo.txt") + .execute(); connectNode(); List lines = fcpServer.collectUntil(is("Hello")); - assertThat(lines, matchesFcpMessage("ClientPut", "UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt")); + assertThat(lines, allOf( + hasHead("ClientPut"), + hasParameters(1, 2, "UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"), + hasTail("EndMessage", "Hello") + )); } @Test @@ -306,7 +276,8 @@ public class DefaultFcpClientTest { Future> key = fcpClient.clientPut() .from(new ByteArrayInputStream("Hello\n".getBytes())) .length(6) - .key(new Key("KSK@foo.txt")); + .uri("KSK@foo.txt") + .execute(); connectNode(); List lines = fcpServer.collectUntil(is("Hello")); String identifier = extractIdentifier(lines); @@ -330,7 +301,8 @@ public class DefaultFcpClientTest { Future> key = fcpClient.clientPut() .from(new ByteArrayInputStream("Hello\n".getBytes())) .length(6) - .key(new Key("KSK@foo.txt")); + .uri("KSK@foo.txt") + .execute(); connectNode(); List lines = fcpServer.collectUntil(is("Hello")); String identifier = extractIdentifier(lines); @@ -355,17 +327,21 @@ public class DefaultFcpClientTest { .named("otherName.txt") .from(new ByteArrayInputStream("Hello\n".getBytes())) .length(6) - .key(new Key("KSK@foo.txt")); + .uri("KSK@foo.txt") + .execute(); connectNode(); List lines = fcpServer.collectUntil(is("Hello")); - assertThat(lines, matchesFcpMessage("ClientPut", "TargetFilename=otherName.txt", "UploadFrom=direct", - "DataLength=6", "URI=KSK@foo.txt")); + assertThat(lines, allOf( + hasHead("ClientPut"), + hasParameters(1, 2, "TargetFilename=otherName.txt", "UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"), + hasTail("EndMessage", "Hello") + )); } @Test public void clientPutWithRedirectSendsCorrectCommand() throws IOException, ExecutionException, InterruptedException { - fcpClient.clientPut().redirectTo(new Key("KSK@bar.txt")).key(new Key("KSK@foo.txt")); + fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt").execute(); connectNode(); List lines = fcpServer.collectUntil(is("EndMessage")); assertThat(lines, @@ -374,7 +350,7 @@ public class DefaultFcpClientTest { @Test public void clientPutWithFileSendsCorrectCommand() throws InterruptedException, ExecutionException, IOException { - fcpClient.clientPut().from(new File("/tmp/data.txt")).key(new Key("KSK@foo.txt")); + fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute(); connectNode(); List lines = fcpServer.collectUntil(is("EndMessage")); assertThat(lines, @@ -385,7 +361,7 @@ public class DefaultFcpClientTest { public void clientPutWithFileCanCompleteTestDdaSequence() throws IOException, ExecutionException, InterruptedException { File tempFile = createTempFile(); - fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).key(new Key("KSK@foo.txt")); + fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt").execute(); connectNode(); List lines = fcpServer.collectUntil(is("EndMessage")); String identifier = extractIdentifier(lines); @@ -400,8 +376,7 @@ public class DefaultFcpClientTest { "TestDDARequest", "Directory=" + tempFile.getParent(), "WantReadDirectory=true", - "WantWriteDirectory=false", - "EndMessage" + "WantWriteDirectory=false" )); fcpServer.writeLine( "TestDDAReply", @@ -413,8 +388,7 @@ public class DefaultFcpClientTest { assertThat(lines, matchesFcpMessage( "TestDDAResponse", "Directory=" + tempFile.getParent(), - "ReadContent=test-content", - "EndMessage" + "ReadContent=test-content" )); fcpServer.writeLine( "TestDDAComplete", @@ -438,7 +412,7 @@ public class DefaultFcpClientTest { @Test public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier() throws InterruptedException, ExecutionException, IOException { - Future> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).key(new Key("KSK@foo.txt")); + Future> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute(); connectNode(); List lines = fcpServer.collectUntil(is("EndMessage")); String identifier = extractIdentifier(lines); @@ -460,7 +434,7 @@ public class DefaultFcpClientTest { @Test public void clientPutAbortsOnProtocolErrorOtherThan25() throws InterruptedException, ExecutionException, IOException { - Future> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).key(new Key("KSK@foo.txt")); + Future> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute(); connectNode(); List lines = fcpServer.collectUntil(is("EndMessage")); String identifier = extractIdentifier(lines); @@ -477,7 +451,7 @@ public class DefaultFcpClientTest { public void clientPutDoesNotReplyToWrongTestDdaReply() throws IOException, ExecutionException, InterruptedException { File tempFile = createTempFile(); - fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).key(new Key("KSK@foo.txt")); + fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt").execute(); connectNode(); List lines = fcpServer.collectUntil(is("EndMessage")); String identifier = extractIdentifier(lines); @@ -492,8 +466,7 @@ public class DefaultFcpClientTest { "TestDDARequest", "Directory=" + tempFile.getParent(), "WantReadDirectory=true", - "WantWriteDirectory=false", - "EndMessage" + "WantWriteDirectory=false" )); fcpServer.writeLine( "TestDDAReply", @@ -511,9 +484,1493 @@ public class DefaultFcpClientTest { assertThat(lines, matchesFcpMessage( "TestDDAResponse", "Directory=" + tempFile.getParent(), - "ReadContent=test-content", + "ReadContent=test-content" + )); + } + + @Test + public void clientPutSendsResponseEvenIfFileCanNotBeRead() + throws IOException, ExecutionException, InterruptedException { + File tempFile = createTempFile(); + fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt").execute(); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + String identifier = extractIdentifier(lines); + fcpServer.writeLine( + "ProtocolError", + "Identifier=" + identifier, + "Code=25", + "EndMessage" + ); + lines = fcpServer.collectUntil(is("EndMessage")); + assertThat(lines, matchesFcpMessage( + "TestDDARequest", + "Directory=" + tempFile.getParent(), + "WantReadDirectory=true", + "WantWriteDirectory=false" + )); + fcpServer.writeLine( + "TestDDAReply", + "Directory=" + tempFile.getParent(), + "ReadFilename=" + tempFile + ".foo", + "EndMessage" + ); + lines = fcpServer.collectUntil(is("EndMessage")); + assertThat(lines, matchesFcpMessage( + "TestDDAResponse", + "Directory=" + tempFile.getParent(), + "ReadContent=failed-to-read" + )); + } + + @Test + public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory() + throws IOException, ExecutionException, InterruptedException { + File tempFile = createTempFile(); + fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt").execute(); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + String identifier = extractIdentifier(lines); + fcpServer.writeLine( + "TestDDAComplete", + "Directory=/some-other-directory", + "EndMessage" + ); + fcpServer.writeLine( + "ProtocolError", + "Identifier=" + identifier, + "Code=25", + "EndMessage" + ); + lines = fcpServer.collectUntil(is("EndMessage")); + assertThat(lines, matchesFcpMessage( + "TestDDARequest", + "Directory=" + tempFile.getParent(), + "WantReadDirectory=true", + "WantWriteDirectory=false" + )); + } + + @Test + public void clientPutSendsNotificationsForGeneratedKeys() + throws InterruptedException, ExecutionException, IOException { + List generatedKeys = new CopyOnWriteArrayList<>(); + Future> key = fcpClient.clientPut() + .onKeyGenerated(generatedKeys::add) + .from(new ByteArrayInputStream("Hello\n".getBytes())) + .length(6) + .uri("KSK@foo.txt") + .execute(); + connectNode(); + List lines = fcpServer.collectUntil(is("Hello")); + String identifier = extractIdentifier(lines); + fcpServer.writeLine( + "URIGenerated", + "Identifier=" + identifier, + "URI=KSK@foo.txt", + "EndMessage" + ); + fcpServer.writeLine( + "PutSuccessful", + "URI=KSK@foo.txt", + "Identifier=" + identifier, + "EndMessage" + ); + assertThat(key.get().get().getKey(), is("KSK@foo.txt")); + assertThat(generatedKeys, contains("KSK@foo.txt")); + } + + @Test + public void defaultFcpClientCanGetNodeInformation() throws InterruptedException, ExecutionException, IOException { + Future nodeData = fcpClient.getNode().execute(); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + String identifier = extractIdentifier(lines); + assertThat(lines, matchesFcpMessage( + "GetNode", + "Identifier=" + identifier, + "GiveOpennetRef=false", + "WithPrivate=false", + "WithVolatile=false" + )); + fcpServer.writeLine( + "NodeData", + "Identifier=" + identifier, + "ark.pubURI=SSK@3YEf.../ark", + "ark.number=78", + "auth.negTypes=2", + "version=Fred,0.7,1.0,1466", + "lastGoodVersion=Fred,0.7,1.0,1466", + "EndMessage" + ); + assertThat(nodeData.get(), notNullValue()); + } + + @Test + public void defaultFcpClientCanGetNodeInformationWithOpennetRef() + throws InterruptedException, ExecutionException, IOException { + Future nodeData = fcpClient.getNode().opennetRef().execute(); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + String identifier = extractIdentifier(lines); + assertThat(lines, matchesFcpMessage( + "GetNode", + "Identifier=" + identifier, + "GiveOpennetRef=true", + "WithPrivate=false", + "WithVolatile=false" + )); + fcpServer.writeLine( + "NodeData", + "Identifier=" + identifier, + "opennet=true", + "ark.pubURI=SSK@3YEf.../ark", + "ark.number=78", + "auth.negTypes=2", + "version=Fred,0.7,1.0,1466", + "lastGoodVersion=Fred,0.7,1.0,1466", + "EndMessage" + ); + assertThat(nodeData.get().getVersion().toString(), is("Fred,0.7,1.0,1466")); + } + + @Test + public void defaultFcpClientCanGetNodeInformationWithPrivateData() + throws InterruptedException, ExecutionException, IOException { + Future nodeData = fcpClient.getNode().includePrivate().execute(); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + String identifier = extractIdentifier(lines); + assertThat(lines, matchesFcpMessage( + "GetNode", + "Identifier=" + identifier, + "GiveOpennetRef=false", + "WithPrivate=true", + "WithVolatile=false" + )); + fcpServer.writeLine( + "NodeData", + "Identifier=" + identifier, + "opennet=false", + "ark.pubURI=SSK@3YEf.../ark", + "ark.number=78", + "auth.negTypes=2", + "version=Fred,0.7,1.0,1466", + "lastGoodVersion=Fred,0.7,1.0,1466", + "ark.privURI=SSK@XdHMiRl", + "EndMessage" + ); + assertThat(nodeData.get().getARK().getPrivateURI(), is("SSK@XdHMiRl")); + } + + @Test + public void defaultFcpClientCanGetNodeInformationWithVolatileData() + throws InterruptedException, ExecutionException, IOException { + Future nodeData = fcpClient.getNode().includeVolatile().execute(); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + String identifier = extractIdentifier(lines); + assertThat(lines, matchesFcpMessage( + "GetNode", + "Identifier=" + identifier, + "GiveOpennetRef=false", + "WithPrivate=false", + "WithVolatile=true" + )); + fcpServer.writeLine( + "NodeData", + "Identifier=" + identifier, + "opennet=false", + "ark.pubURI=SSK@3YEf.../ark", + "ark.number=78", + "auth.negTypes=2", + "version=Fred,0.7,1.0,1466", + "lastGoodVersion=Fred,0.7,1.0,1466", + "volatile.freeJavaMemory=205706528", "EndMessage" + ); + assertThat(nodeData.get().getVolatile("freeJavaMemory"), is("205706528")); + } + + @Test + public void defaultFcpClientCanGetConfigWithoutDetails() + throws InterruptedException, ExecutionException, IOException { + Future configData = fcpClient.getConfig().execute(); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + String identifier = extractIdentifier(lines); + assertThat(lines, matchesFcpMessage( + "GetConfig", + "Identifier=" + identifier )); + fcpServer.writeLine( + "ConfigData", + "Identifier=" + identifier, + "EndMessage" + ); + assertThat(configData.get(), notNullValue()); + } + + @Test + public void defaultFcpClientCanGetConfigWithCurrent() + throws InterruptedException, ExecutionException, IOException { + Future configData = fcpClient.getConfig().withCurrent().execute(); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + String identifier = extractIdentifier(lines); + assertThat(lines, matchesFcpMessage( + "GetConfig", + "Identifier=" + identifier, + "WithCurrent=true" + )); + fcpServer.writeLine( + "ConfigData", + "Identifier=" + identifier, + "current.foo=bar", + "EndMessage" + ); + assertThat(configData.get().getCurrent("foo"), is("bar")); + } + + @Test + public void defaultFcpClientCanGetConfigWithDefaults() + throws InterruptedException, ExecutionException, IOException { + Future configData = fcpClient.getConfig().withDefaults().execute(); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + String identifier = extractIdentifier(lines); + assertThat(lines, matchesFcpMessage( + "GetConfig", + "Identifier=" + identifier, + "WithDefaults=true" + )); + fcpServer.writeLine( + "ConfigData", + "Identifier=" + identifier, + "default.foo=bar", + "EndMessage" + ); + assertThat(configData.get().getDefault("foo"), is("bar")); + } + + @Test + public void defaultFcpClientCanGetConfigWithSortOrder() + throws InterruptedException, ExecutionException, IOException { + Future configData = fcpClient.getConfig().withSortOrder().execute(); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + String identifier = extractIdentifier(lines); + assertThat(lines, matchesFcpMessage( + "GetConfig", + "Identifier=" + identifier, + "WithSortOrder=true" + )); + fcpServer.writeLine( + "ConfigData", + "Identifier=" + identifier, + "sortOrder.foo=17", + "EndMessage" + ); + assertThat(configData.get().getSortOrder("foo"), is(17)); + } + + @Test + public void defaultFcpClientCanGetConfigWithExpertFlag() + throws InterruptedException, ExecutionException, IOException { + Future configData = fcpClient.getConfig().withExpertFlag().execute(); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + String identifier = extractIdentifier(lines); + assertThat(lines, matchesFcpMessage( + "GetConfig", + "Identifier=" + identifier, + "WithExpertFlag=true" + )); + fcpServer.writeLine( + "ConfigData", + "Identifier=" + identifier, + "expertFlag.foo=true", + "EndMessage" + ); + assertThat(configData.get().getExpertFlag("foo"), is(true)); + } + + @Test + public void defaultFcpClientCanGetConfigWithForceWriteFlag() + throws InterruptedException, ExecutionException, IOException { + Future configData = fcpClient.getConfig().withForceWriteFlag().execute(); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + String identifier = extractIdentifier(lines); + assertThat(lines, matchesFcpMessage( + "GetConfig", + "Identifier=" + identifier, + "WithForceWriteFlag=true" + )); + fcpServer.writeLine( + "ConfigData", + "Identifier=" + identifier, + "forceWriteFlag.foo=true", + "EndMessage" + ); + assertThat(configData.get().getForceWriteFlag("foo"), is(true)); + } + + @Test + public void defaultFcpClientCanGetConfigWithShortDescription() + throws InterruptedException, ExecutionException, IOException { + Future configData = fcpClient.getConfig().withShortDescription().execute(); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + String identifier = extractIdentifier(lines); + assertThat(lines, matchesFcpMessage( + "GetConfig", + "Identifier=" + identifier, + "WithShortDescription=true" + )); + fcpServer.writeLine( + "ConfigData", + "Identifier=" + identifier, + "shortDescription.foo=bar", + "EndMessage" + ); + assertThat(configData.get().getShortDescription("foo"), is("bar")); + } + + @Test + public void defaultFcpClientCanGetConfigWithLongDescription() + throws InterruptedException, ExecutionException, IOException { + Future configData = fcpClient.getConfig().withLongDescription().execute(); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + String identifier = extractIdentifier(lines); + assertThat(lines, matchesFcpMessage( + "GetConfig", + "Identifier=" + identifier, + "WithLongDescription=true" + )); + fcpServer.writeLine( + "ConfigData", + "Identifier=" + identifier, + "longDescription.foo=bar", + "EndMessage" + ); + assertThat(configData.get().getLongDescription("foo"), is("bar")); + } + + @Test + public void defaultFcpClientCanGetConfigWithDataTypes() + throws InterruptedException, ExecutionException, IOException { + Future configData = fcpClient.getConfig().withDataTypes().execute(); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + String identifier = extractIdentifier(lines); + assertThat(lines, matchesFcpMessage( + "GetConfig", + "Identifier=" + identifier, + "WithDataTypes=true" + )); + fcpServer.writeLine( + "ConfigData", + "Identifier=" + identifier, + "dataType.foo=number", + "EndMessage" + ); + assertThat(configData.get().getDataType("foo"), is("number")); + } + + @Test + public void defaultFcpClientCanModifyConfigData() throws InterruptedException, ExecutionException, IOException { + Future newConfigData = fcpClient.modifyConfig().set("foo.bar").to("baz").execute(); + connectNode(); + List lines = fcpServer.collectUntil(is("EndMessage")); + String identifier = extractIdentifier(lines); + assertThat(lines, matchesFcpMessage( + "ModifyConfig", + "Identifier=" + identifier, + "foo.bar=baz" + )); + fcpServer.writeLine( + "ConfigData", + "Identifier=" + identifier, + "current.foo.bar=baz", + "EndMessage" + ); + assertThat(newConfigData.get().getCurrent("foo.bar"), is("baz")); + } + + private List lines; + private String identifier; + + private void connectAndAssert(Supplier>> requestMatcher) + throws InterruptedException, ExecutionException, IOException { + connectNode(); + readMessage(requestMatcher); + } + + private void readMessage(Supplier>> requestMatcher) throws IOException { + lines = fcpServer.collectUntil(is("EndMessage")); + identifier = extractIdentifier(lines); + assertThat(lines, requestMatcher.get()); + } + + public class Connections { + + @Test(expected = ExecutionException.class) + public void throwsExceptionOnFailure() throws IOException, ExecutionException, InterruptedException { + Future keyPairFuture = fcpClient.generateKeypair().execute(); + connectAndAssert(() -> matchesFcpMessage("GenerateSSK")); + fcpServer.writeLine( + "CloseConnectionDuplicateClientName", + "EndMessage" + ); + keyPairFuture.get(); + } + + @Test(expected = ExecutionException.class) + public void throwsExceptionIfConnectionIsClosed() throws IOException, ExecutionException, InterruptedException { + Future keyPairFuture = fcpClient.generateKeypair().execute(); + connectAndAssert(() -> matchesFcpMessage("GenerateSSK")); + fcpServer.close(); + keyPairFuture.get(); + } + + } + + public class GenerateKeyPair { + + @Test + public void defaultFcpClientCanGenerateKeypair() throws ExecutionException, InterruptedException, IOException { + Future keyPairFuture = fcpClient.generateKeypair().execute(); + connectAndAssert(() -> matchesFcpMessage("GenerateSSK")); + replyWithKeyPair(); + FcpKeyPair keyPair = keyPairFuture.get(); + assertThat(keyPair.getPublicKey(), is(REQUEST_URI)); + assertThat(keyPair.getPrivateKey(), is(INSERT_URI)); + } + + private void replyWithKeyPair() throws IOException { + fcpServer.writeLine("SSKKeypair", + "InsertURI=" + INSERT_URI + "", + "RequestURI=" + REQUEST_URI + "", + "Identifier=" + identifier, + "EndMessage"); + } + + } + + public class Peers { + + public class PeerCommands { + + public class ListPeer { + + @Test + public void byIdentity() throws InterruptedException, ExecutionException, IOException { + Future> peer = fcpClient.listPeer().byIdentity("id1").execute(); + connectAndAssert(() -> matchesListPeer("id1")); + replyWithPeer("id1"); + assertThat(peer.get().get().getIdentity(), is("id1")); + } + + @Test + public void byHostAndPort() throws InterruptedException, ExecutionException, IOException { + Future> peer = fcpClient.listPeer().byHostAndPort("host.free.net", 12345).execute(); + connectAndAssert(() -> matchesListPeer("host.free.net:12345")); + replyWithPeer("id1"); + assertThat(peer.get().get().getIdentity(), is("id1")); + } + + @Test + public void byName() throws InterruptedException, ExecutionException, IOException { + Future> peer = fcpClient.listPeer().byName("FriendNode").execute(); + connectAndAssert(() -> matchesListPeer("FriendNode")); + replyWithPeer("id1"); + assertThat(peer.get().get().getIdentity(), is("id1")); + } + + @Test + public void unknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException { + Future> peer = fcpClient.listPeer().byIdentity("id2").execute(); + connectAndAssert(() -> matchesListPeer("id2")); + replyWithUnknownNodeIdentifier(); + assertThat(peer.get().isPresent(), is(false)); + } + + private Matcher> matchesListPeer(String nodeId) { + return matchesFcpMessage( + "ListPeer", + "Identifier=" + identifier, + "NodeIdentifier=" + nodeId + ); + } + + } + + public class ListPeers { + + @Test + public void withoutMetadataOrVolatile() throws IOException, ExecutionException, InterruptedException { + Future> peers = fcpClient.listPeers().execute(); + connectAndAssert(() -> matchesListPeers(false, false)); + replyWithPeer("id1"); + replyWithPeer("id2"); + sendEndOfPeerList(); + assertThat(peers.get(), hasSize(2)); + assertThat(peers.get().stream().map(Peer::getIdentity).collect(Collectors.toList()), + containsInAnyOrder("id1", "id2")); + } + + @Test + public void withMetadata() throws IOException, ExecutionException, InterruptedException { + Future> peers = fcpClient.listPeers().includeMetadata().execute(); + connectAndAssert(() -> matchesListPeers(false, true)); + replyWithPeer("id1", "metadata.foo=bar1"); + replyWithPeer("id2", "metadata.foo=bar2"); + sendEndOfPeerList(); + assertThat(peers.get(), hasSize(2)); + assertThat(peers.get().stream().map(peer -> peer.getMetadata("foo")).collect(Collectors.toList()), + containsInAnyOrder("bar1", "bar2")); + } + + @Test + public void withVolatile() throws IOException, ExecutionException, InterruptedException { + Future> peers = fcpClient.listPeers().includeVolatile().execute(); + connectAndAssert(() -> matchesListPeers(true, false)); + replyWithPeer("id1", "volatile.foo=bar1"); + replyWithPeer("id2", "volatile.foo=bar2"); + sendEndOfPeerList(); + assertThat(peers.get(), hasSize(2)); + assertThat(peers.get().stream().map(peer -> peer.getVolatile("foo")).collect(Collectors.toList()), + containsInAnyOrder("bar1", "bar2")); + } + + private Matcher> matchesListPeers(boolean withVolatile, boolean withMetadata) { + return matchesFcpMessage( + "ListPeers", + "WithVolatile=" + withVolatile, + "WithMetadata=" + withMetadata + ); + } + + private void sendEndOfPeerList() throws IOException { + fcpServer.writeLine( + "EndListPeers", + "Identifier=" + identifier, + "EndMessage" + ); + } + + } + + public class AddPeer { + + @Test + public void fromFile() throws InterruptedException, ExecutionException, IOException { + Future> peer = fcpClient.addPeer().fromFile(new File("/tmp/ref.txt")).execute(); + connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("File=/tmp/ref.txt"))); + replyWithPeer("id1"); + assertThat(peer.get().get().getIdentity(), is("id1")); + } + + @Test + public void fromUrl() throws InterruptedException, ExecutionException, IOException { + Future> peer = fcpClient.addPeer().fromURL(new URL("http://node.ref/")).execute(); + connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("URL=http://node.ref/"))); + replyWithPeer("id1"); + assertThat(peer.get().get().getIdentity(), is("id1")); + } + + @Test + public void fromNodeRef() throws InterruptedException, ExecutionException, IOException { + NodeRef nodeRef = createNodeRef(); + Future> peer = fcpClient.addPeer().fromNodeRef(nodeRef).execute(); + connectAndAssert(() -> allOf(matchesAddPeer(), Matchers.hasItems( + "myName=name", + "ark.pubURI=public", + "ark.number=1", + "dsaGroup.g=base", + "dsaGroup.p=prime", + "dsaGroup.q=subprime", + "dsaPubKey.y=dsa-public", + "physical.udp=1.2.3.4:5678", + "auth.negTypes=3;5", + "sig=sig" + ))); + replyWithPeer("id1"); + assertThat(peer.get().get().getIdentity(), is("id1")); + } + + private NodeRef createNodeRef() { + NodeRef nodeRef = new NodeRef(); + nodeRef.setIdentity("id1"); + nodeRef.setName("name"); + nodeRef.setARK(new ARK("public", "1")); + nodeRef.setDSAGroup(new DSAGroup("base", "prime", "subprime")); + nodeRef.setNegotiationTypes(new int[] { 3, 5 }); + nodeRef.setPhysicalUDP("1.2.3.4:5678"); + nodeRef.setDSAPublicKey("dsa-public"); + nodeRef.setSignature("sig"); + return nodeRef; + } + + private Matcher> matchesAddPeer() { + return matchesFcpMessage( + "AddPeer", + "Identifier=" + identifier + ); + } + + } + + public class ModifyPeer { + + @Test + public void defaultFcpClientCanEnablePeerByName() + throws InterruptedException, ExecutionException, IOException { + Future> peer = fcpClient.modifyPeer().enable().byName("id1").execute(); + connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false)); + replyWithPeer("id1"); + assertThat(peer.get().get().getIdentity(), is("id1")); + } + + @Test + public void defaultFcpClientCanDisablePeerByName() + throws InterruptedException, ExecutionException, IOException { + Future> peer = fcpClient.modifyPeer().disable().byName("id1").execute(); + connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", true)); + replyWithPeer("id1"); + assertThat(peer.get().get().getIdentity(), is("id1")); + } + + @Test + public void defaultFcpClientCanEnablePeerByIdentity() + throws InterruptedException, ExecutionException, IOException { + Future> peer = fcpClient.modifyPeer().enable().byIdentity("id1").execute(); + connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false)); + replyWithPeer("id1"); + assertThat(peer.get().get().getIdentity(), is("id1")); + } + + @Test + public void defaultFcpClientCanEnablePeerByHostAndPort() + throws InterruptedException, ExecutionException, IOException { + Future> peer = + fcpClient.modifyPeer().enable().byHostAndPort("1.2.3.4", 5678).execute(); + connectAndAssert(() -> matchesModifyPeer("1.2.3.4:5678", "IsDisabled", false)); + replyWithPeer("id1"); + assertThat(peer.get().get().getIdentity(), is("id1")); + } + + @Test + public void allowLocalAddressesOfPeer() throws InterruptedException, ExecutionException, IOException { + Future> peer = + fcpClient.modifyPeer().allowLocalAddresses().byIdentity("id1").execute(); + connectAndAssert(() -> allOf( + matchesModifyPeer("id1", "AllowLocalAddresses", true), + not(contains(startsWith("IsDisabled="))) + )); + replyWithPeer("id1"); + assertThat(peer.get().get().getIdentity(), is("id1")); + } + + @Test + public void disallowLocalAddressesOfPeer() + throws InterruptedException, ExecutionException, IOException { + Future> peer = + fcpClient.modifyPeer().disallowLocalAddresses().byIdentity("id1").execute(); + connectAndAssert(() -> allOf( + matchesModifyPeer("id1", "AllowLocalAddresses", false), + not(contains(startsWith("IsDisabled="))) + )); + replyWithPeer("id1"); + assertThat(peer.get().get().getIdentity(), is("id1")); + } + + @Test + public void setBurstOnlyForPeer() throws InterruptedException, ExecutionException, IOException { + Future> peer = fcpClient.modifyPeer().setBurstOnly().byIdentity("id1").execute(); + connectAndAssert(() -> allOf( + matchesModifyPeer("id1", "IsBurstOnly", true), + not(contains(startsWith("AllowLocalAddresses="))), + not(contains(startsWith("IsDisabled="))) + )); + replyWithPeer("id1"); + assertThat(peer.get().get().getIdentity(), is("id1")); + } + + @Test + public void clearBurstOnlyForPeer() throws InterruptedException, ExecutionException, IOException { + Future> peer = fcpClient.modifyPeer().clearBurstOnly().byIdentity("id1").execute(); + connectAndAssert(() -> allOf( + matchesModifyPeer("id1", "IsBurstOnly", false), + not(contains(startsWith("AllowLocalAddresses="))), + not(contains(startsWith("IsDisabled="))) + )); + replyWithPeer("id1"); + assertThat(peer.get().get().getIdentity(), is("id1")); + } + + @Test + public void defaultFcpClientCanSetListenOnlyForPeer() + throws InterruptedException, ExecutionException, IOException { + Future> peer = fcpClient.modifyPeer().setListenOnly().byIdentity("id1").execute(); + connectAndAssert(() -> allOf( + matchesModifyPeer("id1", "IsListenOnly", true), + not(contains(startsWith("AllowLocalAddresses="))), + not(contains(startsWith("IsDisabled="))), + not(contains(startsWith("IsBurstOnly="))) + )); + replyWithPeer("id1"); + assertThat(peer.get().get().getIdentity(), is("id1")); + } + + @Test + public void clearListenOnlyForPeer() throws InterruptedException, ExecutionException, IOException { + Future> peer = fcpClient.modifyPeer().clearListenOnly().byIdentity("id1").execute(); + connectAndAssert(() -> allOf( + matchesModifyPeer("id1", "IsListenOnly", false), + not(contains(startsWith("AllowLocalAddresses="))), + not(contains(startsWith("IsDisabled="))), + not(contains(startsWith("IsBurstOnly="))) + )); + replyWithPeer("id1"); + assertThat(peer.get().get().getIdentity(), is("id1")); + } + + @Test + public void ignoreSourceForPeer() throws InterruptedException, ExecutionException, IOException { + Future> peer = fcpClient.modifyPeer().ignoreSource().byIdentity("id1").execute(); + connectAndAssert(() -> allOf( + matchesModifyPeer("id1", "IgnoreSourcePort", true), + not(contains(startsWith("AllowLocalAddresses="))), + not(contains(startsWith("IsDisabled="))), + not(contains(startsWith("IsBurstOnly="))), + not(contains(startsWith("IsListenOnly="))) + )); + replyWithPeer("id1"); + assertThat(peer.get().get().getIdentity(), is("id1")); + } + + @Test + public void useSourceForPeer() throws InterruptedException, ExecutionException, IOException { + Future> peer = fcpClient.modifyPeer().useSource().byIdentity("id1").execute(); + connectAndAssert(() -> allOf( + matchesModifyPeer("id1", "IgnoreSourcePort", false), + not(contains(startsWith("AllowLocalAddresses="))), + not(contains(startsWith("IsDisabled="))), + not(contains(startsWith("IsBurstOnly="))), + not(contains(startsWith("IsListenOnly="))) + )); + replyWithPeer("id1"); + assertThat(peer.get().get().getIdentity(), is("id1")); + } + + @Test + public void unknownNode() throws InterruptedException, ExecutionException, IOException { + Future> peer = fcpClient.modifyPeer().enable().byIdentity("id1").execute(); + connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false)); + replyWithUnknownNodeIdentifier(); + assertThat(peer.get().isPresent(), is(false)); + } + + private Matcher> matchesModifyPeer(String nodeIdentifier, String setting, boolean value) { + return matchesFcpMessage( + "ModifyPeer", + "Identifier=" + identifier, + "NodeIdentifier=" + nodeIdentifier, + setting + "=" + value + ); + } + + } + + public class RemovePeer { + + @Test + public void byName() throws InterruptedException, ExecutionException, IOException { + Future peer = fcpClient.removePeer().byName("Friend1").execute(); + connectAndAssert(() -> matchesRemovePeer("Friend1")); + replyWithPeerRemoved("Friend1"); + assertThat(peer.get(), is(true)); + } + + @Test + public void invalidName() throws InterruptedException, ExecutionException, IOException { + Future peer = fcpClient.removePeer().byName("NotFriend1").execute(); + connectAndAssert(() -> matchesRemovePeer("NotFriend1")); + replyWithUnknownNodeIdentifier(); + assertThat(peer.get(), is(false)); + } + + @Test + public void byIdentity() throws InterruptedException, ExecutionException, IOException { + Future peer = fcpClient.removePeer().byIdentity("id1").execute(); + connectAndAssert(() -> matchesRemovePeer("id1")); + replyWithPeerRemoved("id1"); + assertThat(peer.get(), is(true)); + } + + @Test + public void byHostAndPort() throws InterruptedException, ExecutionException, IOException { + Future peer = fcpClient.removePeer().byHostAndPort("1.2.3.4", 5678).execute(); + connectAndAssert(() -> matchesRemovePeer("1.2.3.4:5678")); + replyWithPeerRemoved("Friend1"); + assertThat(peer.get(), is(true)); + } + + private Matcher> matchesRemovePeer(String nodeIdentifier) { + return matchesFcpMessage( + "RemovePeer", + "Identifier=" + identifier, + "NodeIdentifier=" + nodeIdentifier + ); + } + + private void replyWithPeerRemoved(String nodeIdentifier) throws IOException { + fcpServer.writeLine( + "PeerRemoved", + "Identifier=" + identifier, + "NodeIdentifier=" + nodeIdentifier, + "EndMessage" + ); + } + + } + + private void replyWithPeer(String peerId, String... additionalLines) throws IOException { + fcpServer.writeLine( + "Peer", + "Identifier=" + identifier, + "identity=" + peerId, + "opennet=false", + "ark.pubURI=SSK@3YEf.../ark", + "ark.number=78", + "auth.negTypes=2", + "version=Fred,0.7,1.0,1466", + "lastGoodVersion=Fred,0.7,1.0,1466" + ); + fcpServer.writeLine(additionalLines); + fcpServer.writeLine("EndMessage"); + } + + } + + public class PeerNoteCommands { + + public class ListPeerNotes { + + @Test + public void onUnknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException { + Future> peerNote = fcpClient.listPeerNotes().byName("Friend1").execute(); + connectAndAssert(() -> matchesListPeerNotes("Friend1")); + replyWithUnknownNodeIdentifier(); + assertThat(peerNote.get().isPresent(), is(false)); + } + + @Test + public void byNodeName() throws InterruptedException, ExecutionException, IOException { + Future> peerNote = fcpClient.listPeerNotes().byName("Friend1").execute(); + connectAndAssert(() -> matchesListPeerNotes("Friend1")); + replyWithPeerNote(); + replyWithEndListPeerNotes(); + assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg==")); + assertThat(peerNote.get().get().getPeerNoteType(), is(1)); + } + + @Test + public void byNodeIdentifier() throws InterruptedException, ExecutionException, IOException { + Future> peerNote = fcpClient.listPeerNotes().byIdentity("id1").execute(); + connectAndAssert(() -> matchesListPeerNotes("id1")); + replyWithPeerNote(); + replyWithEndListPeerNotes(); + assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg==")); + assertThat(peerNote.get().get().getPeerNoteType(), is(1)); + } + + @Test + public void byHostAndPort() throws InterruptedException, ExecutionException, IOException { + Future> peerNote = + fcpClient.listPeerNotes().byHostAndPort("1.2.3.4", 5678).execute(); + connectAndAssert(() -> matchesListPeerNotes("1.2.3.4:5678")); + replyWithPeerNote(); + replyWithEndListPeerNotes(); + assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg==")); + assertThat(peerNote.get().get().getPeerNoteType(), is(1)); + } + + private Matcher> matchesListPeerNotes(String nodeIdentifier) { + return matchesFcpMessage( + "ListPeerNotes", + "NodeIdentifier=" + nodeIdentifier + ); + } + + private void replyWithEndListPeerNotes() throws IOException { + fcpServer.writeLine( + "EndListPeerNotes", + "Identifier=" + identifier, + "EndMessage" + ); + } + + private void replyWithPeerNote() throws IOException { + fcpServer.writeLine( + "PeerNote", + "Identifier=" + identifier, + "NodeIdentifier=Friend1", + "NoteText=RXhhbXBsZSBUZXh0Lg==", + "PeerNoteType=1", + "EndMessage" + ); + } + + } + + public class ModifyPeerNotes { + + @Test + public void byName() throws InterruptedException, ExecutionException, IOException { + Future noteUpdated = + fcpClient.modifyPeerNote().darknetComment("foo").byName("Friend1").execute(); + connectAndAssert(() -> matchesModifyPeerNote("Friend1")); + replyWithPeerNote(); + assertThat(noteUpdated.get(), is(true)); + } + + @Test + public void onUnknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException { + Future noteUpdated = + fcpClient.modifyPeerNote().darknetComment("foo").byName("Friend1").execute(); + connectAndAssert(() -> matchesModifyPeerNote("Friend1")); + replyWithUnknownNodeIdentifier(); + assertThat(noteUpdated.get(), is(false)); + } + + @Test + public void defaultFcpClientFailsToModifyPeerNoteWithoutPeerNote() + throws InterruptedException, ExecutionException, IOException { + Future noteUpdated = fcpClient.modifyPeerNote().byName("Friend1").execute(); + assertThat(noteUpdated.get(), is(false)); + } + + @Test + public void byIdentifier() throws InterruptedException, ExecutionException, IOException { + Future noteUpdated = + fcpClient.modifyPeerNote().darknetComment("foo").byIdentifier("id1").execute(); + connectAndAssert(() -> matchesModifyPeerNote("id1")); + replyWithPeerNote(); + assertThat(noteUpdated.get(), is(true)); + } + + @Test + public void byHostAndPort() throws InterruptedException, ExecutionException, IOException { + Future noteUpdated = + fcpClient.modifyPeerNote().darknetComment("foo").byHostAndPort("1.2.3.4", 5678).execute(); + connectAndAssert(() -> matchesModifyPeerNote("1.2.3.4:5678")); + replyWithPeerNote(); + assertThat(noteUpdated.get(), is(true)); + } + + private Matcher> matchesModifyPeerNote(String nodeIdentifier) { + return matchesFcpMessage( + "ModifyPeerNote", + "Identifier=" + identifier, + "NodeIdentifier=" + nodeIdentifier, + "PeerNoteType=1", + "NoteText=Zm9v" + ); + } + + private void replyWithPeerNote() throws IOException { + fcpServer.writeLine( + "PeerNote", + "Identifier=" + identifier, + "NodeIdentifier=Friend1", + "NoteText=Zm9v", + "PeerNoteType=1", + "EndMessage" + ); + } + + } + + } + + private void replyWithUnknownNodeIdentifier() throws IOException { + fcpServer.writeLine( + "UnknownNodeIdentifier", + "Identifier=" + identifier, + "NodeIdentifier=id2", + "EndMessage" + ); + } + + } + + public class PluginCommands { + + private static final String CLASS_NAME = "foo.plugin.Plugin"; + + private void replyWithPluginInfo() throws IOException { + fcpServer.writeLine( + "PluginInfo", + "Identifier=" + identifier, + "PluginName=superPlugin", + "IsTalkable=true", + "LongVersion=1.2.3", + "Version=42", + "OriginUri=superPlugin", + "Started=true", + "EndMessage" + ); + } + + private void verifyPluginInfo(Future> pluginInfo) + throws InterruptedException, ExecutionException { + assertThat(pluginInfo.get().get().getPluginName(), is("superPlugin")); + assertThat(pluginInfo.get().get().getOriginalURI(), is("superPlugin")); + assertThat(pluginInfo.get().get().isTalkable(), is(true)); + assertThat(pluginInfo.get().get().getVersion(), is("42")); + assertThat(pluginInfo.get().get().getLongVersion(), is("1.2.3")); + assertThat(pluginInfo.get().get().isStarted(), is(true)); + } + + public class LoadPlugin { + + public class OfficialPlugins { + + @Test + public void fromFreenet() throws ExecutionException, InterruptedException, IOException { + Future> pluginInfo = + fcpClient.loadPlugin().officialFromFreenet("superPlugin").execute(); + connectAndAssert(() -> createMatcherForOfficialSource("freenet")); + assertThat(lines, not(contains(startsWith("Store=")))); + replyWithPluginInfo(); + verifyPluginInfo(pluginInfo); + } + + @Test + public void persistentFromFreenet() throws ExecutionException, InterruptedException, IOException { + Future> pluginInfo = + fcpClient.loadPlugin().addToConfig().officialFromFreenet("superPlugin").execute(); + connectAndAssert(() -> createMatcherForOfficialSource("freenet")); + assertThat(lines, hasItem("Store=true")); + replyWithPluginInfo(); + verifyPluginInfo(pluginInfo); + } + + @Test + public void fromHttps() throws ExecutionException, InterruptedException, IOException { + Future> pluginInfo = + fcpClient.loadPlugin().officialFromHttps("superPlugin").execute(); + connectAndAssert(() -> createMatcherForOfficialSource("https")); + replyWithPluginInfo(); + verifyPluginInfo(pluginInfo); + } + + private Matcher> createMatcherForOfficialSource(String officialSource) { + return matchesFcpMessage( + "LoadPlugin", + "Identifier=" + identifier, + "PluginURL=superPlugin", + "URLType=official", + "OfficialSource=" + officialSource + ); + } + + } + + public class FromOtherSources { + + private static final String FILE_PATH = "/path/to/plugin.jar"; + private static final String URL = "http://server.com/plugin.jar"; + private static final String KEY = "KSK@plugin.jar"; + + @Test + public void fromFile() throws ExecutionException, InterruptedException, IOException { + Future> pluginInfo = fcpClient.loadPlugin().fromFile(FILE_PATH).execute(); + connectAndAssert(() -> createMatcher("file", FILE_PATH)); + replyWithPluginInfo(); + verifyPluginInfo(pluginInfo); + } + + @Test + public void fromUrl() throws ExecutionException, InterruptedException, IOException { + Future> pluginInfo = fcpClient.loadPlugin().fromUrl(URL).execute(); + connectAndAssert(() -> createMatcher("url", URL)); + replyWithPluginInfo(); + verifyPluginInfo(pluginInfo); + } + + @Test + public void fromFreenet() throws ExecutionException, InterruptedException, IOException { + Future> pluginInfo = fcpClient.loadPlugin().fromFreenet(KEY).execute(); + connectAndAssert(() -> createMatcher("freenet", KEY)); + replyWithPluginInfo(); + verifyPluginInfo(pluginInfo); + } + + private Matcher> createMatcher(String urlType, String url) { + return matchesFcpMessage( + "LoadPlugin", + "Identifier=" + identifier, + "PluginURL=" + url, + "URLType=" + urlType + ); + } + + } + + public class Failed { + + @Test + public void failedLoad() throws ExecutionException, InterruptedException, IOException { + Future> pluginInfo = + fcpClient.loadPlugin().officialFromFreenet("superPlugin").execute(); + connectAndAssert(() -> matchesFcpMessage("LoadPlugin")); + replyWithProtocolError(); + assertThat(pluginInfo.get().isPresent(), is(false)); + } + + } + + } + + private void replyWithProtocolError() throws IOException { + fcpServer.writeLine( + "ProtocolError", + "Identifier=" + identifier, + "EndMessage" + ); + } + + public class ReloadPlugin { + + @Test + public void reloadingPluginWorks() throws InterruptedException, ExecutionException, IOException { + Future> pluginInfo = fcpClient.reloadPlugin().plugin(CLASS_NAME).execute(); + connectAndAssert(() -> matchReloadPluginMessage()); + replyWithPluginInfo(); + verifyPluginInfo(pluginInfo); + } + + @Test + public void reloadingPluginWithMaxWaitTimeWorks() + throws InterruptedException, ExecutionException, IOException { + Future> pluginInfo = + fcpClient.reloadPlugin().waitFor(1234).plugin(CLASS_NAME).execute(); + connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("MaxWaitTime=1234"))); + replyWithPluginInfo(); + verifyPluginInfo(pluginInfo); + } + + @Test + public void reloadingPluginWithPurgeWorks() + throws InterruptedException, ExecutionException, IOException { + Future> pluginInfo = + fcpClient.reloadPlugin().purge().plugin(CLASS_NAME).execute(); + connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("Purge=true"))); + replyWithPluginInfo(); + verifyPluginInfo(pluginInfo); + } + + @Test + public void reloadingPluginWithStoreWorks() + throws InterruptedException, ExecutionException, IOException { + Future> pluginInfo = + fcpClient.reloadPlugin().addToConfig().plugin(CLASS_NAME).execute(); + connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("Store=true"))); + replyWithPluginInfo(); + verifyPluginInfo(pluginInfo); + } + + private Matcher> matchReloadPluginMessage() { + return matchesFcpMessage( + "ReloadPlugin", + "Identifier=" + identifier, + "PluginName=" + CLASS_NAME + ); + } + + } + + public class RemovePlugin { + + @Test + public void removingPluginWorks() throws InterruptedException, ExecutionException, IOException { + Future pluginRemoved = fcpClient.removePlugin().plugin(CLASS_NAME).execute(); + connectAndAssert(() -> matchPluginRemovedMessage()); + replyWithPluginRemoved(); + assertThat(pluginRemoved.get(), is(true)); + } + + @Test + public void removingPluginWithMaxWaitTimeWorks() + throws InterruptedException, ExecutionException, IOException { + Future pluginRemoved = fcpClient.removePlugin().waitFor(1234).plugin(CLASS_NAME).execute(); + connectAndAssert(() -> allOf(matchPluginRemovedMessage(), hasItem("MaxWaitTime=1234"))); + replyWithPluginRemoved(); + assertThat(pluginRemoved.get(), is(true)); + } + + @Test + public void removingPluginWithPurgeWorks() + throws InterruptedException, ExecutionException, IOException { + Future pluginRemoved = fcpClient.removePlugin().purge().plugin(CLASS_NAME).execute(); + connectAndAssert(() -> allOf(matchPluginRemovedMessage(), hasItem("Purge=true"))); + replyWithPluginRemoved(); + assertThat(pluginRemoved.get(), is(true)); + } + + private void replyWithPluginRemoved() throws IOException { + fcpServer.writeLine( + "PluginRemoved", + "Identifier=" + identifier, + "PluginName=" + CLASS_NAME, + "EndMessage" + ); + } + + private Matcher> matchPluginRemovedMessage() { + return matchesFcpMessage( + "RemovePlugin", + "Identifier=" + identifier, + "PluginName=" + CLASS_NAME + ); + } + + } + + public class GetPluginInfo { + + @Test + public void gettingPluginInfoWorks() throws InterruptedException, ExecutionException, IOException { + Future> pluginInfo = fcpClient.getPluginInfo().plugin(CLASS_NAME).execute(); + connectAndAssert(() -> matchGetPluginInfoMessage()); + replyWithPluginInfo(); + verifyPluginInfo(pluginInfo); + } + + @Test + public void gettingPluginInfoWithDetailsWorks() + throws InterruptedException, ExecutionException, IOException { + Future> pluginInfo = + fcpClient.getPluginInfo().detailed().plugin(CLASS_NAME).execute(); + connectAndAssert(() -> allOf(matchGetPluginInfoMessage(), hasItem("Detailed=true"))); + replyWithPluginInfo(); + verifyPluginInfo(pluginInfo); + } + + @Test + public void protocolErrorIsRecognizedAsFailure() + throws InterruptedException, ExecutionException, IOException { + Future> pluginInfo = + fcpClient.getPluginInfo().detailed().plugin(CLASS_NAME).execute(); + connectAndAssert(() -> allOf(matchGetPluginInfoMessage(), hasItem("Detailed=true"))); + replyWithProtocolError(); + assertThat(pluginInfo.get(), is(Optional.empty())); + } + + private Matcher> matchGetPluginInfoMessage() { + return matchesFcpMessage( + "GetPluginInfo", + "Identifier=" + identifier, + "PluginName=" + CLASS_NAME + ); + } + + } + + } + + public class UskSubscriptionCommands { + + private static final String URI = "USK@some,uri/file.txt"; + + @Test + public void subscriptionWorks() throws InterruptedException, ExecutionException, IOException { + Future> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute(); + connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI)); + replyWithSubscribed(); + assertThat(uskSubscription.get().get().getUri(), is(URI)); + AtomicInteger edition = new AtomicInteger(); + CountDownLatch updated = new CountDownLatch(2); + uskSubscription.get().get().onUpdate(e -> { + edition.set(e); + updated.countDown(); + }); + sendUpdateNotification(23); + sendUpdateNotification(24); + assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true)); + assertThat(edition.get(), is(24)); + } + + @Test + public void subscriptionUpdatesMultipleTimes() throws InterruptedException, ExecutionException, IOException { + Future> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute(); + connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI)); + replyWithSubscribed(); + assertThat(uskSubscription.get().get().getUri(), is(URI)); + AtomicInteger edition = new AtomicInteger(); + CountDownLatch updated = new CountDownLatch(2); + uskSubscription.get().get().onUpdate(e -> { + edition.set(e); + updated.countDown(); + }); + uskSubscription.get().get().onUpdate(e -> updated.countDown()); + sendUpdateNotification(23); + assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true)); + assertThat(edition.get(), is(23)); + } + + @Test + public void subscriptionCanBeCancelled() throws InterruptedException, ExecutionException, IOException { + Future> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute(); + connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI)); + replyWithSubscribed(); + assertThat(uskSubscription.get().get().getUri(), is(URI)); + AtomicBoolean updated = new AtomicBoolean(); + uskSubscription.get().get().onUpdate(e -> updated.set(true)); + uskSubscription.get().get().cancel(); + readMessage(() -> matchesFcpMessage("UnsubscribeUSK", "Identifier=" + identifier)); + sendUpdateNotification(23); + assertThat(updated.get(), is(false)); + } + + private void replyWithSubscribed() throws IOException { + fcpServer.writeLine( + "SubscribedUSK", + "Identifier=" + identifier, + "URI=" + URI, + "DontPoll=false", + "EndMessage" + ); + } + + private void sendUpdateNotification(int edition, String... additionalLines) throws IOException { + fcpServer.writeLine( + "SubscribedUSKUpdate", + "Identifier=" + identifier, + "URI=" + URI, + "Edition=" + edition + ); + fcpServer.writeLine(additionalLines); + fcpServer.writeLine("EndMessage"); + } + + } + + public class ClientGet { + + @Test + public void works() throws InterruptedException, ExecutionException, IOException { + Future> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute(); + connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "ReturnType=direct")); + replyWithAllData("not-test", "Hello World", "text/plain;charset=latin-9"); + replyWithAllData(identifier, "Hello", "text/plain;charset=utf-8"); + Optional data = dataFuture.get(); + verifyData(data); + } + + @Test + public void getFailedIsRecognized() throws InterruptedException, ExecutionException, IOException { + Future> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute(); + connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt")); + replyWithGetFailed("not-test"); + replyWithGetFailed(identifier); + Optional data = dataFuture.get(); + assertThat(data.isPresent(), is(false)); + } + + @Test + public void getFailedForDifferentIdentifierIsIgnored() + throws InterruptedException, ExecutionException, IOException { + Future> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute(); + connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt")); + replyWithGetFailed("not-test"); + replyWithAllData(identifier, "Hello", "text/plain;charset=utf-8"); + Optional data = dataFuture.get(); + verifyData(data); + } + + @Test(expected = ExecutionException.class) + public void connectionClosedIsRecognized() throws InterruptedException, ExecutionException, IOException { + Future> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute(); + connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt")); + fcpServer.close(); + dataFuture.get(); + } + + @Test + public void withIgnoreData() throws InterruptedException, ExecutionException, IOException { + fcpClient.clientGet().ignoreDataStore().uri("KSK@foo.txt").execute(); + connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "IgnoreDS=true")); + } + + @Test + public void withDataStoreOnly() throws InterruptedException, ExecutionException, IOException { + fcpClient.clientGet().dataStoreOnly().uri("KSK@foo.txt").execute(); + connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "DSonly=true")); + } + + @Test + public void clientGetWithMaxSizeSettingSendsCorrectCommands() + throws InterruptedException, ExecutionException, IOException { + fcpClient.clientGet().maxSize(1048576).uri("KSK@foo.txt").execute(); + connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "MaxSize=1048576")); + } + + @Test + public void clientGetWithPrioritySettingSendsCorrectCommands() + throws InterruptedException, ExecutionException, IOException { + fcpClient.clientGet().priority(Priority.interactive).uri("KSK@foo.txt").execute(); + connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "PriorityClass=1")); + } + + @Test + public void clientGetWithRealTimeSettingSendsCorrectCommands() + throws InterruptedException, ExecutionException, IOException { + fcpClient.clientGet().realTime().uri("KSK@foo.txt").execute(); + connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "RealTimeFlag=true")); + } + + @Test + public void clientGetWithGlobalSettingSendsCorrectCommands() + throws InterruptedException, ExecutionException, IOException { + fcpClient.clientGet().global().uri("KSK@foo.txt").execute(); + connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "Global=true")); + } + + private void replyWithGetFailed(String identifier) throws IOException { + fcpServer.writeLine( + "GetFailed", + "Identifier=" + identifier, + "Code=3", + "EndMessage" + ); + } + + private void replyWithAllData(String identifier, String text, String contentType) throws IOException { + fcpServer.writeLine( + "AllData", + "Identifier=" + identifier, + "DataLength=" + (text.length() + 1), + "StartupTime=1435610539000", + "CompletionTime=1435610540000", + "Metadata.ContentType=" + contentType, + "Data", + text + ); + } + + private void verifyData(Optional data) throws IOException { + assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8")); + assertThat(data.get().size(), is(6L)); + assertThat(ByteStreams.toByteArray(data.get().getInputStream()), + is("Hello\n".getBytes(StandardCharsets.UTF_8))); + } + } }