Refactor client get tests and message matching
[jFCPlib.git] / src / test / java / net / pterodactylus / fcp / quelaton / DefaultFcpClientTest.java
1 package net.pterodactylus.fcp.quelaton;
2
3 import static org.hamcrest.MatcherAssert.assertThat;
4 import static org.hamcrest.Matchers.allOf;
5 import static org.hamcrest.Matchers.contains;
6 import static org.hamcrest.Matchers.containsInAnyOrder;
7 import static org.hamcrest.Matchers.hasItem;
8 import static org.hamcrest.Matchers.hasSize;
9 import static org.hamcrest.Matchers.is;
10 import static org.hamcrest.Matchers.not;
11 import static org.hamcrest.Matchers.notNullValue;
12 import static org.hamcrest.Matchers.startsWith;
13
14 import java.io.ByteArrayInputStream;
15 import java.io.File;
16 import java.io.IOException;
17 import java.net.URL;
18 import java.nio.charset.StandardCharsets;
19 import java.util.Arrays;
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.Optional;
23 import java.util.concurrent.CopyOnWriteArrayList;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.Future;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicInteger;
32 import java.util.function.Supplier;
33 import java.util.stream.Collectors;
34
35 import net.pterodactylus.fcp.ARK;
36 import net.pterodactylus.fcp.ConfigData;
37 import net.pterodactylus.fcp.DSAGroup;
38 import net.pterodactylus.fcp.FcpKeyPair;
39 import net.pterodactylus.fcp.Key;
40 import net.pterodactylus.fcp.NodeData;
41 import net.pterodactylus.fcp.NodeRef;
42 import net.pterodactylus.fcp.Peer;
43 import net.pterodactylus.fcp.PeerNote;
44 import net.pterodactylus.fcp.PluginInfo;
45 import net.pterodactylus.fcp.Priority;
46 import net.pterodactylus.fcp.fake.FakeTcpServer;
47 import net.pterodactylus.fcp.quelaton.ClientGetCommand.Data;
48
49 import com.google.common.io.ByteStreams;
50 import com.google.common.io.Files;
51 import com.nitorcreations.junit.runners.NestedRunner;
52 import org.hamcrest.Description;
53 import org.hamcrest.Matcher;
54 import org.hamcrest.Matchers;
55 import org.hamcrest.TypeSafeDiagnosingMatcher;
56 import org.junit.After;
57 import org.junit.Assert;
58 import org.junit.Test;
59 import org.junit.runner.RunWith;
60
61 /**
62  * Unit test for {@link DefaultFcpClient}.
63  *
64  * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
65  */
66 @RunWith(NestedRunner.class)
67 public class DefaultFcpClientTest {
68
69         private static final String INSERT_URI =
70                 "SSK@RVCHbJdkkyTCeNN9AYukEg76eyqmiosSaNKgE3U9zUw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQECAAE/";
71         private static final String REQUEST_URI =
72                 "SSK@wtbgd2loNcJCXvtQVOftl2tuWBomDQHfqS6ytpPRhfw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQACAAE/";
73
74         private int threadCounter = 0;
75         private final ExecutorService threadPool =
76                 Executors.newCachedThreadPool(r -> new Thread(r, "Test-Thread-" + threadCounter++));
77         private final FakeTcpServer fcpServer;
78         private final DefaultFcpClient fcpClient;
79
80         public DefaultFcpClientTest() throws IOException {
81                 fcpServer = new FakeTcpServer(threadPool);
82                 fcpClient = new DefaultFcpClient(threadPool, "localhost", fcpServer.getPort(), () -> "Test");
83         }
84
85         @After
86         public void tearDown() throws IOException {
87                 fcpServer.close();
88                 threadPool.shutdown();
89         }
90
91         @Test(expected = ExecutionException.class)
92         public void defaultFcpClientThrowsExceptionIfItCanNotConnect()
93         throws IOException, ExecutionException, InterruptedException {
94                 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
95                 fcpServer.connect().get();
96                 fcpServer.collectUntil(is("EndMessage"));
97                 fcpServer.writeLine(
98                         "CloseConnectionDuplicateClientName",
99                         "EndMessage"
100                 );
101                 keyPairFuture.get();
102         }
103
104         @Test(expected = ExecutionException.class)
105         public void defaultFcpClientThrowsExceptionIfConnectionIsClosed()
106         throws IOException, ExecutionException, InterruptedException {
107                 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
108                 fcpServer.connect().get();
109                 fcpServer.collectUntil(is("EndMessage"));
110                 fcpServer.close();
111                 keyPairFuture.get();
112         }
113
114         private void connectNode() throws InterruptedException, ExecutionException, IOException {
115                 fcpServer.connect().get();
116                 fcpServer.collectUntil(is("EndMessage"));
117                 fcpServer.writeLine("NodeHello",
118                         "CompressionCodecs=4 - GZIP(0), BZIP2(1), LZMA(2), LZMA_NEW(3)",
119                         "Revision=build01466",
120                         "Testnet=false",
121                         "Version=Fred,0.7,1.0,1466",
122                         "Build=1466",
123                         "ConnectionIdentifier=14318898267048452a81b36e7f13a3f0",
124                         "Node=Fred",
125                         "ExtBuild=29",
126                         "FCPVersion=2.0",
127                         "NodeLanguage=ENGLISH",
128                         "ExtRevision=v29",
129                         "EndMessage"
130                 );
131         }
132
133         @Test
134         public void clientGetCanDownloadData() throws InterruptedException, ExecutionException, IOException {
135                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
136                 connectNode();
137                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
138                 assertThat(lines, matchesFcpMessage("ClientGet", "ReturnType=direct", "URI=KSK@foo.txt"));
139                 String identifier = extractIdentifier(lines);
140                 fcpServer.writeLine(
141                         "AllData",
142                         "Identifier=" + identifier,
143                         "DataLength=6",
144                         "StartupTime=1435610539000",
145                         "CompletionTime=1435610540000",
146                         "Metadata.ContentType=text/plain;charset=utf-8",
147                         "Data",
148                         "Hello"
149                 );
150                 Optional<Data> data = dataFuture.get();
151                 assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
152                 assertThat(data.get().size(), is(6L));
153                 assertThat(ByteStreams.toByteArray(data.get().getInputStream()),
154                         is("Hello\n".getBytes(StandardCharsets.UTF_8)));
155         }
156
157         private String extractIdentifier(List<String> lines) {
158                 return lines.stream()
159                         .filter(s -> s.startsWith("Identifier="))
160                         .map(s -> s.substring(s.indexOf('=') + 1))
161                         .findFirst()
162                         .orElse("");
163         }
164
165         @Test
166         public void defaultFcpClientReusesConnection() throws InterruptedException, ExecutionException, IOException {
167                 Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
168                 connectNode();
169                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
170                 String identifier = extractIdentifier(lines);
171                 fcpServer.writeLine(
172                         "SSKKeypair",
173                         "InsertURI=" + INSERT_URI + "",
174                         "RequestURI=" + REQUEST_URI + "",
175                         "Identifier=" + identifier,
176                         "EndMessage"
177                 );
178                 keyPair.get();
179                 keyPair = fcpClient.generateKeypair().execute();
180                 lines = fcpServer.collectUntil(is("EndMessage"));
181                 identifier = extractIdentifier(lines);
182                 fcpServer.writeLine(
183                         "SSKKeypair",
184                         "InsertURI=" + INSERT_URI + "",
185                         "RequestURI=" + REQUEST_URI + "",
186                         "Identifier=" + identifier,
187                         "EndMessage"
188                 );
189                 keyPair.get();
190         }
191
192         @Test
193         public void defaultFcpClientCanReconnectAfterConnectionHasBeenClosed()
194         throws InterruptedException, ExecutionException, IOException {
195                 Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
196                 connectNode();
197                 fcpServer.collectUntil(is("EndMessage"));
198                 fcpServer.close();
199                 try {
200                         keyPair.get();
201                         Assert.fail();
202                 } catch (ExecutionException e) {
203                 }
204                 keyPair = fcpClient.generateKeypair().execute();
205                 connectNode();
206                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
207                 String identifier = extractIdentifier(lines);
208                 fcpServer.writeLine(
209                         "SSKKeypair",
210                         "InsertURI=" + INSERT_URI + "",
211                         "RequestURI=" + REQUEST_URI + "",
212                         "Identifier=" + identifier,
213                         "EndMessage"
214                 );
215                 keyPair.get();
216         }
217
218         private Matcher<List<String>> matchesFcpMessage(String name, String... requiredLines) {
219                 return matchesFcpMessageWithTerminator(name, "EndMessage", requiredLines);
220         }
221
222         private Matcher<List<String>> matchesDataMessage(String name, String... requiredLines) {
223                 return matchesFcpMessageWithTerminator(name, "Data", requiredLines);
224         }
225
226         private Matcher<Iterable<String>> hasHead(String firstElement) {
227                 return new TypeSafeDiagnosingMatcher<Iterable<String>>() {
228                         @Override
229                         protected boolean matchesSafely(Iterable<String> iterable, Description mismatchDescription) {
230                                 if (!iterable.iterator().hasNext()) {
231                                         mismatchDescription.appendText("is empty");
232                                         return false;
233                                 }
234                                 String element = iterable.iterator().next();
235                                 if (!element.equals(firstElement)) {
236                                         mismatchDescription.appendText("starts with ").appendValue(element);
237                                         return false;
238                                 }
239                                 return true;
240                         }
241
242                         @Override
243                         public void describeTo(Description description) {
244                                 description.appendText("starts with ").appendValue(firstElement);
245                         }
246                 };
247         }
248
249         private Matcher<List<String>> matchesFcpMessageWithTerminator(
250                 String name, String terminator, String... requiredLines) {
251                 return allOf(hasHead(name), hasParameters(1, 1, requiredLines), hasTail(terminator));
252         }
253
254         private Matcher<List<String>> hasParameters(int ignoreStart, int ignoreEnd, String... lines) {
255                 return new TypeSafeDiagnosingMatcher<List<String>>() {
256                         @Override
257                         protected boolean matchesSafely(List<String> item, Description mismatchDescription) {
258                                 if (item.size() < (ignoreStart + ignoreEnd)) {
259                                         mismatchDescription.appendText("has only ").appendValue(item.size()).appendText(" elements");
260                                         return false;
261                                 }
262                                 for (String line : lines) {
263                                         if ((item.indexOf(line) < ignoreStart) || (item.indexOf(line) >= (item.size() - ignoreEnd))) {
264                                                 mismatchDescription.appendText("does not contains ").appendValue(line);
265                                                 return false;
266                                         }
267                                 }
268                                 return true;
269                         }
270
271                         @Override
272                         public void describeTo(Description description) {
273                                 description.appendText("contains ").appendValueList("(", ", ", ")", lines);
274                                 description.appendText(", ignoring the first ").appendValue(ignoreStart);
275                                 description.appendText(" and the last ").appendValue(ignoreEnd);
276                         }
277                 };
278         }
279
280         private Matcher<List<String>> hasTail(String... lastElements) {
281                 return new TypeSafeDiagnosingMatcher<List<String>>() {
282                         @Override
283                         protected boolean matchesSafely(List<String> list, Description mismatchDescription) {
284                                 if (list.size() < lastElements.length) {
285                                         mismatchDescription.appendText("is too small");
286                                         return false;
287                                 }
288                                 List<String> tail = list.subList(list.size() - lastElements.length, list.size());
289                                 if (!tail.equals(Arrays.asList(lastElements))) {
290                                         mismatchDescription.appendText("ends with ").appendValueList("(", ", ", ")", tail);
291                                         return false;
292                                 }
293                                 return true;
294                         }
295
296                         @Override
297                         public void describeTo(Description description) {
298                                 description.appendText("ends with ").appendValueList("(", ", ", ")", lastElements);
299                         }
300                 };
301         }
302
303         @Test
304         public void clientPutWithDirectDataSendsCorrectCommand()
305         throws IOException, ExecutionException, InterruptedException {
306                 fcpClient.clientPut()
307                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
308                         .length(6)
309                         .uri("KSK@foo.txt")
310                         .execute();
311                 connectNode();
312                 List<String> lines = fcpServer.collectUntil(is("Hello"));
313                 assertThat(lines, allOf(
314                         hasHead("ClientPut"),
315                         hasParameters(1, 2, "UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"),
316                         hasTail("EndMessage", "Hello")
317                 ));
318         }
319
320         @Test
321         public void clientPutWithDirectDataSucceedsOnCorrectIdentifier()
322         throws InterruptedException, ExecutionException, IOException {
323                 Future<Optional<Key>> key = fcpClient.clientPut()
324                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
325                         .length(6)
326                         .uri("KSK@foo.txt")
327                         .execute();
328                 connectNode();
329                 List<String> lines = fcpServer.collectUntil(is("Hello"));
330                 String identifier = extractIdentifier(lines);
331                 fcpServer.writeLine(
332                         "PutFailed",
333                         "Identifier=not-the-right-one",
334                         "EndMessage"
335                 );
336                 fcpServer.writeLine(
337                         "PutSuccessful",
338                         "URI=KSK@foo.txt",
339                         "Identifier=" + identifier,
340                         "EndMessage"
341                 );
342                 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
343         }
344
345         @Test
346         public void clientPutWithDirectDataFailsOnCorrectIdentifier()
347         throws InterruptedException, ExecutionException, IOException {
348                 Future<Optional<Key>> key = fcpClient.clientPut()
349                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
350                         .length(6)
351                         .uri("KSK@foo.txt")
352                         .execute();
353                 connectNode();
354                 List<String> lines = fcpServer.collectUntil(is("Hello"));
355                 String identifier = extractIdentifier(lines);
356                 fcpServer.writeLine(
357                         "PutSuccessful",
358                         "Identifier=not-the-right-one",
359                         "URI=KSK@foo.txt",
360                         "EndMessage"
361                 );
362                 fcpServer.writeLine(
363                         "PutFailed",
364                         "Identifier=" + identifier,
365                         "EndMessage"
366                 );
367                 assertThat(key.get().isPresent(), is(false));
368         }
369
370         @Test
371         public void clientPutWithRenamedDirectDataSendsCorrectCommand()
372         throws InterruptedException, ExecutionException, IOException {
373                 fcpClient.clientPut()
374                         .named("otherName.txt")
375                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
376                         .length(6)
377                         .uri("KSK@foo.txt")
378                         .execute();
379                 connectNode();
380                 List<String> lines = fcpServer.collectUntil(is("Hello"));
381                 assertThat(lines, allOf(
382                         hasHead("ClientPut"),
383                         hasParameters(1, 2, "TargetFilename=otherName.txt", "UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"),
384                         hasTail("EndMessage", "Hello")
385                 ));
386         }
387
388         @Test
389         public void clientPutWithRedirectSendsCorrectCommand()
390         throws IOException, ExecutionException, InterruptedException {
391                 fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt").execute();
392                 connectNode();
393                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
394                 assertThat(lines,
395                         matchesFcpMessage("ClientPut", "UploadFrom=redirect", "URI=KSK@foo.txt", "TargetURI=KSK@bar.txt"));
396         }
397
398         @Test
399         public void clientPutWithFileSendsCorrectCommand() throws InterruptedException, ExecutionException, IOException {
400                 fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
401                 connectNode();
402                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
403                 assertThat(lines,
404                         matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt"));
405         }
406
407         @Test
408         public void clientPutWithFileCanCompleteTestDdaSequence()
409         throws IOException, ExecutionException, InterruptedException {
410                 File tempFile = createTempFile();
411                 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt").execute();
412                 connectNode();
413                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
414                 String identifier = extractIdentifier(lines);
415                 fcpServer.writeLine(
416                         "ProtocolError",
417                         "Identifier=" + identifier,
418                         "Code=25",
419                         "EndMessage"
420                 );
421                 lines = fcpServer.collectUntil(is("EndMessage"));
422                 assertThat(lines, matchesFcpMessage(
423                         "TestDDARequest",
424                         "Directory=" + tempFile.getParent(),
425                         "WantReadDirectory=true",
426                         "WantWriteDirectory=false"
427                 ));
428                 fcpServer.writeLine(
429                         "TestDDAReply",
430                         "Directory=" + tempFile.getParent(),
431                         "ReadFilename=" + tempFile,
432                         "EndMessage"
433                 );
434                 lines = fcpServer.collectUntil(is("EndMessage"));
435                 assertThat(lines, matchesFcpMessage(
436                         "TestDDAResponse",
437                         "Directory=" + tempFile.getParent(),
438                         "ReadContent=test-content"
439                 ));
440                 fcpServer.writeLine(
441                         "TestDDAComplete",
442                         "Directory=" + tempFile.getParent(),
443                         "ReadDirectoryAllowed=true",
444                         "EndMessage"
445                 );
446                 lines = fcpServer.collectUntil(is("EndMessage"));
447                 assertThat(lines,
448                         matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt",
449                                 "Filename=" + new File(tempFile.getParent(), "test.dat")));
450         }
451
452         private File createTempFile() throws IOException {
453                 File tempFile = File.createTempFile("test-dda-", ".dat");
454                 tempFile.deleteOnExit();
455                 Files.write("test-content", tempFile, StandardCharsets.UTF_8);
456                 return tempFile;
457         }
458
459         @Test
460         public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier()
461         throws InterruptedException, ExecutionException, IOException {
462                 Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
463                 connectNode();
464                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
465                 String identifier = extractIdentifier(lines);
466                 fcpServer.writeLine(
467                         "ProtocolError",
468                         "Identifier=not-the-right-one",
469                         "Code=25",
470                         "EndMessage"
471                 );
472                 fcpServer.writeLine(
473                         "PutSuccessful",
474                         "Identifier=" + identifier,
475                         "URI=KSK@foo.txt",
476                         "EndMessage"
477                 );
478                 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
479         }
480
481         @Test
482         public void clientPutAbortsOnProtocolErrorOtherThan25()
483         throws InterruptedException, ExecutionException, IOException {
484                 Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
485                 connectNode();
486                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
487                 String identifier = extractIdentifier(lines);
488                 fcpServer.writeLine(
489                         "ProtocolError",
490                         "Identifier=" + identifier,
491                         "Code=1",
492                         "EndMessage"
493                 );
494                 assertThat(key.get().isPresent(), is(false));
495         }
496
497         @Test
498         public void clientPutDoesNotReplyToWrongTestDdaReply() throws IOException, ExecutionException,
499         InterruptedException {
500                 File tempFile = createTempFile();
501                 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt").execute();
502                 connectNode();
503                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
504                 String identifier = extractIdentifier(lines);
505                 fcpServer.writeLine(
506                         "ProtocolError",
507                         "Identifier=" + identifier,
508                         "Code=25",
509                         "EndMessage"
510                 );
511                 lines = fcpServer.collectUntil(is("EndMessage"));
512                 assertThat(lines, matchesFcpMessage(
513                         "TestDDARequest",
514                         "Directory=" + tempFile.getParent(),
515                         "WantReadDirectory=true",
516                         "WantWriteDirectory=false"
517                 ));
518                 fcpServer.writeLine(
519                         "TestDDAReply",
520                         "Directory=/some-other-directory",
521                         "ReadFilename=" + tempFile,
522                         "EndMessage"
523                 );
524                 fcpServer.writeLine(
525                         "TestDDAReply",
526                         "Directory=" + tempFile.getParent(),
527                         "ReadFilename=" + tempFile,
528                         "EndMessage"
529                 );
530                 lines = fcpServer.collectUntil(is("EndMessage"));
531                 assertThat(lines, matchesFcpMessage(
532                         "TestDDAResponse",
533                         "Directory=" + tempFile.getParent(),
534                         "ReadContent=test-content"
535                 ));
536         }
537
538         @Test
539         public void clientPutSendsResponseEvenIfFileCanNotBeRead()
540         throws IOException, ExecutionException, InterruptedException {
541                 File tempFile = createTempFile();
542                 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt").execute();
543                 connectNode();
544                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
545                 String identifier = extractIdentifier(lines);
546                 fcpServer.writeLine(
547                         "ProtocolError",
548                         "Identifier=" + identifier,
549                         "Code=25",
550                         "EndMessage"
551                 );
552                 lines = fcpServer.collectUntil(is("EndMessage"));
553                 assertThat(lines, matchesFcpMessage(
554                         "TestDDARequest",
555                         "Directory=" + tempFile.getParent(),
556                         "WantReadDirectory=true",
557                         "WantWriteDirectory=false"
558                 ));
559                 fcpServer.writeLine(
560                         "TestDDAReply",
561                         "Directory=" + tempFile.getParent(),
562                         "ReadFilename=" + tempFile + ".foo",
563                         "EndMessage"
564                 );
565                 lines = fcpServer.collectUntil(is("EndMessage"));
566                 assertThat(lines, matchesFcpMessage(
567                         "TestDDAResponse",
568                         "Directory=" + tempFile.getParent(),
569                         "ReadContent=failed-to-read"
570                 ));
571         }
572
573         @Test
574         public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory()
575         throws IOException, ExecutionException, InterruptedException {
576                 File tempFile = createTempFile();
577                 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt").execute();
578                 connectNode();
579                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
580                 String identifier = extractIdentifier(lines);
581                 fcpServer.writeLine(
582                         "TestDDAComplete",
583                         "Directory=/some-other-directory",
584                         "EndMessage"
585                 );
586                 fcpServer.writeLine(
587                         "ProtocolError",
588                         "Identifier=" + identifier,
589                         "Code=25",
590                         "EndMessage"
591                 );
592                 lines = fcpServer.collectUntil(is("EndMessage"));
593                 assertThat(lines, matchesFcpMessage(
594                         "TestDDARequest",
595                         "Directory=" + tempFile.getParent(),
596                         "WantReadDirectory=true",
597                         "WantWriteDirectory=false"
598                 ));
599         }
600
601         @Test
602         public void clientPutSendsNotificationsForGeneratedKeys()
603         throws InterruptedException, ExecutionException, IOException {
604                 List<String> generatedKeys = new CopyOnWriteArrayList<>();
605                 Future<Optional<Key>> key = fcpClient.clientPut()
606                         .onKeyGenerated(generatedKeys::add)
607                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
608                         .length(6)
609                         .uri("KSK@foo.txt")
610                         .execute();
611                 connectNode();
612                 List<String> lines = fcpServer.collectUntil(is("Hello"));
613                 String identifier = extractIdentifier(lines);
614                 fcpServer.writeLine(
615                         "URIGenerated",
616                         "Identifier=" + identifier,
617                         "URI=KSK@foo.txt",
618                         "EndMessage"
619                 );
620                 fcpServer.writeLine(
621                         "PutSuccessful",
622                         "URI=KSK@foo.txt",
623                         "Identifier=" + identifier,
624                         "EndMessage"
625                 );
626                 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
627                 assertThat(generatedKeys, contains("KSK@foo.txt"));
628         }
629
630         @Test
631         public void defaultFcpClientCanGetNodeInformation() throws InterruptedException, ExecutionException, IOException {
632                 Future<NodeData> nodeData = fcpClient.getNode().execute();
633                 connectNode();
634                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
635                 String identifier = extractIdentifier(lines);
636                 assertThat(lines, matchesFcpMessage(
637                         "GetNode",
638                         "Identifier=" + identifier,
639                         "GiveOpennetRef=false",
640                         "WithPrivate=false",
641                         "WithVolatile=false"
642                 ));
643                 fcpServer.writeLine(
644                         "NodeData",
645                         "Identifier=" + identifier,
646                         "ark.pubURI=SSK@3YEf.../ark",
647                         "ark.number=78",
648                         "auth.negTypes=2",
649                         "version=Fred,0.7,1.0,1466",
650                         "lastGoodVersion=Fred,0.7,1.0,1466",
651                         "EndMessage"
652                 );
653                 assertThat(nodeData.get(), notNullValue());
654         }
655
656         @Test
657         public void defaultFcpClientCanGetNodeInformationWithOpennetRef()
658         throws InterruptedException, ExecutionException, IOException {
659                 Future<NodeData> nodeData = fcpClient.getNode().opennetRef().execute();
660                 connectNode();
661                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
662                 String identifier = extractIdentifier(lines);
663                 assertThat(lines, matchesFcpMessage(
664                         "GetNode",
665                         "Identifier=" + identifier,
666                         "GiveOpennetRef=true",
667                         "WithPrivate=false",
668                         "WithVolatile=false"
669                 ));
670                 fcpServer.writeLine(
671                         "NodeData",
672                         "Identifier=" + identifier,
673                         "opennet=true",
674                         "ark.pubURI=SSK@3YEf.../ark",
675                         "ark.number=78",
676                         "auth.negTypes=2",
677                         "version=Fred,0.7,1.0,1466",
678                         "lastGoodVersion=Fred,0.7,1.0,1466",
679                         "EndMessage"
680                 );
681                 assertThat(nodeData.get().getVersion().toString(), is("Fred,0.7,1.0,1466"));
682         }
683
684         @Test
685         public void defaultFcpClientCanGetNodeInformationWithPrivateData()
686         throws InterruptedException, ExecutionException, IOException {
687                 Future<NodeData> nodeData = fcpClient.getNode().includePrivate().execute();
688                 connectNode();
689                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
690                 String identifier = extractIdentifier(lines);
691                 assertThat(lines, matchesFcpMessage(
692                         "GetNode",
693                         "Identifier=" + identifier,
694                         "GiveOpennetRef=false",
695                         "WithPrivate=true",
696                         "WithVolatile=false"
697                 ));
698                 fcpServer.writeLine(
699                         "NodeData",
700                         "Identifier=" + identifier,
701                         "opennet=false",
702                         "ark.pubURI=SSK@3YEf.../ark",
703                         "ark.number=78",
704                         "auth.negTypes=2",
705                         "version=Fred,0.7,1.0,1466",
706                         "lastGoodVersion=Fred,0.7,1.0,1466",
707                         "ark.privURI=SSK@XdHMiRl",
708                         "EndMessage"
709                 );
710                 assertThat(nodeData.get().getARK().getPrivateURI(), is("SSK@XdHMiRl"));
711         }
712
713         @Test
714         public void defaultFcpClientCanGetNodeInformationWithVolatileData()
715         throws InterruptedException, ExecutionException, IOException {
716                 Future<NodeData> nodeData = fcpClient.getNode().includeVolatile().execute();
717                 connectNode();
718                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
719                 String identifier = extractIdentifier(lines);
720                 assertThat(lines, matchesFcpMessage(
721                         "GetNode",
722                         "Identifier=" + identifier,
723                         "GiveOpennetRef=false",
724                         "WithPrivate=false",
725                         "WithVolatile=true"
726                 ));
727                 fcpServer.writeLine(
728                         "NodeData",
729                         "Identifier=" + identifier,
730                         "opennet=false",
731                         "ark.pubURI=SSK@3YEf.../ark",
732                         "ark.number=78",
733                         "auth.negTypes=2",
734                         "version=Fred,0.7,1.0,1466",
735                         "lastGoodVersion=Fred,0.7,1.0,1466",
736                         "volatile.freeJavaMemory=205706528",
737                         "EndMessage"
738                 );
739                 assertThat(nodeData.get().getVolatile("freeJavaMemory"), is("205706528"));
740         }
741
742         @Test
743         public void defaultFcpClientCanGetConfigWithoutDetails()
744         throws InterruptedException, ExecutionException, IOException {
745                 Future<ConfigData> configData = fcpClient.getConfig().execute();
746                 connectNode();
747                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
748                 String identifier = extractIdentifier(lines);
749                 assertThat(lines, matchesFcpMessage(
750                         "GetConfig",
751                         "Identifier=" + identifier
752                 ));
753                 fcpServer.writeLine(
754                         "ConfigData",
755                         "Identifier=" + identifier,
756                         "EndMessage"
757                 );
758                 assertThat(configData.get(), notNullValue());
759         }
760
761         @Test
762         public void defaultFcpClientCanGetConfigWithCurrent()
763         throws InterruptedException, ExecutionException, IOException {
764                 Future<ConfigData> configData = fcpClient.getConfig().withCurrent().execute();
765                 connectNode();
766                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
767                 String identifier = extractIdentifier(lines);
768                 assertThat(lines, matchesFcpMessage(
769                         "GetConfig",
770                         "Identifier=" + identifier,
771                         "WithCurrent=true"
772                 ));
773                 fcpServer.writeLine(
774                         "ConfigData",
775                         "Identifier=" + identifier,
776                         "current.foo=bar",
777                         "EndMessage"
778                 );
779                 assertThat(configData.get().getCurrent("foo"), is("bar"));
780         }
781
782         @Test
783         public void defaultFcpClientCanGetConfigWithDefaults()
784         throws InterruptedException, ExecutionException, IOException {
785                 Future<ConfigData> configData = fcpClient.getConfig().withDefaults().execute();
786                 connectNode();
787                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
788                 String identifier = extractIdentifier(lines);
789                 assertThat(lines, matchesFcpMessage(
790                         "GetConfig",
791                         "Identifier=" + identifier,
792                         "WithDefaults=true"
793                 ));
794                 fcpServer.writeLine(
795                         "ConfigData",
796                         "Identifier=" + identifier,
797                         "default.foo=bar",
798                         "EndMessage"
799                 );
800                 assertThat(configData.get().getDefault("foo"), is("bar"));
801         }
802
803         @Test
804         public void defaultFcpClientCanGetConfigWithSortOrder()
805         throws InterruptedException, ExecutionException, IOException {
806                 Future<ConfigData> configData = fcpClient.getConfig().withSortOrder().execute();
807                 connectNode();
808                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
809                 String identifier = extractIdentifier(lines);
810                 assertThat(lines, matchesFcpMessage(
811                         "GetConfig",
812                         "Identifier=" + identifier,
813                         "WithSortOrder=true"
814                 ));
815                 fcpServer.writeLine(
816                         "ConfigData",
817                         "Identifier=" + identifier,
818                         "sortOrder.foo=17",
819                         "EndMessage"
820                 );
821                 assertThat(configData.get().getSortOrder("foo"), is(17));
822         }
823
824         @Test
825         public void defaultFcpClientCanGetConfigWithExpertFlag()
826         throws InterruptedException, ExecutionException, IOException {
827                 Future<ConfigData> configData = fcpClient.getConfig().withExpertFlag().execute();
828                 connectNode();
829                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
830                 String identifier = extractIdentifier(lines);
831                 assertThat(lines, matchesFcpMessage(
832                         "GetConfig",
833                         "Identifier=" + identifier,
834                         "WithExpertFlag=true"
835                 ));
836                 fcpServer.writeLine(
837                         "ConfigData",
838                         "Identifier=" + identifier,
839                         "expertFlag.foo=true",
840                         "EndMessage"
841                 );
842                 assertThat(configData.get().getExpertFlag("foo"), is(true));
843         }
844
845         @Test
846         public void defaultFcpClientCanGetConfigWithForceWriteFlag()
847         throws InterruptedException, ExecutionException, IOException {
848                 Future<ConfigData> configData = fcpClient.getConfig().withForceWriteFlag().execute();
849                 connectNode();
850                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
851                 String identifier = extractIdentifier(lines);
852                 assertThat(lines, matchesFcpMessage(
853                         "GetConfig",
854                         "Identifier=" + identifier,
855                         "WithForceWriteFlag=true"
856                 ));
857                 fcpServer.writeLine(
858                         "ConfigData",
859                         "Identifier=" + identifier,
860                         "forceWriteFlag.foo=true",
861                         "EndMessage"
862                 );
863                 assertThat(configData.get().getForceWriteFlag("foo"), is(true));
864         }
865
866         @Test
867         public void defaultFcpClientCanGetConfigWithShortDescription()
868         throws InterruptedException, ExecutionException, IOException {
869                 Future<ConfigData> configData = fcpClient.getConfig().withShortDescription().execute();
870                 connectNode();
871                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
872                 String identifier = extractIdentifier(lines);
873                 assertThat(lines, matchesFcpMessage(
874                         "GetConfig",
875                         "Identifier=" + identifier,
876                         "WithShortDescription=true"
877                 ));
878                 fcpServer.writeLine(
879                         "ConfigData",
880                         "Identifier=" + identifier,
881                         "shortDescription.foo=bar",
882                         "EndMessage"
883                 );
884                 assertThat(configData.get().getShortDescription("foo"), is("bar"));
885         }
886
887         @Test
888         public void defaultFcpClientCanGetConfigWithLongDescription()
889         throws InterruptedException, ExecutionException, IOException {
890                 Future<ConfigData> configData = fcpClient.getConfig().withLongDescription().execute();
891                 connectNode();
892                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
893                 String identifier = extractIdentifier(lines);
894                 assertThat(lines, matchesFcpMessage(
895                         "GetConfig",
896                         "Identifier=" + identifier,
897                         "WithLongDescription=true"
898                 ));
899                 fcpServer.writeLine(
900                         "ConfigData",
901                         "Identifier=" + identifier,
902                         "longDescription.foo=bar",
903                         "EndMessage"
904                 );
905                 assertThat(configData.get().getLongDescription("foo"), is("bar"));
906         }
907
908         @Test
909         public void defaultFcpClientCanGetConfigWithDataTypes()
910         throws InterruptedException, ExecutionException, IOException {
911                 Future<ConfigData> configData = fcpClient.getConfig().withDataTypes().execute();
912                 connectNode();
913                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
914                 String identifier = extractIdentifier(lines);
915                 assertThat(lines, matchesFcpMessage(
916                         "GetConfig",
917                         "Identifier=" + identifier,
918                         "WithDataTypes=true"
919                 ));
920                 fcpServer.writeLine(
921                         "ConfigData",
922                         "Identifier=" + identifier,
923                         "dataType.foo=number",
924                         "EndMessage"
925                 );
926                 assertThat(configData.get().getDataType("foo"), is("number"));
927         }
928
929         @Test
930         public void defaultFcpClientCanModifyConfigData() throws InterruptedException, ExecutionException, IOException {
931                 Future<ConfigData> newConfigData = fcpClient.modifyConfig().set("foo.bar").to("baz").execute();
932                 connectNode();
933                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
934                 String identifier = extractIdentifier(lines);
935                 assertThat(lines, matchesFcpMessage(
936                         "ModifyConfig",
937                         "Identifier=" + identifier,
938                         "foo.bar=baz"
939                 ));
940                 fcpServer.writeLine(
941                         "ConfigData",
942                         "Identifier=" + identifier,
943                         "current.foo.bar=baz",
944                         "EndMessage"
945                 );
946                 assertThat(newConfigData.get().getCurrent("foo.bar"), is("baz"));
947         }
948
949         private List<String> lines;
950         private String identifier;
951
952         private void connectAndAssert(Supplier<Matcher<List<String>>> requestMatcher)
953         throws InterruptedException, ExecutionException, IOException {
954                 connectNode();
955                 readMessage(requestMatcher);
956         }
957
958         private void readMessage(Supplier<Matcher<List<String>>> requestMatcher) throws IOException {
959                 lines = fcpServer.collectUntil(is("EndMessage"));
960                 identifier = extractIdentifier(lines);
961                 assertThat(lines, requestMatcher.get());
962         }
963
964         public class GenerateKeyPair {
965
966                 @Test
967                 public void defaultFcpClientCanGenerateKeypair() throws ExecutionException, InterruptedException, IOException {
968                         Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
969                         connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
970                         replyWithKeyPair();
971                         FcpKeyPair keyPair = keyPairFuture.get();
972                         assertThat(keyPair.getPublicKey(), is(REQUEST_URI));
973                         assertThat(keyPair.getPrivateKey(), is(INSERT_URI));
974                 }
975
976                 private void replyWithKeyPair() throws IOException {
977                         fcpServer.writeLine("SSKKeypair",
978                                 "InsertURI=" + INSERT_URI + "",
979                                 "RequestURI=" + REQUEST_URI + "",
980                                 "Identifier=" + identifier,
981                                 "EndMessage");
982                 }
983
984         }
985
986         public class Peers {
987
988                 public class PeerCommands {
989
990                         public class ListPeer {
991
992                                 @Test
993                                 public void byIdentity() throws InterruptedException, ExecutionException, IOException {
994                                         Future<Optional<Peer>> peer = fcpClient.listPeer().byIdentity("id1").execute();
995                                         connectAndAssert(() -> matchesListPeer("id1"));
996                                         replyWithPeer("id1");
997                                         assertThat(peer.get().get().getIdentity(), is("id1"));
998                                 }
999
1000                                 @Test
1001                                 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
1002                                         Future<Optional<Peer>> peer = fcpClient.listPeer().byHostAndPort("host.free.net", 12345).execute();
1003                                         connectAndAssert(() -> matchesListPeer("host.free.net:12345"));
1004                                         replyWithPeer("id1");
1005                                         assertThat(peer.get().get().getIdentity(), is("id1"));
1006                                 }
1007
1008                                 @Test
1009                                 public void byName() throws InterruptedException, ExecutionException, IOException {
1010                                         Future<Optional<Peer>> peer = fcpClient.listPeer().byName("FriendNode").execute();
1011                                         connectAndAssert(() -> matchesListPeer("FriendNode"));
1012                                         replyWithPeer("id1");
1013                                         assertThat(peer.get().get().getIdentity(), is("id1"));
1014                                 }
1015
1016                                 @Test
1017                                 public void unknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
1018                                         Future<Optional<Peer>> peer = fcpClient.listPeer().byIdentity("id2").execute();
1019                                         connectAndAssert(() -> matchesListPeer("id2"));
1020                                         replyWithUnknownNodeIdentifier();
1021                                         assertThat(peer.get().isPresent(), is(false));
1022                                 }
1023
1024                                 private Matcher<List<String>> matchesListPeer(String nodeId) {
1025                                         return matchesFcpMessage(
1026                                                 "ListPeer",
1027                                                 "Identifier=" + identifier,
1028                                                 "NodeIdentifier=" + nodeId
1029                                         );
1030                                 }
1031
1032                         }
1033
1034                         public class ListPeers {
1035
1036                                 @Test
1037                                 public void withoutMetadataOrVolatile() throws IOException, ExecutionException, InterruptedException {
1038                                         Future<Collection<Peer>> peers = fcpClient.listPeers().execute();
1039                                         connectAndAssert(() -> matchesListPeers(false, false));
1040                                         replyWithPeer("id1");
1041                                         replyWithPeer("id2");
1042                                         sendEndOfPeerList();
1043                                         assertThat(peers.get(), hasSize(2));
1044                                         assertThat(peers.get().stream().map(Peer::getIdentity).collect(Collectors.toList()),
1045                                                 containsInAnyOrder("id1", "id2"));
1046                                 }
1047
1048                                 @Test
1049                                 public void withMetadata() throws IOException, ExecutionException, InterruptedException {
1050                                         Future<Collection<Peer>> peers = fcpClient.listPeers().includeMetadata().execute();
1051                                         connectAndAssert(() -> matchesListPeers(false, true));
1052                                         replyWithPeer("id1", "metadata.foo=bar1");
1053                                         replyWithPeer("id2", "metadata.foo=bar2");
1054                                         sendEndOfPeerList();
1055                                         assertThat(peers.get(), hasSize(2));
1056                                         assertThat(peers.get().stream().map(peer -> peer.getMetadata("foo")).collect(Collectors.toList()),
1057                                                 containsInAnyOrder("bar1", "bar2"));
1058                                 }
1059
1060                                 @Test
1061                                 public void withVolatile() throws IOException, ExecutionException, InterruptedException {
1062                                         Future<Collection<Peer>> peers = fcpClient.listPeers().includeVolatile().execute();
1063                                         connectAndAssert(() -> matchesListPeers(true, false));
1064                                         replyWithPeer("id1", "volatile.foo=bar1");
1065                                         replyWithPeer("id2", "volatile.foo=bar2");
1066                                         sendEndOfPeerList();
1067                                         assertThat(peers.get(), hasSize(2));
1068                                         assertThat(peers.get().stream().map(peer -> peer.getVolatile("foo")).collect(Collectors.toList()),
1069                                                 containsInAnyOrder("bar1", "bar2"));
1070                                 }
1071
1072                                 private Matcher<List<String>> matchesListPeers(boolean withVolatile, boolean withMetadata) {
1073                                         return matchesFcpMessage(
1074                                                 "ListPeers",
1075                                                 "WithVolatile=" + withVolatile,
1076                                                 "WithMetadata=" + withMetadata
1077                                         );
1078                                 }
1079
1080                                 private void sendEndOfPeerList() throws IOException {
1081                                         fcpServer.writeLine(
1082                                                 "EndListPeers",
1083                                                 "Identifier=" + identifier,
1084                                                 "EndMessage"
1085                                         );
1086                                 }
1087
1088                         }
1089
1090                         public class AddPeer {
1091
1092                                 @Test
1093                                 public void fromFile() throws InterruptedException, ExecutionException, IOException {
1094                                         Future<Optional<Peer>> peer = fcpClient.addPeer().fromFile(new File("/tmp/ref.txt")).execute();
1095                                         connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("File=/tmp/ref.txt")));
1096                                         replyWithPeer("id1");
1097                                         assertThat(peer.get().get().getIdentity(), is("id1"));
1098                                 }
1099
1100                                 @Test
1101                                 public void fromUrl() throws InterruptedException, ExecutionException, IOException {
1102                                         Future<Optional<Peer>> peer = fcpClient.addPeer().fromURL(new URL("http://node.ref/")).execute();
1103                                         connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("URL=http://node.ref/")));
1104                                         replyWithPeer("id1");
1105                                         assertThat(peer.get().get().getIdentity(), is("id1"));
1106                                 }
1107
1108                                 @Test
1109                                 public void fromNodeRef() throws InterruptedException, ExecutionException, IOException {
1110                                         NodeRef nodeRef = createNodeRef();
1111                                         Future<Optional<Peer>> peer = fcpClient.addPeer().fromNodeRef(nodeRef).execute();
1112                                         connectAndAssert(() -> allOf(matchesAddPeer(), Matchers.<String>hasItems(
1113                                                 "myName=name",
1114                                                 "ark.pubURI=public",
1115                                                 "ark.number=1",
1116                                                 "dsaGroup.g=base",
1117                                                 "dsaGroup.p=prime",
1118                                                 "dsaGroup.q=subprime",
1119                                                 "dsaPubKey.y=dsa-public",
1120                                                 "physical.udp=1.2.3.4:5678",
1121                                                 "auth.negTypes=3;5",
1122                                                 "sig=sig"
1123                                         )));
1124                                         replyWithPeer("id1");
1125                                         assertThat(peer.get().get().getIdentity(), is("id1"));
1126                                 }
1127
1128                                 private NodeRef createNodeRef() {
1129                                         NodeRef nodeRef = new NodeRef();
1130                                         nodeRef.setIdentity("id1");
1131                                         nodeRef.setName("name");
1132                                         nodeRef.setARK(new ARK("public", "1"));
1133                                         nodeRef.setDSAGroup(new DSAGroup("base", "prime", "subprime"));
1134                                         nodeRef.setNegotiationTypes(new int[] { 3, 5 });
1135                                         nodeRef.setPhysicalUDP("1.2.3.4:5678");
1136                                         nodeRef.setDSAPublicKey("dsa-public");
1137                                         nodeRef.setSignature("sig");
1138                                         return nodeRef;
1139                                 }
1140
1141                                 private Matcher<List<String>> matchesAddPeer() {
1142                                         return matchesFcpMessage(
1143                                                 "AddPeer",
1144                                                 "Identifier=" + identifier
1145                                         );
1146                                 }
1147
1148                         }
1149
1150                         public class ModifyPeer {
1151
1152                                 @Test
1153                                 public void defaultFcpClientCanEnablePeerByName()
1154                                 throws InterruptedException, ExecutionException, IOException {
1155                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byName("id1").execute();
1156                                         connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
1157                                         replyWithPeer("id1");
1158                                         assertThat(peer.get().get().getIdentity(), is("id1"));
1159                                 }
1160
1161                                 @Test
1162                                 public void defaultFcpClientCanDisablePeerByName()
1163                                 throws InterruptedException, ExecutionException, IOException {
1164                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().disable().byName("id1").execute();
1165                                         connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", true));
1166                                         replyWithPeer("id1");
1167                                         assertThat(peer.get().get().getIdentity(), is("id1"));
1168                                 }
1169
1170                                 @Test
1171                                 public void defaultFcpClientCanEnablePeerByIdentity()
1172                                 throws InterruptedException, ExecutionException, IOException {
1173                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byIdentity("id1").execute();
1174                                         connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
1175                                         replyWithPeer("id1");
1176                                         assertThat(peer.get().get().getIdentity(), is("id1"));
1177                                 }
1178
1179                                 @Test
1180                                 public void defaultFcpClientCanEnablePeerByHostAndPort()
1181                                 throws InterruptedException, ExecutionException, IOException {
1182                                         Future<Optional<Peer>> peer =
1183                                                 fcpClient.modifyPeer().enable().byHostAndPort("1.2.3.4", 5678).execute();
1184                                         connectAndAssert(() -> matchesModifyPeer("1.2.3.4:5678", "IsDisabled", false));
1185                                         replyWithPeer("id1");
1186                                         assertThat(peer.get().get().getIdentity(), is("id1"));
1187                                 }
1188
1189                                 @Test
1190                                 public void allowLocalAddressesOfPeer() throws InterruptedException, ExecutionException, IOException {
1191                                         Future<Optional<Peer>> peer =
1192                                                 fcpClient.modifyPeer().allowLocalAddresses().byIdentity("id1").execute();
1193                                         connectAndAssert(() -> allOf(
1194                                                 matchesModifyPeer("id1", "AllowLocalAddresses", true),
1195                                                 not(contains(startsWith("IsDisabled=")))
1196                                         ));
1197                                         replyWithPeer("id1");
1198                                         assertThat(peer.get().get().getIdentity(), is("id1"));
1199                                 }
1200
1201                                 @Test
1202                                 public void disallowLocalAddressesOfPeer()
1203                                 throws InterruptedException, ExecutionException, IOException {
1204                                         Future<Optional<Peer>> peer =
1205                                                 fcpClient.modifyPeer().disallowLocalAddresses().byIdentity("id1").execute();
1206                                         connectAndAssert(() -> allOf(
1207                                                 matchesModifyPeer("id1", "AllowLocalAddresses", false),
1208                                                 not(contains(startsWith("IsDisabled=")))
1209                                         ));
1210                                         replyWithPeer("id1");
1211                                         assertThat(peer.get().get().getIdentity(), is("id1"));
1212                                 }
1213
1214                                 @Test
1215                                 public void setBurstOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
1216                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().setBurstOnly().byIdentity("id1").execute();
1217                                         connectAndAssert(() -> allOf(
1218                                                 matchesModifyPeer("id1", "IsBurstOnly", true),
1219                                                 not(contains(startsWith("AllowLocalAddresses="))),
1220                                                 not(contains(startsWith("IsDisabled=")))
1221                                         ));
1222                                         replyWithPeer("id1");
1223                                         assertThat(peer.get().get().getIdentity(), is("id1"));
1224                                 }
1225
1226                                 @Test
1227                                 public void clearBurstOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
1228                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().clearBurstOnly().byIdentity("id1").execute();
1229                                         connectAndAssert(() -> allOf(
1230                                                 matchesModifyPeer("id1", "IsBurstOnly", false),
1231                                                 not(contains(startsWith("AllowLocalAddresses="))),
1232                                                 not(contains(startsWith("IsDisabled=")))
1233                                         ));
1234                                         replyWithPeer("id1");
1235                                         assertThat(peer.get().get().getIdentity(), is("id1"));
1236                                 }
1237
1238                                 @Test
1239                                 public void defaultFcpClientCanSetListenOnlyForPeer()
1240                                 throws InterruptedException, ExecutionException, IOException {
1241                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().setListenOnly().byIdentity("id1").execute();
1242                                         connectAndAssert(() -> allOf(
1243                                                 matchesModifyPeer("id1", "IsListenOnly", true),
1244                                                 not(contains(startsWith("AllowLocalAddresses="))),
1245                                                 not(contains(startsWith("IsDisabled="))),
1246                                                 not(contains(startsWith("IsBurstOnly=")))
1247                                         ));
1248                                         replyWithPeer("id1");
1249                                         assertThat(peer.get().get().getIdentity(), is("id1"));
1250                                 }
1251
1252                                 @Test
1253                                 public void clearListenOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
1254                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().clearListenOnly().byIdentity("id1").execute();
1255                                         connectAndAssert(() -> allOf(
1256                                                 matchesModifyPeer("id1", "IsListenOnly", false),
1257                                                 not(contains(startsWith("AllowLocalAddresses="))),
1258                                                 not(contains(startsWith("IsDisabled="))),
1259                                                 not(contains(startsWith("IsBurstOnly=")))
1260                                         ));
1261                                         replyWithPeer("id1");
1262                                         assertThat(peer.get().get().getIdentity(), is("id1"));
1263                                 }
1264
1265                                 @Test
1266                                 public void ignoreSourceForPeer() throws InterruptedException, ExecutionException, IOException {
1267                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().ignoreSource().byIdentity("id1").execute();
1268                                         connectAndAssert(() -> allOf(
1269                                                 matchesModifyPeer("id1", "IgnoreSourcePort", true),
1270                                                 not(contains(startsWith("AllowLocalAddresses="))),
1271                                                 not(contains(startsWith("IsDisabled="))),
1272                                                 not(contains(startsWith("IsBurstOnly="))),
1273                                                 not(contains(startsWith("IsListenOnly=")))
1274                                         ));
1275                                         replyWithPeer("id1");
1276                                         assertThat(peer.get().get().getIdentity(), is("id1"));
1277                                 }
1278
1279                                 @Test
1280                                 public void useSourceForPeer() throws InterruptedException, ExecutionException, IOException {
1281                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().useSource().byIdentity("id1").execute();
1282                                         connectAndAssert(() -> allOf(
1283                                                 matchesModifyPeer("id1", "IgnoreSourcePort", false),
1284                                                 not(contains(startsWith("AllowLocalAddresses="))),
1285                                                 not(contains(startsWith("IsDisabled="))),
1286                                                 not(contains(startsWith("IsBurstOnly="))),
1287                                                 not(contains(startsWith("IsListenOnly=")))
1288                                         ));
1289                                         replyWithPeer("id1");
1290                                         assertThat(peer.get().get().getIdentity(), is("id1"));
1291                                 }
1292
1293                                 @Test
1294                                 public void unknownNode() throws InterruptedException, ExecutionException, IOException {
1295                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byIdentity("id1").execute();
1296                                         connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
1297                                         replyWithUnknownNodeIdentifier();
1298                                         assertThat(peer.get().isPresent(), is(false));
1299                                 }
1300
1301                                 private Matcher<List<String>> matchesModifyPeer(String nodeIdentifier, String setting, boolean value) {
1302                                         return matchesFcpMessage(
1303                                                 "ModifyPeer",
1304                                                 "Identifier=" + identifier,
1305                                                 "NodeIdentifier=" + nodeIdentifier,
1306                                                 setting + "=" + value
1307                                         );
1308                                 }
1309
1310                         }
1311
1312                         public class RemovePeer {
1313
1314                                 @Test
1315                                 public void byName() throws InterruptedException, ExecutionException, IOException {
1316                                         Future<Boolean> peer = fcpClient.removePeer().byName("Friend1").execute();
1317                                         connectAndAssert(() -> matchesRemovePeer("Friend1"));
1318                                         replyWithPeerRemoved("Friend1");
1319                                         assertThat(peer.get(), is(true));
1320                                 }
1321
1322                                 @Test
1323                                 public void invalidName() throws InterruptedException, ExecutionException, IOException {
1324                                         Future<Boolean> peer = fcpClient.removePeer().byName("NotFriend1").execute();
1325                                         connectAndAssert(() -> matchesRemovePeer("NotFriend1"));
1326                                         replyWithUnknownNodeIdentifier();
1327                                         assertThat(peer.get(), is(false));
1328                                 }
1329
1330                                 @Test
1331                                 public void byIdentity() throws InterruptedException, ExecutionException, IOException {
1332                                         Future<Boolean> peer = fcpClient.removePeer().byIdentity("id1").execute();
1333                                         connectAndAssert(() -> matchesRemovePeer("id1"));
1334                                         replyWithPeerRemoved("id1");
1335                                         assertThat(peer.get(), is(true));
1336                                 }
1337
1338                                 @Test
1339                                 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
1340                                         Future<Boolean> peer = fcpClient.removePeer().byHostAndPort("1.2.3.4", 5678).execute();
1341                                         connectAndAssert(() -> matchesRemovePeer("1.2.3.4:5678"));
1342                                         replyWithPeerRemoved("Friend1");
1343                                         assertThat(peer.get(), is(true));
1344                                 }
1345
1346                                 private Matcher<List<String>> matchesRemovePeer(String nodeIdentifier) {
1347                                         return matchesFcpMessage(
1348                                                 "RemovePeer",
1349                                                 "Identifier=" + identifier,
1350                                                 "NodeIdentifier=" + nodeIdentifier
1351                                         );
1352                                 }
1353
1354                                 private void replyWithPeerRemoved(String nodeIdentifier) throws IOException {
1355                                         fcpServer.writeLine(
1356                                                 "PeerRemoved",
1357                                                 "Identifier=" + identifier,
1358                                                 "NodeIdentifier=" + nodeIdentifier,
1359                                                 "EndMessage"
1360                                         );
1361                                 }
1362
1363                         }
1364
1365                         private void replyWithPeer(String peerId, String... additionalLines) throws IOException {
1366                                 fcpServer.writeLine(
1367                                         "Peer",
1368                                         "Identifier=" + identifier,
1369                                         "identity=" + peerId,
1370                                         "opennet=false",
1371                                         "ark.pubURI=SSK@3YEf.../ark",
1372                                         "ark.number=78",
1373                                         "auth.negTypes=2",
1374                                         "version=Fred,0.7,1.0,1466",
1375                                         "lastGoodVersion=Fred,0.7,1.0,1466"
1376                                 );
1377                                 fcpServer.writeLine(additionalLines);
1378                                 fcpServer.writeLine("EndMessage");
1379                         }
1380
1381                 }
1382
1383                 public class PeerNoteCommands {
1384
1385                         public class ListPeerNotes {
1386
1387                                 @Test
1388                                 public void onUnknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
1389                                         Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byName("Friend1").execute();
1390                                         connectAndAssert(() -> matchesListPeerNotes("Friend1"));
1391                                         replyWithUnknownNodeIdentifier();
1392                                         assertThat(peerNote.get().isPresent(), is(false));
1393                                 }
1394
1395                                 @Test
1396                                 public void byNodeName() throws InterruptedException, ExecutionException, IOException {
1397                                         Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byName("Friend1").execute();
1398                                         connectAndAssert(() -> matchesListPeerNotes("Friend1"));
1399                                         replyWithPeerNote();
1400                                         replyWithEndListPeerNotes();
1401                                         assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
1402                                         assertThat(peerNote.get().get().getPeerNoteType(), is(1));
1403                                 }
1404
1405                                 @Test
1406                                 public void byNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
1407                                         Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byIdentity("id1").execute();
1408                                         connectAndAssert(() -> matchesListPeerNotes("id1"));
1409                                         replyWithPeerNote();
1410                                         replyWithEndListPeerNotes();
1411                                         assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
1412                                         assertThat(peerNote.get().get().getPeerNoteType(), is(1));
1413                                 }
1414
1415                                 @Test
1416                                 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
1417                                         Future<Optional<PeerNote>> peerNote =
1418                                                 fcpClient.listPeerNotes().byHostAndPort("1.2.3.4", 5678).execute();
1419                                         connectAndAssert(() -> matchesListPeerNotes("1.2.3.4:5678"));
1420                                         replyWithPeerNote();
1421                                         replyWithEndListPeerNotes();
1422                                         assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
1423                                         assertThat(peerNote.get().get().getPeerNoteType(), is(1));
1424                                 }
1425
1426                                 private Matcher<List<String>> matchesListPeerNotes(String nodeIdentifier) {
1427                                         return matchesFcpMessage(
1428                                                 "ListPeerNotes",
1429                                                 "NodeIdentifier=" + nodeIdentifier
1430                                         );
1431                                 }
1432
1433                                 private void replyWithEndListPeerNotes() throws IOException {
1434                                         fcpServer.writeLine(
1435                                                 "EndListPeerNotes",
1436                                                 "Identifier=" + identifier,
1437                                                 "EndMessage"
1438                                         );
1439                                 }
1440
1441                                 private void replyWithPeerNote() throws IOException {
1442                                         fcpServer.writeLine(
1443                                                 "PeerNote",
1444                                                 "Identifier=" + identifier,
1445                                                 "NodeIdentifier=Friend1",
1446                                                 "NoteText=RXhhbXBsZSBUZXh0Lg==",
1447                                                 "PeerNoteType=1",
1448                                                 "EndMessage"
1449                                         );
1450                                 }
1451
1452                         }
1453
1454                         public class ModifyPeerNotes {
1455
1456                                 @Test
1457                                 public void byName() throws InterruptedException, ExecutionException, IOException {
1458                                         Future<Boolean> noteUpdated =
1459                                                 fcpClient.modifyPeerNote().darknetComment("foo").byName("Friend1").execute();
1460                                         connectAndAssert(() -> matchesModifyPeerNote("Friend1"));
1461                                         replyWithPeerNote();
1462                                         assertThat(noteUpdated.get(), is(true));
1463                                 }
1464
1465                                 @Test
1466                                 public void onUnknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
1467                                         Future<Boolean> noteUpdated =
1468                                                 fcpClient.modifyPeerNote().darknetComment("foo").byName("Friend1").execute();
1469                                         connectAndAssert(() -> matchesModifyPeerNote("Friend1"));
1470                                         replyWithUnknownNodeIdentifier();
1471                                         assertThat(noteUpdated.get(), is(false));
1472                                 }
1473
1474                                 @Test
1475                                 public void defaultFcpClientFailsToModifyPeerNoteWithoutPeerNote()
1476                                 throws InterruptedException, ExecutionException, IOException {
1477                                         Future<Boolean> noteUpdated = fcpClient.modifyPeerNote().byName("Friend1").execute();
1478                                         assertThat(noteUpdated.get(), is(false));
1479                                 }
1480
1481                                 @Test
1482                                 public void byIdentifier() throws InterruptedException, ExecutionException, IOException {
1483                                         Future<Boolean> noteUpdated =
1484                                                 fcpClient.modifyPeerNote().darknetComment("foo").byIdentifier("id1").execute();
1485                                         connectAndAssert(() -> matchesModifyPeerNote("id1"));
1486                                         replyWithPeerNote();
1487                                         assertThat(noteUpdated.get(), is(true));
1488                                 }
1489
1490                                 @Test
1491                                 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
1492                                         Future<Boolean> noteUpdated =
1493                                                 fcpClient.modifyPeerNote().darknetComment("foo").byHostAndPort("1.2.3.4", 5678).execute();
1494                                         connectAndAssert(() -> matchesModifyPeerNote("1.2.3.4:5678"));
1495                                         replyWithPeerNote();
1496                                         assertThat(noteUpdated.get(), is(true));
1497                                 }
1498
1499                                 private Matcher<List<String>> matchesModifyPeerNote(String nodeIdentifier) {
1500                                         return matchesFcpMessage(
1501                                                 "ModifyPeerNote",
1502                                                 "Identifier=" + identifier,
1503                                                 "NodeIdentifier=" + nodeIdentifier,
1504                                                 "PeerNoteType=1",
1505                                                 "NoteText=Zm9v"
1506                                         );
1507                                 }
1508
1509                                 private void replyWithPeerNote() throws IOException {
1510                                         fcpServer.writeLine(
1511                                                 "PeerNote",
1512                                                 "Identifier=" + identifier,
1513                                                 "NodeIdentifier=Friend1",
1514                                                 "NoteText=Zm9v",
1515                                                 "PeerNoteType=1",
1516                                                 "EndMessage"
1517                                         );
1518                                 }
1519
1520                         }
1521
1522                 }
1523
1524                 private void replyWithUnknownNodeIdentifier() throws IOException {
1525                         fcpServer.writeLine(
1526                                 "UnknownNodeIdentifier",
1527                                 "Identifier=" + identifier,
1528                                 "NodeIdentifier=id2",
1529                                 "EndMessage"
1530                         );
1531                 }
1532
1533         }
1534
1535         public class PluginCommands {
1536
1537                 private static final String CLASS_NAME = "foo.plugin.Plugin";
1538
1539                 private void replyWithPluginInfo() throws IOException {
1540                         fcpServer.writeLine(
1541                                 "PluginInfo",
1542                                 "Identifier=" + identifier,
1543                                 "PluginName=superPlugin",
1544                                 "IsTalkable=true",
1545                                 "LongVersion=1.2.3",
1546                                 "Version=42",
1547                                 "OriginUri=superPlugin",
1548                                 "Started=true",
1549                                 "EndMessage"
1550                         );
1551                 }
1552
1553                 private void verifyPluginInfo(Future<Optional<PluginInfo>> pluginInfo)
1554                 throws InterruptedException, ExecutionException {
1555                         assertThat(pluginInfo.get().get().getPluginName(), is("superPlugin"));
1556                         assertThat(pluginInfo.get().get().getOriginalURI(), is("superPlugin"));
1557                         assertThat(pluginInfo.get().get().isTalkable(), is(true));
1558                         assertThat(pluginInfo.get().get().getVersion(), is("42"));
1559                         assertThat(pluginInfo.get().get().getLongVersion(), is("1.2.3"));
1560                         assertThat(pluginInfo.get().get().isStarted(), is(true));
1561                 }
1562
1563                 public class LoadPlugin {
1564
1565                         public class OfficialPlugins {
1566
1567                                 @Test
1568                                 public void fromFreenet() throws ExecutionException, InterruptedException, IOException {
1569                                         Future<Optional<PluginInfo>> pluginInfo =
1570                                                 fcpClient.loadPlugin().officialFromFreenet("superPlugin").execute();
1571                                         connectAndAssert(() -> createMatcherForOfficialSource("freenet"));
1572                                         assertThat(lines, not(contains(startsWith("Store="))));
1573                                         replyWithPluginInfo();
1574                                         verifyPluginInfo(pluginInfo);
1575                                 }
1576
1577                                 @Test
1578                                 public void persistentFromFreenet() throws ExecutionException, InterruptedException, IOException {
1579                                         Future<Optional<PluginInfo>> pluginInfo =
1580                                                 fcpClient.loadPlugin().addToConfig().officialFromFreenet("superPlugin").execute();
1581                                         connectAndAssert(() -> createMatcherForOfficialSource("freenet"));
1582                                         assertThat(lines, hasItem("Store=true"));
1583                                         replyWithPluginInfo();
1584                                         verifyPluginInfo(pluginInfo);
1585                                 }
1586
1587                                 @Test
1588                                 public void fromHttps() throws ExecutionException, InterruptedException, IOException {
1589                                         Future<Optional<PluginInfo>> pluginInfo =
1590                                                 fcpClient.loadPlugin().officialFromHttps("superPlugin").execute();
1591                                         connectAndAssert(() -> createMatcherForOfficialSource("https"));
1592                                         replyWithPluginInfo();
1593                                         verifyPluginInfo(pluginInfo);
1594                                 }
1595
1596                                 private Matcher<List<String>> createMatcherForOfficialSource(String officialSource) {
1597                                         return matchesFcpMessage(
1598                                                 "LoadPlugin",
1599                                                 "Identifier=" + identifier,
1600                                                 "PluginURL=superPlugin",
1601                                                 "URLType=official",
1602                                                 "OfficialSource=" + officialSource
1603                                         );
1604                                 }
1605
1606                         }
1607
1608                         public class FromOtherSources {
1609
1610                                 private static final String FILE_PATH = "/path/to/plugin.jar";
1611                                 private static final String URL = "http://server.com/plugin.jar";
1612                                 private static final String KEY = "KSK@plugin.jar";
1613
1614                                 @Test
1615                                 public void fromFile() throws ExecutionException, InterruptedException, IOException {
1616                                         Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromFile(FILE_PATH).execute();
1617                                         connectAndAssert(() -> createMatcher("file", FILE_PATH));
1618                                         replyWithPluginInfo();
1619                                         verifyPluginInfo(pluginInfo);
1620                                 }
1621
1622                                 @Test
1623                                 public void fromUrl() throws ExecutionException, InterruptedException, IOException {
1624                                         Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromUrl(URL).execute();
1625                                         connectAndAssert(() -> createMatcher("url", URL));
1626                                         replyWithPluginInfo();
1627                                         verifyPluginInfo(pluginInfo);
1628                                 }
1629
1630                                 @Test
1631                                 public void fromFreenet() throws ExecutionException, InterruptedException, IOException {
1632                                         Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromFreenet(KEY).execute();
1633                                         connectAndAssert(() -> createMatcher("freenet", KEY));
1634                                         replyWithPluginInfo();
1635                                         verifyPluginInfo(pluginInfo);
1636                                 }
1637
1638                                 private Matcher<List<String>> createMatcher(String urlType, String url) {
1639                                         return matchesFcpMessage(
1640                                                 "LoadPlugin",
1641                                                 "Identifier=" + identifier,
1642                                                 "PluginURL=" + url,
1643                                                 "URLType=" + urlType
1644                                         );
1645                                 }
1646
1647                         }
1648
1649                         public class Failed {
1650
1651                                 @Test
1652                                 public void failedLoad() throws ExecutionException, InterruptedException, IOException {
1653                                         Future<Optional<PluginInfo>> pluginInfo =
1654                                                 fcpClient.loadPlugin().officialFromFreenet("superPlugin").execute();
1655                                         connectAndAssert(() -> matchesFcpMessage("LoadPlugin"));
1656                                         replyWithProtocolError();
1657                                         assertThat(pluginInfo.get().isPresent(), is(false));
1658                                 }
1659
1660                         }
1661
1662                 }
1663
1664                 private void replyWithProtocolError() throws IOException {
1665                         fcpServer.writeLine(
1666                                 "ProtocolError",
1667                                 "Identifier=" + identifier,
1668                                 "EndMessage"
1669                         );
1670                 }
1671
1672                 public class ReloadPlugin {
1673
1674                         @Test
1675                         public void reloadingPluginWorks() throws InterruptedException, ExecutionException, IOException {
1676                                 Future<Optional<PluginInfo>> pluginInfo = fcpClient.reloadPlugin().plugin(CLASS_NAME).execute();
1677                                 connectAndAssert(() -> matchReloadPluginMessage());
1678                                 replyWithPluginInfo();
1679                                 verifyPluginInfo(pluginInfo);
1680                         }
1681
1682                         @Test
1683                         public void reloadingPluginWithMaxWaitTimeWorks()
1684                         throws InterruptedException, ExecutionException, IOException {
1685                                 Future<Optional<PluginInfo>> pluginInfo =
1686                                         fcpClient.reloadPlugin().waitFor(1234).plugin(CLASS_NAME).execute();
1687                                 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("MaxWaitTime=1234")));
1688                                 replyWithPluginInfo();
1689                                 verifyPluginInfo(pluginInfo);
1690                         }
1691
1692                         @Test
1693                         public void reloadingPluginWithPurgeWorks()
1694                         throws InterruptedException, ExecutionException, IOException {
1695                                 Future<Optional<PluginInfo>> pluginInfo =
1696                                         fcpClient.reloadPlugin().purge().plugin(CLASS_NAME).execute();
1697                                 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("Purge=true")));
1698                                 replyWithPluginInfo();
1699                                 verifyPluginInfo(pluginInfo);
1700                         }
1701
1702                         @Test
1703                         public void reloadingPluginWithStoreWorks()
1704                         throws InterruptedException, ExecutionException, IOException {
1705                                 Future<Optional<PluginInfo>> pluginInfo =
1706                                         fcpClient.reloadPlugin().addToConfig().plugin(CLASS_NAME).execute();
1707                                 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("Store=true")));
1708                                 replyWithPluginInfo();
1709                                 verifyPluginInfo(pluginInfo);
1710                         }
1711
1712                         private Matcher<List<String>> matchReloadPluginMessage() {
1713                                 return matchesFcpMessage(
1714                                         "ReloadPlugin",
1715                                         "Identifier=" + identifier,
1716                                         "PluginName=" + CLASS_NAME
1717                                 );
1718                         }
1719
1720                 }
1721
1722                 public class RemovePlugin {
1723
1724                         @Test
1725                         public void removingPluginWorks() throws InterruptedException, ExecutionException, IOException {
1726                                 Future<Boolean> pluginRemoved = fcpClient.removePlugin().plugin(CLASS_NAME).execute();
1727                                 connectAndAssert(() -> matchPluginRemovedMessage());
1728                                 replyWithPluginRemoved();
1729                                 assertThat(pluginRemoved.get(), is(true));
1730                         }
1731
1732                         @Test
1733                         public void removingPluginWithMaxWaitTimeWorks()
1734                         throws InterruptedException, ExecutionException, IOException {
1735                                 Future<Boolean> pluginRemoved = fcpClient.removePlugin().waitFor(1234).plugin(CLASS_NAME).execute();
1736                                 connectAndAssert(() -> allOf(matchPluginRemovedMessage(), hasItem("MaxWaitTime=1234")));
1737                                 replyWithPluginRemoved();
1738                                 assertThat(pluginRemoved.get(), is(true));
1739                         }
1740
1741                         @Test
1742                         public void removingPluginWithPurgeWorks()
1743                         throws InterruptedException, ExecutionException, IOException {
1744                                 Future<Boolean> pluginRemoved = fcpClient.removePlugin().purge().plugin(CLASS_NAME).execute();
1745                                 connectAndAssert(() -> allOf(matchPluginRemovedMessage(), hasItem("Purge=true")));
1746                                 replyWithPluginRemoved();
1747                                 assertThat(pluginRemoved.get(), is(true));
1748                         }
1749
1750                         private void replyWithPluginRemoved() throws IOException {
1751                                 fcpServer.writeLine(
1752                                         "PluginRemoved",
1753                                         "Identifier=" + identifier,
1754                                         "PluginName=" + CLASS_NAME,
1755                                         "EndMessage"
1756                                 );
1757                         }
1758
1759                         private Matcher<List<String>> matchPluginRemovedMessage() {
1760                                 return matchesFcpMessage(
1761                                         "RemovePlugin",
1762                                         "Identifier=" + identifier,
1763                                         "PluginName=" + CLASS_NAME
1764                                 );
1765                         }
1766
1767                 }
1768
1769                 public class GetPluginInfo {
1770
1771                         @Test
1772                         public void gettingPluginInfoWorks() throws InterruptedException, ExecutionException, IOException {
1773                                 Future<Optional<PluginInfo>> pluginInfo = fcpClient.getPluginInfo().plugin(CLASS_NAME).execute();
1774                                 connectAndAssert(() -> matchGetPluginInfoMessage());
1775                                 replyWithPluginInfo();
1776                                 verifyPluginInfo(pluginInfo);
1777                         }
1778
1779                         @Test
1780                         public void gettingPluginInfoWithDetailsWorks()
1781                         throws InterruptedException, ExecutionException, IOException {
1782                                 Future<Optional<PluginInfo>> pluginInfo =
1783                                         fcpClient.getPluginInfo().detailed().plugin(CLASS_NAME).execute();
1784                                 connectAndAssert(() -> allOf(matchGetPluginInfoMessage(), hasItem("Detailed=true")));
1785                                 replyWithPluginInfo();
1786                                 verifyPluginInfo(pluginInfo);
1787                         }
1788
1789                         @Test
1790                         public void protocolErrorIsRecognizedAsFailure()
1791                         throws InterruptedException, ExecutionException, IOException {
1792                                 Future<Optional<PluginInfo>> pluginInfo =
1793                                         fcpClient.getPluginInfo().detailed().plugin(CLASS_NAME).execute();
1794                                 connectAndAssert(() -> allOf(matchGetPluginInfoMessage(), hasItem("Detailed=true")));
1795                                 replyWithProtocolError();
1796                                 assertThat(pluginInfo.get(), is(Optional.empty()));
1797                         }
1798
1799                         private Matcher<List<String>> matchGetPluginInfoMessage() {
1800                                 return matchesFcpMessage(
1801                                         "GetPluginInfo",
1802                                         "Identifier=" + identifier,
1803                                         "PluginName=" + CLASS_NAME
1804                                 );
1805                         }
1806
1807                 }
1808
1809         }
1810
1811         public class UskSubscriptionCommands {
1812
1813                 private static final String URI = "USK@some,uri/file.txt";
1814
1815                 @Test
1816                 public void subscriptionWorks() throws InterruptedException, ExecutionException, IOException {
1817                         Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1818                         connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1819                         replyWithSubscribed();
1820                         assertThat(uskSubscription.get().get().getUri(), is(URI));
1821                         AtomicInteger edition = new AtomicInteger();
1822                         CountDownLatch updated = new CountDownLatch(2);
1823                         uskSubscription.get().get().onUpdate(e -> {
1824                                 edition.set(e);
1825                                 updated.countDown();
1826                         });
1827                         sendUpdateNotification(23);
1828                         sendUpdateNotification(24);
1829                         assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true));
1830                         assertThat(edition.get(), is(24));
1831                 }
1832
1833                 @Test
1834                 public void subscriptionUpdatesMultipleTimes() throws InterruptedException, ExecutionException, IOException {
1835                         Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1836                         connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1837                         replyWithSubscribed();
1838                         assertThat(uskSubscription.get().get().getUri(), is(URI));
1839                         AtomicInteger edition = new AtomicInteger();
1840                         CountDownLatch updated = new CountDownLatch(2);
1841                         uskSubscription.get().get().onUpdate(e -> {
1842                                 edition.set(e);
1843                                 updated.countDown();
1844                         });
1845                         uskSubscription.get().get().onUpdate(e -> updated.countDown());
1846                         sendUpdateNotification(23);
1847                         assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true));
1848                         assertThat(edition.get(), is(23));
1849                 }
1850
1851                 @Test
1852                 public void subscriptionCanBeCancelled() throws InterruptedException, ExecutionException, IOException {
1853                         Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1854                         connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1855                         replyWithSubscribed();
1856                         assertThat(uskSubscription.get().get().getUri(), is(URI));
1857                         AtomicBoolean updated = new AtomicBoolean();
1858                         uskSubscription.get().get().onUpdate(e -> updated.set(true));
1859                         uskSubscription.get().get().cancel();
1860                         readMessage(() -> matchesFcpMessage("UnsubscribeUSK", "Identifier=" + identifier));
1861                         sendUpdateNotification(23);
1862                         assertThat(updated.get(), is(false));
1863                 }
1864
1865                 private void replyWithSubscribed() throws IOException {
1866                         fcpServer.writeLine(
1867                                 "SubscribedUSK",
1868                                 "Identifier=" + identifier,
1869                                 "URI=" + URI,
1870                                 "DontPoll=false",
1871                                 "EndMessage"
1872                         );
1873                 }
1874
1875                 private void sendUpdateNotification(int edition, String... additionalLines) throws IOException {
1876                         fcpServer.writeLine(
1877                                 "SubscribedUSKUpdate",
1878                                 "Identifier=" + identifier,
1879                                 "URI=" + URI,
1880                                 "Edition=" + edition
1881                         );
1882                         fcpServer.writeLine(additionalLines);
1883                         fcpServer.writeLine("EndMessage");
1884                 }
1885
1886         }
1887
1888         public class ClientGet {
1889
1890                 @Test
1891                 public void works() throws InterruptedException, ExecutionException, IOException {
1892                         Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1893                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1894                         replyWithAllData("not-test", "Hello World", "text/plain;charset=latin-9");
1895                         replyWithAllData(identifier, "Hello", "text/plain;charset=utf-8");
1896                         Optional<Data> data = dataFuture.get();
1897                         verifyData(data);
1898                 }
1899
1900                 @Test
1901                 public void getFailedIsRecognized() throws InterruptedException, ExecutionException, IOException {
1902                         Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1903                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1904                         replyWithGetFailed(identifier);
1905                         Optional<Data> data = dataFuture.get();
1906                         assertThat(data.isPresent(), is(false));
1907                 }
1908
1909                 @Test
1910                 public void getFailedForDifferentIdentifierIsIgnored()
1911                 throws InterruptedException, ExecutionException, IOException {
1912                         Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1913                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1914                         replyWithGetFailed("not-test");
1915                         replyWithAllData(identifier, "Hello", "text/plain;charset=utf-8");
1916                         Optional<Data> data = dataFuture.get();
1917                         verifyData(data);
1918                 }
1919
1920                 @Test(expected = ExecutionException.class)
1921                 public void connectionClosedIsRecognized() throws InterruptedException, ExecutionException, IOException {
1922                         Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1923                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1924                         fcpServer.close();
1925                         dataFuture.get();
1926                 }
1927
1928                 @Test
1929                 public void withIgnoreData() throws InterruptedException, ExecutionException, IOException {
1930                         fcpClient.clientGet().ignoreDataStore().uri("KSK@foo.txt").execute();
1931                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "IgnoreDS=true"));
1932                 }
1933
1934                 @Test
1935                 public void withDataStoreOnly() throws InterruptedException, ExecutionException, IOException {
1936                         fcpClient.clientGet().dataStoreOnly().uri("KSK@foo.txt").execute();
1937                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "DSonly=true"));
1938                 }
1939
1940                 @Test
1941                 public void clientGetWithMaxSizeSettingSendsCorrectCommands()
1942                 throws InterruptedException, ExecutionException, IOException {
1943                         fcpClient.clientGet().maxSize(1048576).uri("KSK@foo.txt").execute();
1944                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "MaxSize=1048576"));
1945                 }
1946
1947                 @Test
1948                 public void clientGetWithPrioritySettingSendsCorrectCommands()
1949                 throws InterruptedException, ExecutionException, IOException {
1950                         fcpClient.clientGet().priority(Priority.interactive).uri("KSK@foo.txt").execute();
1951                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "PriorityClass=1"));
1952                 }
1953
1954                 @Test
1955                 public void clientGetWithRealTimeSettingSendsCorrectCommands()
1956                 throws InterruptedException, ExecutionException, IOException {
1957                         fcpClient.clientGet().realTime().uri("KSK@foo.txt").execute();
1958                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "RealTimeFlag=true"));
1959                 }
1960
1961                 @Test
1962                 public void clientGetWithGlobalSettingSendsCorrectCommands()
1963                 throws InterruptedException, ExecutionException, IOException {
1964                         fcpClient.clientGet().global().uri("KSK@foo.txt").execute();
1965                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "Global=true"));
1966                 }
1967
1968                 private void replyWithGetFailed(String identifier) throws IOException {
1969                         fcpServer.writeLine(
1970                                 "GetFailed",
1971                                 "Identifier=" + identifier,
1972                                 "Code=3",
1973                                 "EndMessage"
1974                         );
1975                 }
1976
1977                 private void replyWithAllData(String identifier, String text, String contentType) throws IOException {
1978                         fcpServer.writeLine(
1979                                 "AllData",
1980                                 "Identifier=" + identifier,
1981                                 "DataLength=" + (text.length() + 1),
1982                                 "StartupTime=1435610539000",
1983                                 "CompletionTime=1435610540000",
1984                                 "Metadata.ContentType=" + contentType,
1985                                 "Data",
1986                                 text
1987                         );
1988                 }
1989
1990                 private void verifyData(Optional<Data> data) throws IOException {
1991                         assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
1992                         assertThat(data.get().size(), is(6L));
1993                         assertThat(ByteStreams.toByteArray(data.get().getInputStream()),
1994                                 is("Hello\n".getBytes(StandardCharsets.UTF_8)));
1995                 }
1996
1997         }
1998
1999 }