ed416e2b807868f0f075507a04f4f989ec960f4f
[jFCPlib.git] / src / test / java / net / pterodactylus / fcp / quelaton / DefaultFcpClientTest.java
1 package net.pterodactylus.fcp.quelaton;
2
3 import static org.hamcrest.MatcherAssert.assertThat;
4 import static org.hamcrest.Matchers.allOf;
5 import static org.hamcrest.Matchers.contains;
6 import static org.hamcrest.Matchers.containsInAnyOrder;
7 import static org.hamcrest.Matchers.hasItem;
8 import static org.hamcrest.Matchers.hasSize;
9 import static org.hamcrest.Matchers.is;
10 import static org.hamcrest.Matchers.not;
11 import static org.hamcrest.Matchers.notNullValue;
12 import static org.hamcrest.Matchers.startsWith;
13
14 import java.io.ByteArrayInputStream;
15 import java.io.File;
16 import java.io.IOException;
17 import java.net.URL;
18 import java.nio.charset.StandardCharsets;
19 import java.util.Arrays;
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.Optional;
23 import java.util.concurrent.CopyOnWriteArrayList;
24 import java.util.concurrent.CountDownLatch;
25 import java.util.concurrent.ExecutionException;
26 import java.util.concurrent.ExecutorService;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.Future;
29 import java.util.concurrent.TimeUnit;
30 import java.util.concurrent.atomic.AtomicBoolean;
31 import java.util.concurrent.atomic.AtomicInteger;
32 import java.util.function.Supplier;
33 import java.util.stream.Collectors;
34
35 import net.pterodactylus.fcp.ARK;
36 import net.pterodactylus.fcp.ConfigData;
37 import net.pterodactylus.fcp.DSAGroup;
38 import net.pterodactylus.fcp.FcpKeyPair;
39 import net.pterodactylus.fcp.Key;
40 import net.pterodactylus.fcp.NodeData;
41 import net.pterodactylus.fcp.NodeRef;
42 import net.pterodactylus.fcp.Peer;
43 import net.pterodactylus.fcp.PeerNote;
44 import net.pterodactylus.fcp.PluginInfo;
45 import net.pterodactylus.fcp.Priority;
46 import net.pterodactylus.fcp.fake.FakeTcpServer;
47 import net.pterodactylus.fcp.quelaton.ClientGetCommand.Data;
48
49 import com.google.common.io.ByteStreams;
50 import com.google.common.io.Files;
51 import com.nitorcreations.junit.runners.NestedRunner;
52 import org.hamcrest.Description;
53 import org.hamcrest.Matcher;
54 import org.hamcrest.Matchers;
55 import org.hamcrest.TypeSafeDiagnosingMatcher;
56 import org.junit.After;
57 import org.junit.Assert;
58 import org.junit.Test;
59 import org.junit.runner.RunWith;
60
61 /**
62  * Unit test for {@link DefaultFcpClient}.
63  *
64  * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
65  */
66 @RunWith(NestedRunner.class)
67 public class DefaultFcpClientTest {
68
69         private static final String INSERT_URI =
70                 "SSK@RVCHbJdkkyTCeNN9AYukEg76eyqmiosSaNKgE3U9zUw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQECAAE/";
71         private static final String REQUEST_URI =
72                 "SSK@wtbgd2loNcJCXvtQVOftl2tuWBomDQHfqS6ytpPRhfw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQACAAE/";
73
74         private int threadCounter = 0;
75         private final ExecutorService threadPool =
76                 Executors.newCachedThreadPool(r -> new Thread(r, "Test-Thread-" + threadCounter++));
77         private final FakeTcpServer fcpServer;
78         private final DefaultFcpClient fcpClient;
79
80         public DefaultFcpClientTest() throws IOException {
81                 fcpServer = new FakeTcpServer(threadPool);
82                 fcpClient = new DefaultFcpClient(threadPool, "localhost", fcpServer.getPort(), () -> "Test");
83         }
84
85         @After
86         public void tearDown() throws IOException {
87                 fcpServer.close();
88                 threadPool.shutdown();
89         }
90
91         private void connectNode() throws InterruptedException, ExecutionException, IOException {
92                 fcpServer.connect().get();
93                 fcpServer.collectUntil(is("EndMessage"));
94                 fcpServer.writeLine("NodeHello",
95                         "CompressionCodecs=4 - GZIP(0), BZIP2(1), LZMA(2), LZMA_NEW(3)",
96                         "Revision=build01466",
97                         "Testnet=false",
98                         "Version=Fred,0.7,1.0,1466",
99                         "Build=1466",
100                         "ConnectionIdentifier=14318898267048452a81b36e7f13a3f0",
101                         "Node=Fred",
102                         "ExtBuild=29",
103                         "FCPVersion=2.0",
104                         "NodeLanguage=ENGLISH",
105                         "ExtRevision=v29",
106                         "EndMessage"
107                 );
108         }
109
110         private String extractIdentifier(List<String> lines) {
111                 return lines.stream()
112                         .filter(s -> s.startsWith("Identifier="))
113                         .map(s -> s.substring(s.indexOf('=') + 1))
114                         .findFirst()
115                         .orElse("");
116         }
117
118         private Matcher<List<String>> matchesFcpMessage(String name, String... requiredLines) {
119                 return matchesFcpMessageWithTerminator(name, "EndMessage", requiredLines);
120         }
121
122         private Matcher<Iterable<String>> hasHead(String firstElement) {
123                 return new TypeSafeDiagnosingMatcher<Iterable<String>>() {
124                         @Override
125                         protected boolean matchesSafely(Iterable<String> iterable, Description mismatchDescription) {
126                                 if (!iterable.iterator().hasNext()) {
127                                         mismatchDescription.appendText("is empty");
128                                         return false;
129                                 }
130                                 String element = iterable.iterator().next();
131                                 if (!element.equals(firstElement)) {
132                                         mismatchDescription.appendText("starts with ").appendValue(element);
133                                         return false;
134                                 }
135                                 return true;
136                         }
137
138                         @Override
139                         public void describeTo(Description description) {
140                                 description.appendText("starts with ").appendValue(firstElement);
141                         }
142                 };
143         }
144
145         private Matcher<List<String>> matchesFcpMessageWithTerminator(
146                 String name, String terminator, String... requiredLines) {
147                 return allOf(hasHead(name), hasParameters(1, 1, requiredLines), hasTail(terminator));
148         }
149
150         private Matcher<List<String>> hasParameters(int ignoreStart, int ignoreEnd, String... lines) {
151                 return new TypeSafeDiagnosingMatcher<List<String>>() {
152                         @Override
153                         protected boolean matchesSafely(List<String> item, Description mismatchDescription) {
154                                 if (item.size() < (ignoreStart + ignoreEnd)) {
155                                         mismatchDescription.appendText("has only ").appendValue(item.size()).appendText(" elements");
156                                         return false;
157                                 }
158                                 for (String line : lines) {
159                                         if ((item.indexOf(line) < ignoreStart) || (item.indexOf(line) >= (item.size() - ignoreEnd))) {
160                                                 mismatchDescription.appendText("does not contains ").appendValue(line);
161                                                 return false;
162                                         }
163                                 }
164                                 return true;
165                         }
166
167                         @Override
168                         public void describeTo(Description description) {
169                                 description.appendText("contains ").appendValueList("(", ", ", ")", lines);
170                                 description.appendText(", ignoring the first ").appendValue(ignoreStart);
171                                 description.appendText(" and the last ").appendValue(ignoreEnd);
172                         }
173                 };
174         }
175
176         private Matcher<List<String>> hasTail(String... lastElements) {
177                 return new TypeSafeDiagnosingMatcher<List<String>>() {
178                         @Override
179                         protected boolean matchesSafely(List<String> list, Description mismatchDescription) {
180                                 if (list.size() < lastElements.length) {
181                                         mismatchDescription.appendText("is too small");
182                                         return false;
183                                 }
184                                 List<String> tail = list.subList(list.size() - lastElements.length, list.size());
185                                 if (!tail.equals(Arrays.asList(lastElements))) {
186                                         mismatchDescription.appendText("ends with ").appendValueList("(", ", ", ")", tail);
187                                         return false;
188                                 }
189                                 return true;
190                         }
191
192                         @Override
193                         public void describeTo(Description description) {
194                                 description.appendText("ends with ").appendValueList("(", ", ", ")", lastElements);
195                         }
196                 };
197         }
198
199         @Test
200         public void defaultFcpClientCanGetNodeInformation() throws InterruptedException, ExecutionException, IOException {
201                 Future<NodeData> nodeData = fcpClient.getNode().execute();
202                 connectNode();
203                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
204                 String identifier = extractIdentifier(lines);
205                 assertThat(lines, matchesFcpMessage(
206                         "GetNode",
207                         "Identifier=" + identifier,
208                         "GiveOpennetRef=false",
209                         "WithPrivate=false",
210                         "WithVolatile=false"
211                 ));
212                 fcpServer.writeLine(
213                         "NodeData",
214                         "Identifier=" + identifier,
215                         "ark.pubURI=SSK@3YEf.../ark",
216                         "ark.number=78",
217                         "auth.negTypes=2",
218                         "version=Fred,0.7,1.0,1466",
219                         "lastGoodVersion=Fred,0.7,1.0,1466",
220                         "EndMessage"
221                 );
222                 assertThat(nodeData.get(), notNullValue());
223         }
224
225         @Test
226         public void defaultFcpClientCanGetNodeInformationWithOpennetRef()
227         throws InterruptedException, ExecutionException, IOException {
228                 Future<NodeData> nodeData = fcpClient.getNode().opennetRef().execute();
229                 connectNode();
230                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
231                 String identifier = extractIdentifier(lines);
232                 assertThat(lines, matchesFcpMessage(
233                         "GetNode",
234                         "Identifier=" + identifier,
235                         "GiveOpennetRef=true",
236                         "WithPrivate=false",
237                         "WithVolatile=false"
238                 ));
239                 fcpServer.writeLine(
240                         "NodeData",
241                         "Identifier=" + identifier,
242                         "opennet=true",
243                         "ark.pubURI=SSK@3YEf.../ark",
244                         "ark.number=78",
245                         "auth.negTypes=2",
246                         "version=Fred,0.7,1.0,1466",
247                         "lastGoodVersion=Fred,0.7,1.0,1466",
248                         "EndMessage"
249                 );
250                 assertThat(nodeData.get().getVersion().toString(), is("Fred,0.7,1.0,1466"));
251         }
252
253         @Test
254         public void defaultFcpClientCanGetNodeInformationWithPrivateData()
255         throws InterruptedException, ExecutionException, IOException {
256                 Future<NodeData> nodeData = fcpClient.getNode().includePrivate().execute();
257                 connectNode();
258                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
259                 String identifier = extractIdentifier(lines);
260                 assertThat(lines, matchesFcpMessage(
261                         "GetNode",
262                         "Identifier=" + identifier,
263                         "GiveOpennetRef=false",
264                         "WithPrivate=true",
265                         "WithVolatile=false"
266                 ));
267                 fcpServer.writeLine(
268                         "NodeData",
269                         "Identifier=" + identifier,
270                         "opennet=false",
271                         "ark.pubURI=SSK@3YEf.../ark",
272                         "ark.number=78",
273                         "auth.negTypes=2",
274                         "version=Fred,0.7,1.0,1466",
275                         "lastGoodVersion=Fred,0.7,1.0,1466",
276                         "ark.privURI=SSK@XdHMiRl",
277                         "EndMessage"
278                 );
279                 assertThat(nodeData.get().getARK().getPrivateURI(), is("SSK@XdHMiRl"));
280         }
281
282         @Test
283         public void defaultFcpClientCanGetNodeInformationWithVolatileData()
284         throws InterruptedException, ExecutionException, IOException {
285                 Future<NodeData> nodeData = fcpClient.getNode().includeVolatile().execute();
286                 connectNode();
287                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
288                 String identifier = extractIdentifier(lines);
289                 assertThat(lines, matchesFcpMessage(
290                         "GetNode",
291                         "Identifier=" + identifier,
292                         "GiveOpennetRef=false",
293                         "WithPrivate=false",
294                         "WithVolatile=true"
295                 ));
296                 fcpServer.writeLine(
297                         "NodeData",
298                         "Identifier=" + identifier,
299                         "opennet=false",
300                         "ark.pubURI=SSK@3YEf.../ark",
301                         "ark.number=78",
302                         "auth.negTypes=2",
303                         "version=Fred,0.7,1.0,1466",
304                         "lastGoodVersion=Fred,0.7,1.0,1466",
305                         "volatile.freeJavaMemory=205706528",
306                         "EndMessage"
307                 );
308                 assertThat(nodeData.get().getVolatile("freeJavaMemory"), is("205706528"));
309         }
310
        /** Lines of the message most recently read from the client by {@code readMessage}. */
        private List<String> lines;
        /** Value of the {@code Identifier} field of that message; empty if the message had none. */
        private String identifier;
313
314         private void connectAndAssert(Supplier<Matcher<List<String>>> requestMatcher)
315         throws InterruptedException, ExecutionException, IOException {
316                 connectNode();
317                 readMessage(requestMatcher);
318         }
319
320         private void readMessage(Supplier<Matcher<List<String>>> requestMatcher) throws IOException {
321                 readMessage("EndMessage", requestMatcher);
322         }
323
324         private void readMessage(String terminator, Supplier<Matcher<List<String>>> requestMatcher) throws IOException {
325                 lines = fcpServer.collectUntil(is(terminator));
326                 identifier = extractIdentifier(lines);
327                 assertThat(lines, requestMatcher.get());
328         }
329
330         public class ConnectionsAndKeyPairs {
331
332                 public class Connections {
333
334                         @Test(expected = ExecutionException.class)
335                         public void throwsExceptionOnFailure() throws IOException, ExecutionException, InterruptedException {
336                                 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
337                                 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
338                                 fcpServer.writeLine(
339                                         "CloseConnectionDuplicateClientName",
340                                         "EndMessage"
341                                 );
342                                 keyPairFuture.get();
343                         }
344
345                         @Test(expected = ExecutionException.class)
346                         public void throwsExceptionIfConnectionIsClosed() throws IOException, ExecutionException, InterruptedException {
347                                 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
348                                 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
349                                 fcpServer.close();
350                                 keyPairFuture.get();
351                         }
352
353                         @Test
354                         public void connectionIsReused() throws InterruptedException, ExecutionException, IOException {
355                                 Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
356                                 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
357                                 replyWithKeyPair();
358                                 keyPair.get();
359                                 keyPair = fcpClient.generateKeypair().execute();
360                                 readMessage(() -> matchesFcpMessage("GenerateSSK"));
361                                 identifier = extractIdentifier(lines);
362                                 replyWithKeyPair();
363                                 keyPair.get();
364                         }
365
366                         @Test
367                         public void defaultFcpClientCanReconnectAfterConnectionHasBeenClosed()
368                         throws InterruptedException, ExecutionException, IOException {
369                                 Future<FcpKeyPair> keyPair = fcpClient.generateKeypair().execute();
370                                 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
371                                 fcpServer.close();
372                                 try {
373                                         keyPair.get();
374                                         Assert.fail();
375                                 } catch (ExecutionException e) {
376                                         /* ignore. */
377                                 }
378                                 keyPair = fcpClient.generateKeypair().execute();
379                                 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
380                                 replyWithKeyPair();
381                                 keyPair.get();
382                         }
383
384                 }
385
386                 public class GenerateKeyPair {
387
388                         @Test
389                         public void defaultFcpClientCanGenerateKeypair()
390                         throws ExecutionException, InterruptedException, IOException {
391                                 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
392                                 connectAndAssert(() -> matchesFcpMessage("GenerateSSK"));
393                                 replyWithKeyPair();
394                                 FcpKeyPair keyPair = keyPairFuture.get();
395                                 assertThat(keyPair.getPublicKey(), is(REQUEST_URI));
396                                 assertThat(keyPair.getPrivateKey(), is(INSERT_URI));
397                         }
398
399                 }
400
401                 private void replyWithKeyPair() throws IOException {
402                         fcpServer.writeLine("SSKKeypair",
403                                 "InsertURI=" + INSERT_URI + "",
404                                 "RequestURI=" + REQUEST_URI + "",
405                                 "Identifier=" + identifier,
406                                 "EndMessage");
407                 }
408
409         }
410
411         public class Peers {
412
413                 public class PeerCommands {
414
415                         public class ListPeer {
416
417                                 @Test
418                                 public void byIdentity() throws InterruptedException, ExecutionException, IOException {
419                                         Future<Optional<Peer>> peer = fcpClient.listPeer().byIdentity("id1").execute();
420                                         connectAndAssert(() -> matchesListPeer("id1"));
421                                         replyWithPeer("id1");
422                                         assertThat(peer.get().get().getIdentity(), is("id1"));
423                                 }
424
425                                 @Test
426                                 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
427                                         Future<Optional<Peer>> peer = fcpClient.listPeer().byHostAndPort("host.free.net", 12345).execute();
428                                         connectAndAssert(() -> matchesListPeer("host.free.net:12345"));
429                                         replyWithPeer("id1");
430                                         assertThat(peer.get().get().getIdentity(), is("id1"));
431                                 }
432
433                                 @Test
434                                 public void byName() throws InterruptedException, ExecutionException, IOException {
435                                         Future<Optional<Peer>> peer = fcpClient.listPeer().byName("FriendNode").execute();
436                                         connectAndAssert(() -> matchesListPeer("FriendNode"));
437                                         replyWithPeer("id1");
438                                         assertThat(peer.get().get().getIdentity(), is("id1"));
439                                 }
440
441                                 @Test
442                                 public void unknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
443                                         Future<Optional<Peer>> peer = fcpClient.listPeer().byIdentity("id2").execute();
444                                         connectAndAssert(() -> matchesListPeer("id2"));
445                                         replyWithUnknownNodeIdentifier();
446                                         assertThat(peer.get().isPresent(), is(false));
447                                 }
448
449                                 private Matcher<List<String>> matchesListPeer(String nodeId) {
450                                         return matchesFcpMessage(
451                                                 "ListPeer",
452                                                 "Identifier=" + identifier,
453                                                 "NodeIdentifier=" + nodeId
454                                         );
455                                 }
456
457                         }
458
459                         public class ListPeers {
460
461                                 @Test
462                                 public void withoutMetadataOrVolatile() throws IOException, ExecutionException, InterruptedException {
463                                         Future<Collection<Peer>> peers = fcpClient.listPeers().execute();
464                                         connectAndAssert(() -> matchesListPeers(false, false));
465                                         replyWithPeer("id1");
466                                         replyWithPeer("id2");
467                                         sendEndOfPeerList();
468                                         assertThat(peers.get(), hasSize(2));
469                                         assertThat(peers.get().stream().map(Peer::getIdentity).collect(Collectors.toList()),
470                                                 containsInAnyOrder("id1", "id2"));
471                                 }
472
473                                 @Test
474                                 public void withMetadata() throws IOException, ExecutionException, InterruptedException {
475                                         Future<Collection<Peer>> peers = fcpClient.listPeers().includeMetadata().execute();
476                                         connectAndAssert(() -> matchesListPeers(false, true));
477                                         replyWithPeer("id1", "metadata.foo=bar1");
478                                         replyWithPeer("id2", "metadata.foo=bar2");
479                                         sendEndOfPeerList();
480                                         assertThat(peers.get(), hasSize(2));
481                                         assertThat(peers.get().stream().map(peer -> peer.getMetadata("foo")).collect(Collectors.toList()),
482                                                 containsInAnyOrder("bar1", "bar2"));
483                                 }
484
485                                 @Test
486                                 public void withVolatile() throws IOException, ExecutionException, InterruptedException {
487                                         Future<Collection<Peer>> peers = fcpClient.listPeers().includeVolatile().execute();
488                                         connectAndAssert(() -> matchesListPeers(true, false));
489                                         replyWithPeer("id1", "volatile.foo=bar1");
490                                         replyWithPeer("id2", "volatile.foo=bar2");
491                                         sendEndOfPeerList();
492                                         assertThat(peers.get(), hasSize(2));
493                                         assertThat(peers.get().stream().map(peer -> peer.getVolatile("foo")).collect(Collectors.toList()),
494                                                 containsInAnyOrder("bar1", "bar2"));
495                                 }
496
497                                 private Matcher<List<String>> matchesListPeers(boolean withVolatile, boolean withMetadata) {
498                                         return matchesFcpMessage(
499                                                 "ListPeers",
500                                                 "WithVolatile=" + withVolatile,
501                                                 "WithMetadata=" + withMetadata
502                                         );
503                                 }
504
505                                 private void sendEndOfPeerList() throws IOException {
506                                         fcpServer.writeLine(
507                                                 "EndListPeers",
508                                                 "Identifier=" + identifier,
509                                                 "EndMessage"
510                                         );
511                                 }
512
513                         }
514
515                         public class AddPeer {
516
517                                 @Test
518                                 public void fromFile() throws InterruptedException, ExecutionException, IOException {
519                                         Future<Optional<Peer>> peer = fcpClient.addPeer().fromFile(new File("/tmp/ref.txt")).execute();
520                                         connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("File=/tmp/ref.txt")));
521                                         replyWithPeer("id1");
522                                         assertThat(peer.get().get().getIdentity(), is("id1"));
523                                 }
524
525                                 @Test
526                                 public void fromUrl() throws InterruptedException, ExecutionException, IOException {
527                                         Future<Optional<Peer>> peer = fcpClient.addPeer().fromURL(new URL("http://node.ref/")).execute();
528                                         connectAndAssert(() -> allOf(matchesAddPeer(), hasItem("URL=http://node.ref/")));
529                                         replyWithPeer("id1");
530                                         assertThat(peer.get().get().getIdentity(), is("id1"));
531                                 }
532
533                                 @Test
534                                 public void fromNodeRef() throws InterruptedException, ExecutionException, IOException {
535                                         NodeRef nodeRef = createNodeRef();
536                                         Future<Optional<Peer>> peer = fcpClient.addPeer().fromNodeRef(nodeRef).execute();
537                                         connectAndAssert(() -> allOf(matchesAddPeer(), Matchers.<String>hasItems(
538                                                 "myName=name",
539                                                 "ark.pubURI=public",
540                                                 "ark.number=1",
541                                                 "dsaGroup.g=base",
542                                                 "dsaGroup.p=prime",
543                                                 "dsaGroup.q=subprime",
544                                                 "dsaPubKey.y=dsa-public",
545                                                 "physical.udp=1.2.3.4:5678",
546                                                 "auth.negTypes=3;5",
547                                                 "sig=sig"
548                                         )));
549                                         replyWithPeer("id1");
550                                         assertThat(peer.get().get().getIdentity(), is("id1"));
551                                 }
552
553                                 private NodeRef createNodeRef() {
554                                         NodeRef nodeRef = new NodeRef();
555                                         nodeRef.setIdentity("id1");
556                                         nodeRef.setName("name");
557                                         nodeRef.setARK(new ARK("public", "1"));
558                                         nodeRef.setDSAGroup(new DSAGroup("base", "prime", "subprime"));
559                                         nodeRef.setNegotiationTypes(new int[] { 3, 5 });
560                                         nodeRef.setPhysicalUDP("1.2.3.4:5678");
561                                         nodeRef.setDSAPublicKey("dsa-public");
562                                         nodeRef.setSignature("sig");
563                                         return nodeRef;
564                                 }
565
566                                 private Matcher<List<String>> matchesAddPeer() {
567                                         return matchesFcpMessage(
568                                                 "AddPeer",
569                                                 "Identifier=" + identifier
570                                         );
571                                 }
572
573                         }
574
575                         public class ModifyPeer {
576
577                                 @Test
578                                 public void defaultFcpClientCanEnablePeerByName()
579                                 throws InterruptedException, ExecutionException, IOException {
580                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byName("id1").execute();
581                                         connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
582                                         replyWithPeer("id1");
583                                         assertThat(peer.get().get().getIdentity(), is("id1"));
584                                 }
585
586                                 @Test
587                                 public void defaultFcpClientCanDisablePeerByName()
588                                 throws InterruptedException, ExecutionException, IOException {
589                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().disable().byName("id1").execute();
590                                         connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", true));
591                                         replyWithPeer("id1");
592                                         assertThat(peer.get().get().getIdentity(), is("id1"));
593                                 }
594
595                                 @Test
596                                 public void defaultFcpClientCanEnablePeerByIdentity()
597                                 throws InterruptedException, ExecutionException, IOException {
598                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byIdentity("id1").execute();
599                                         connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
600                                         replyWithPeer("id1");
601                                         assertThat(peer.get().get().getIdentity(), is("id1"));
602                                 }
603
604                                 @Test
605                                 public void defaultFcpClientCanEnablePeerByHostAndPort()
606                                 throws InterruptedException, ExecutionException, IOException {
607                                         Future<Optional<Peer>> peer =
608                                                 fcpClient.modifyPeer().enable().byHostAndPort("1.2.3.4", 5678).execute();
609                                         connectAndAssert(() -> matchesModifyPeer("1.2.3.4:5678", "IsDisabled", false));
610                                         replyWithPeer("id1");
611                                         assertThat(peer.get().get().getIdentity(), is("id1"));
612                                 }
613
614                                 @Test
615                                 public void allowLocalAddressesOfPeer() throws InterruptedException, ExecutionException, IOException {
616                                         Future<Optional<Peer>> peer =
617                                                 fcpClient.modifyPeer().allowLocalAddresses().byIdentity("id1").execute();
618                                         connectAndAssert(() -> allOf(
619                                                 matchesModifyPeer("id1", "AllowLocalAddresses", true),
620                                                 not(contains(startsWith("IsDisabled=")))
621                                         ));
622                                         replyWithPeer("id1");
623                                         assertThat(peer.get().get().getIdentity(), is("id1"));
624                                 }
625
626                                 @Test
627                                 public void disallowLocalAddressesOfPeer()
628                                 throws InterruptedException, ExecutionException, IOException {
629                                         Future<Optional<Peer>> peer =
630                                                 fcpClient.modifyPeer().disallowLocalAddresses().byIdentity("id1").execute();
631                                         connectAndAssert(() -> allOf(
632                                                 matchesModifyPeer("id1", "AllowLocalAddresses", false),
633                                                 not(contains(startsWith("IsDisabled=")))
634                                         ));
635                                         replyWithPeer("id1");
636                                         assertThat(peer.get().get().getIdentity(), is("id1"));
637                                 }
638
639                                 @Test
640                                 public void setBurstOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
641                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().setBurstOnly().byIdentity("id1").execute();
642                                         connectAndAssert(() -> allOf(
643                                                 matchesModifyPeer("id1", "IsBurstOnly", true),
644                                                 not(contains(startsWith("AllowLocalAddresses="))),
645                                                 not(contains(startsWith("IsDisabled=")))
646                                         ));
647                                         replyWithPeer("id1");
648                                         assertThat(peer.get().get().getIdentity(), is("id1"));
649                                 }
650
651                                 @Test
652                                 public void clearBurstOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
653                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().clearBurstOnly().byIdentity("id1").execute();
654                                         connectAndAssert(() -> allOf(
655                                                 matchesModifyPeer("id1", "IsBurstOnly", false),
656                                                 not(contains(startsWith("AllowLocalAddresses="))),
657                                                 not(contains(startsWith("IsDisabled=")))
658                                         ));
659                                         replyWithPeer("id1");
660                                         assertThat(peer.get().get().getIdentity(), is("id1"));
661                                 }
662
663                                 @Test
664                                 public void defaultFcpClientCanSetListenOnlyForPeer()
665                                 throws InterruptedException, ExecutionException, IOException {
666                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().setListenOnly().byIdentity("id1").execute();
667                                         connectAndAssert(() -> allOf(
668                                                 matchesModifyPeer("id1", "IsListenOnly", true),
669                                                 not(contains(startsWith("AllowLocalAddresses="))),
670                                                 not(contains(startsWith("IsDisabled="))),
671                                                 not(contains(startsWith("IsBurstOnly=")))
672                                         ));
673                                         replyWithPeer("id1");
674                                         assertThat(peer.get().get().getIdentity(), is("id1"));
675                                 }
676
677                                 @Test
678                                 public void clearListenOnlyForPeer() throws InterruptedException, ExecutionException, IOException {
679                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().clearListenOnly().byIdentity("id1").execute();
680                                         connectAndAssert(() -> allOf(
681                                                 matchesModifyPeer("id1", "IsListenOnly", false),
682                                                 not(contains(startsWith("AllowLocalAddresses="))),
683                                                 not(contains(startsWith("IsDisabled="))),
684                                                 not(contains(startsWith("IsBurstOnly=")))
685                                         ));
686                                         replyWithPeer("id1");
687                                         assertThat(peer.get().get().getIdentity(), is("id1"));
688                                 }
689
690                                 @Test
691                                 public void ignoreSourceForPeer() throws InterruptedException, ExecutionException, IOException {
692                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().ignoreSource().byIdentity("id1").execute();
693                                         connectAndAssert(() -> allOf(
694                                                 matchesModifyPeer("id1", "IgnoreSourcePort", true),
695                                                 not(contains(startsWith("AllowLocalAddresses="))),
696                                                 not(contains(startsWith("IsDisabled="))),
697                                                 not(contains(startsWith("IsBurstOnly="))),
698                                                 not(contains(startsWith("IsListenOnly=")))
699                                         ));
700                                         replyWithPeer("id1");
701                                         assertThat(peer.get().get().getIdentity(), is("id1"));
702                                 }
703
704                                 @Test
705                                 public void useSourceForPeer() throws InterruptedException, ExecutionException, IOException {
706                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().useSource().byIdentity("id1").execute();
707                                         connectAndAssert(() -> allOf(
708                                                 matchesModifyPeer("id1", "IgnoreSourcePort", false),
709                                                 not(contains(startsWith("AllowLocalAddresses="))),
710                                                 not(contains(startsWith("IsDisabled="))),
711                                                 not(contains(startsWith("IsBurstOnly="))),
712                                                 not(contains(startsWith("IsListenOnly=")))
713                                         ));
714                                         replyWithPeer("id1");
715                                         assertThat(peer.get().get().getIdentity(), is("id1"));
716                                 }
717
718                                 @Test
719                                 public void unknownNode() throws InterruptedException, ExecutionException, IOException {
720                                         Future<Optional<Peer>> peer = fcpClient.modifyPeer().enable().byIdentity("id1").execute();
721                                         connectAndAssert(() -> matchesModifyPeer("id1", "IsDisabled", false));
722                                         replyWithUnknownNodeIdentifier();
723                                         assertThat(peer.get().isPresent(), is(false));
724                                 }
725
726                                 private Matcher<List<String>> matchesModifyPeer(String nodeIdentifier, String setting, boolean value) {
727                                         return matchesFcpMessage(
728                                                 "ModifyPeer",
729                                                 "Identifier=" + identifier,
730                                                 "NodeIdentifier=" + nodeIdentifier,
731                                                 setting + "=" + value
732                                         );
733                                 }
734
735                         }
736
737                         public class RemovePeer {
738
739                                 @Test
740                                 public void byName() throws InterruptedException, ExecutionException, IOException {
741                                         Future<Boolean> peer = fcpClient.removePeer().byName("Friend1").execute();
742                                         connectAndAssert(() -> matchesRemovePeer("Friend1"));
743                                         replyWithPeerRemoved("Friend1");
744                                         assertThat(peer.get(), is(true));
745                                 }
746
747                                 @Test
748                                 public void invalidName() throws InterruptedException, ExecutionException, IOException {
749                                         Future<Boolean> peer = fcpClient.removePeer().byName("NotFriend1").execute();
750                                         connectAndAssert(() -> matchesRemovePeer("NotFriend1"));
751                                         replyWithUnknownNodeIdentifier();
752                                         assertThat(peer.get(), is(false));
753                                 }
754
755                                 @Test
756                                 public void byIdentity() throws InterruptedException, ExecutionException, IOException {
757                                         Future<Boolean> peer = fcpClient.removePeer().byIdentity("id1").execute();
758                                         connectAndAssert(() -> matchesRemovePeer("id1"));
759                                         replyWithPeerRemoved("id1");
760                                         assertThat(peer.get(), is(true));
761                                 }
762
763                                 @Test
764                                 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
765                                         Future<Boolean> peer = fcpClient.removePeer().byHostAndPort("1.2.3.4", 5678).execute();
766                                         connectAndAssert(() -> matchesRemovePeer("1.2.3.4:5678"));
767                                         replyWithPeerRemoved("Friend1");
768                                         assertThat(peer.get(), is(true));
769                                 }
770
771                                 private Matcher<List<String>> matchesRemovePeer(String nodeIdentifier) {
772                                         return matchesFcpMessage(
773                                                 "RemovePeer",
774                                                 "Identifier=" + identifier,
775                                                 "NodeIdentifier=" + nodeIdentifier
776                                         );
777                                 }
778
779                                 private void replyWithPeerRemoved(String nodeIdentifier) throws IOException {
780                                         fcpServer.writeLine(
781                                                 "PeerRemoved",
782                                                 "Identifier=" + identifier,
783                                                 "NodeIdentifier=" + nodeIdentifier,
784                                                 "EndMessage"
785                                         );
786                                 }
787
788                         }
789
790                         private void replyWithPeer(String peerId, String... additionalLines) throws IOException {
791                                 fcpServer.writeLine(
792                                         "Peer",
793                                         "Identifier=" + identifier,
794                                         "identity=" + peerId,
795                                         "opennet=false",
796                                         "ark.pubURI=SSK@3YEf.../ark",
797                                         "ark.number=78",
798                                         "auth.negTypes=2",
799                                         "version=Fred,0.7,1.0,1466",
800                                         "lastGoodVersion=Fred,0.7,1.0,1466"
801                                 );
802                                 fcpServer.writeLine(additionalLines);
803                                 fcpServer.writeLine("EndMessage");
804                         }
805
806                 }
807
808                 public class PeerNoteCommands {
809
810                         public class ListPeerNotes {
811
812                                 @Test
813                                 public void onUnknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
814                                         Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byName("Friend1").execute();
815                                         connectAndAssert(() -> matchesListPeerNotes("Friend1"));
816                                         replyWithUnknownNodeIdentifier();
817                                         assertThat(peerNote.get().isPresent(), is(false));
818                                 }
819
820                                 @Test
821                                 public void byNodeName() throws InterruptedException, ExecutionException, IOException {
822                                         Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byName("Friend1").execute();
823                                         connectAndAssert(() -> matchesListPeerNotes("Friend1"));
824                                         replyWithPeerNote();
825                                         replyWithEndListPeerNotes();
826                                         assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
827                                         assertThat(peerNote.get().get().getPeerNoteType(), is(1));
828                                 }
829
830                                 @Test
831                                 public void byNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
832                                         Future<Optional<PeerNote>> peerNote = fcpClient.listPeerNotes().byIdentity("id1").execute();
833                                         connectAndAssert(() -> matchesListPeerNotes("id1"));
834                                         replyWithPeerNote();
835                                         replyWithEndListPeerNotes();
836                                         assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
837                                         assertThat(peerNote.get().get().getPeerNoteType(), is(1));
838                                 }
839
840                                 @Test
841                                 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
842                                         Future<Optional<PeerNote>> peerNote =
843                                                 fcpClient.listPeerNotes().byHostAndPort("1.2.3.4", 5678).execute();
844                                         connectAndAssert(() -> matchesListPeerNotes("1.2.3.4:5678"));
845                                         replyWithPeerNote();
846                                         replyWithEndListPeerNotes();
847                                         assertThat(peerNote.get().get().getNoteText(), is("RXhhbXBsZSBUZXh0Lg=="));
848                                         assertThat(peerNote.get().get().getPeerNoteType(), is(1));
849                                 }
850
851                                 private Matcher<List<String>> matchesListPeerNotes(String nodeIdentifier) {
852                                         return matchesFcpMessage(
853                                                 "ListPeerNotes",
854                                                 "NodeIdentifier=" + nodeIdentifier
855                                         );
856                                 }
857
858                                 private void replyWithEndListPeerNotes() throws IOException {
859                                         fcpServer.writeLine(
860                                                 "EndListPeerNotes",
861                                                 "Identifier=" + identifier,
862                                                 "EndMessage"
863                                         );
864                                 }
865
866                                 private void replyWithPeerNote() throws IOException {
867                                         fcpServer.writeLine(
868                                                 "PeerNote",
869                                                 "Identifier=" + identifier,
870                                                 "NodeIdentifier=Friend1",
871                                                 "NoteText=RXhhbXBsZSBUZXh0Lg==",
872                                                 "PeerNoteType=1",
873                                                 "EndMessage"
874                                         );
875                                 }
876
877                         }
878
879                         public class ModifyPeerNotes {
880
881                                 @Test
882                                 public void byName() throws InterruptedException, ExecutionException, IOException {
883                                         Future<Boolean> noteUpdated =
884                                                 fcpClient.modifyPeerNote().darknetComment("foo").byName("Friend1").execute();
885                                         connectAndAssert(() -> matchesModifyPeerNote("Friend1"));
886                                         replyWithPeerNote();
887                                         assertThat(noteUpdated.get(), is(true));
888                                 }
889
890                                 @Test
891                                 public void onUnknownNodeIdentifier() throws InterruptedException, ExecutionException, IOException {
892                                         Future<Boolean> noteUpdated =
893                                                 fcpClient.modifyPeerNote().darknetComment("foo").byName("Friend1").execute();
894                                         connectAndAssert(() -> matchesModifyPeerNote("Friend1"));
895                                         replyWithUnknownNodeIdentifier();
896                                         assertThat(noteUpdated.get(), is(false));
897                                 }
898
899                                 @Test
900                                 public void defaultFcpClientFailsToModifyPeerNoteWithoutPeerNote()
901                                 throws InterruptedException, ExecutionException, IOException {
902                                         Future<Boolean> noteUpdated = fcpClient.modifyPeerNote().byName("Friend1").execute();
903                                         assertThat(noteUpdated.get(), is(false));
904                                 }
905
906                                 @Test
907                                 public void byIdentifier() throws InterruptedException, ExecutionException, IOException {
908                                         Future<Boolean> noteUpdated =
909                                                 fcpClient.modifyPeerNote().darknetComment("foo").byIdentifier("id1").execute();
910                                         connectAndAssert(() -> matchesModifyPeerNote("id1"));
911                                         replyWithPeerNote();
912                                         assertThat(noteUpdated.get(), is(true));
913                                 }
914
915                                 @Test
916                                 public void byHostAndPort() throws InterruptedException, ExecutionException, IOException {
917                                         Future<Boolean> noteUpdated =
918                                                 fcpClient.modifyPeerNote().darknetComment("foo").byHostAndPort("1.2.3.4", 5678).execute();
919                                         connectAndAssert(() -> matchesModifyPeerNote("1.2.3.4:5678"));
920                                         replyWithPeerNote();
921                                         assertThat(noteUpdated.get(), is(true));
922                                 }
923
924                                 private Matcher<List<String>> matchesModifyPeerNote(String nodeIdentifier) {
925                                         return matchesFcpMessage(
926                                                 "ModifyPeerNote",
927                                                 "Identifier=" + identifier,
928                                                 "NodeIdentifier=" + nodeIdentifier,
929                                                 "PeerNoteType=1",
930                                                 "NoteText=Zm9v"
931                                         );
932                                 }
933
934                                 private void replyWithPeerNote() throws IOException {
935                                         fcpServer.writeLine(
936                                                 "PeerNote",
937                                                 "Identifier=" + identifier,
938                                                 "NodeIdentifier=Friend1",
939                                                 "NoteText=Zm9v",
940                                                 "PeerNoteType=1",
941                                                 "EndMessage"
942                                         );
943                                 }
944
945                         }
946
947                 }
948
949                 private void replyWithUnknownNodeIdentifier() throws IOException {
950                         fcpServer.writeLine(
951                                 "UnknownNodeIdentifier",
952                                 "Identifier=" + identifier,
953                                 "NodeIdentifier=id2",
954                                 "EndMessage"
955                         );
956                 }
957
958         }
959
960         public class PluginCommands {
961
                // Plugin class name fixture. It is not referenced in the part of this
                // class visible here; presumably used by tests further down (TODO confirm).
                private static final String CLASS_NAME = "foo.plugin.Plugin";
963
964                 private void replyWithPluginInfo() throws IOException {
965                         fcpServer.writeLine(
966                                 "PluginInfo",
967                                 "Identifier=" + identifier,
968                                 "PluginName=superPlugin",
969                                 "IsTalkable=true",
970                                 "LongVersion=1.2.3",
971                                 "Version=42",
972                                 "OriginUri=superPlugin",
973                                 "Started=true",
974                                 "EndMessage"
975                         );
976                 }
977
978                 private void verifyPluginInfo(Future<Optional<PluginInfo>> pluginInfo)
979                 throws InterruptedException, ExecutionException {
980                         assertThat(pluginInfo.get().get().getPluginName(), is("superPlugin"));
981                         assertThat(pluginInfo.get().get().getOriginalURI(), is("superPlugin"));
982                         assertThat(pluginInfo.get().get().isTalkable(), is(true));
983                         assertThat(pluginInfo.get().get().getVersion(), is("42"));
984                         assertThat(pluginInfo.get().get().getLongVersion(), is("1.2.3"));
985                         assertThat(pluginInfo.get().get().isStarted(), is(true));
986                 }
987
988                 public class LoadPlugin {
989
990                         public class OfficialPlugins {
991
992                                 @Test
993                                 public void fromFreenet() throws ExecutionException, InterruptedException, IOException {
994                                         Future<Optional<PluginInfo>> pluginInfo =
995                                                 fcpClient.loadPlugin().officialFromFreenet("superPlugin").execute();
996                                         connectAndAssert(() -> createMatcherForOfficialSource("freenet"));
997                                         assertThat(lines, not(contains(startsWith("Store="))));
998                                         replyWithPluginInfo();
999                                         verifyPluginInfo(pluginInfo);
1000                                 }
1001
1002                                 @Test
1003                                 public void persistentFromFreenet() throws ExecutionException, InterruptedException, IOException {
1004                                         Future<Optional<PluginInfo>> pluginInfo =
1005                                                 fcpClient.loadPlugin().addToConfig().officialFromFreenet("superPlugin").execute();
1006                                         connectAndAssert(() -> createMatcherForOfficialSource("freenet"));
1007                                         assertThat(lines, hasItem("Store=true"));
1008                                         replyWithPluginInfo();
1009                                         verifyPluginInfo(pluginInfo);
1010                                 }
1011
1012                                 @Test
1013                                 public void fromHttps() throws ExecutionException, InterruptedException, IOException {
1014                                         Future<Optional<PluginInfo>> pluginInfo =
1015                                                 fcpClient.loadPlugin().officialFromHttps("superPlugin").execute();
1016                                         connectAndAssert(() -> createMatcherForOfficialSource("https"));
1017                                         replyWithPluginInfo();
1018                                         verifyPluginInfo(pluginInfo);
1019                                 }
1020
1021                                 private Matcher<List<String>> createMatcherForOfficialSource(String officialSource) {
1022                                         return matchesFcpMessage(
1023                                                 "LoadPlugin",
1024                                                 "Identifier=" + identifier,
1025                                                 "PluginURL=superPlugin",
1026                                                 "URLType=official",
1027                                                 "OfficialSource=" + officialSource
1028                                         );
1029                                 }
1030
1031                         }
1032
1033                         public class FromOtherSources {
1034
                                // Plugin locations used by the tests below, one per source type:
                                // FILE_PATH for fromFile(), URL for fromUrl(), KEY for fromFreenet().
                                private static final String FILE_PATH = "/path/to/plugin.jar";
                                private static final String URL = "http://server.com/plugin.jar";
                                private static final String KEY = "KSK@plugin.jar";
1038
1039                                 @Test
1040                                 public void fromFile() throws ExecutionException, InterruptedException, IOException {
1041                                         Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromFile(FILE_PATH).execute();
1042                                         connectAndAssert(() -> createMatcher("file", FILE_PATH));
1043                                         replyWithPluginInfo();
1044                                         verifyPluginInfo(pluginInfo);
1045                                 }
1046
1047                                 @Test
1048                                 public void fromUrl() throws ExecutionException, InterruptedException, IOException {
1049                                         Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromUrl(URL).execute();
1050                                         connectAndAssert(() -> createMatcher("url", URL));
1051                                         replyWithPluginInfo();
1052                                         verifyPluginInfo(pluginInfo);
1053                                 }
1054
1055                                 @Test
1056                                 public void fromFreenet() throws ExecutionException, InterruptedException, IOException {
1057                                         Future<Optional<PluginInfo>> pluginInfo = fcpClient.loadPlugin().fromFreenet(KEY).execute();
1058                                         connectAndAssert(() -> createMatcher("freenet", KEY));
1059                                         replyWithPluginInfo();
1060                                         verifyPluginInfo(pluginInfo);
1061                                 }
1062
1063                                 private Matcher<List<String>> createMatcher(String urlType, String url) {
1064                                         return matchesFcpMessage(
1065                                                 "LoadPlugin",
1066                                                 "Identifier=" + identifier,
1067                                                 "PluginURL=" + url,
1068                                                 "URLType=" + urlType
1069                                         );
1070                                 }
1071
1072                         }
1073
1074                         public class Failed {
1075
1076                                 @Test
1077                                 public void failedLoad() throws ExecutionException, InterruptedException, IOException {
1078                                         Future<Optional<PluginInfo>> pluginInfo =
1079                                                 fcpClient.loadPlugin().officialFromFreenet("superPlugin").execute();
1080                                         connectAndAssert(() -> matchesFcpMessage("LoadPlugin"));
1081                                         replyWithProtocolError();
1082                                         assertThat(pluginInfo.get().isPresent(), is(false));
1083                                 }
1084
1085                         }
1086
1087                 }
1088
1089                 private void replyWithProtocolError() throws IOException {
1090                         fcpServer.writeLine(
1091                                 "ProtocolError",
1092                                 "Identifier=" + identifier,
1093                                 "EndMessage"
1094                         );
1095                 }
1096
1097                 public class ReloadPlugin {
1098
1099                         @Test
1100                         public void reloadingPluginWorks() throws InterruptedException, ExecutionException, IOException {
1101                                 Future<Optional<PluginInfo>> pluginInfo = fcpClient.reloadPlugin().plugin(CLASS_NAME).execute();
1102                                 connectAndAssert(this::matchReloadPluginMessage);
1103                                 replyWithPluginInfo();
1104                                 verifyPluginInfo(pluginInfo);
1105                         }
1106
1107                         @Test
1108                         public void reloadingPluginWithMaxWaitTimeWorks()
1109                         throws InterruptedException, ExecutionException, IOException {
1110                                 Future<Optional<PluginInfo>> pluginInfo =
1111                                         fcpClient.reloadPlugin().waitFor(1234).plugin(CLASS_NAME).execute();
1112                                 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("MaxWaitTime=1234")));
1113                                 replyWithPluginInfo();
1114                                 verifyPluginInfo(pluginInfo);
1115                         }
1116
1117                         @Test
1118                         public void reloadingPluginWithPurgeWorks()
1119                         throws InterruptedException, ExecutionException, IOException {
1120                                 Future<Optional<PluginInfo>> pluginInfo =
1121                                         fcpClient.reloadPlugin().purge().plugin(CLASS_NAME).execute();
1122                                 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("Purge=true")));
1123                                 replyWithPluginInfo();
1124                                 verifyPluginInfo(pluginInfo);
1125                         }
1126
1127                         @Test
1128                         public void reloadingPluginWithStoreWorks()
1129                         throws InterruptedException, ExecutionException, IOException {
1130                                 Future<Optional<PluginInfo>> pluginInfo =
1131                                         fcpClient.reloadPlugin().addToConfig().plugin(CLASS_NAME).execute();
1132                                 connectAndAssert(() -> allOf(matchReloadPluginMessage(), hasItem("Store=true")));
1133                                 replyWithPluginInfo();
1134                                 verifyPluginInfo(pluginInfo);
1135                         }
1136
1137                         private Matcher<List<String>> matchReloadPluginMessage() {
1138                                 return matchesFcpMessage(
1139                                         "ReloadPlugin",
1140                                         "Identifier=" + identifier,
1141                                         "PluginName=" + CLASS_NAME
1142                                 );
1143                         }
1144
1145                 }
1146
1147                 public class RemovePlugin {
1148
1149                         @Test
1150                         public void removingPluginWorks() throws InterruptedException, ExecutionException, IOException {
1151                                 Future<Boolean> pluginRemoved = fcpClient.removePlugin().plugin(CLASS_NAME).execute();
1152                                 connectAndAssert(this::matchPluginRemovedMessage);
1153                                 replyWithPluginRemoved();
1154                                 assertThat(pluginRemoved.get(), is(true));
1155                         }
1156
1157                         @Test
1158                         public void removingPluginWithMaxWaitTimeWorks()
1159                         throws InterruptedException, ExecutionException, IOException {
1160                                 Future<Boolean> pluginRemoved = fcpClient.removePlugin().waitFor(1234).plugin(CLASS_NAME).execute();
1161                                 connectAndAssert(() -> allOf(matchPluginRemovedMessage(), hasItem("MaxWaitTime=1234")));
1162                                 replyWithPluginRemoved();
1163                                 assertThat(pluginRemoved.get(), is(true));
1164                         }
1165
1166                         @Test
1167                         public void removingPluginWithPurgeWorks()
1168                         throws InterruptedException, ExecutionException, IOException {
1169                                 Future<Boolean> pluginRemoved = fcpClient.removePlugin().purge().plugin(CLASS_NAME).execute();
1170                                 connectAndAssert(() -> allOf(matchPluginRemovedMessage(), hasItem("Purge=true")));
1171                                 replyWithPluginRemoved();
1172                                 assertThat(pluginRemoved.get(), is(true));
1173                         }
1174
1175                         private void replyWithPluginRemoved() throws IOException {
1176                                 fcpServer.writeLine(
1177                                         "PluginRemoved",
1178                                         "Identifier=" + identifier,
1179                                         "PluginName=" + CLASS_NAME,
1180                                         "EndMessage"
1181                                 );
1182                         }
1183
1184                         private Matcher<List<String>> matchPluginRemovedMessage() {
1185                                 return matchesFcpMessage(
1186                                         "RemovePlugin",
1187                                         "Identifier=" + identifier,
1188                                         "PluginName=" + CLASS_NAME
1189                                 );
1190                         }
1191
1192                 }
1193
1194                 public class GetPluginInfo {
1195
1196                         @Test
1197                         public void gettingPluginInfoWorks() throws InterruptedException, ExecutionException, IOException {
1198                                 Future<Optional<PluginInfo>> pluginInfo = fcpClient.getPluginInfo().plugin(CLASS_NAME).execute();
1199                                 connectAndAssert(this::matchGetPluginInfoMessage);
1200                                 replyWithPluginInfo();
1201                                 verifyPluginInfo(pluginInfo);
1202                         }
1203
1204                         @Test
1205                         public void gettingPluginInfoWithDetailsWorks()
1206                         throws InterruptedException, ExecutionException, IOException {
1207                                 Future<Optional<PluginInfo>> pluginInfo =
1208                                         fcpClient.getPluginInfo().detailed().plugin(CLASS_NAME).execute();
1209                                 connectAndAssert(() -> allOf(matchGetPluginInfoMessage(), hasItem("Detailed=true")));
1210                                 replyWithPluginInfo();
1211                                 verifyPluginInfo(pluginInfo);
1212                         }
1213
1214                         @Test
1215                         public void protocolErrorIsRecognizedAsFailure()
1216                         throws InterruptedException, ExecutionException, IOException {
1217                                 Future<Optional<PluginInfo>> pluginInfo =
1218                                         fcpClient.getPluginInfo().detailed().plugin(CLASS_NAME).execute();
1219                                 connectAndAssert(() -> allOf(matchGetPluginInfoMessage(), hasItem("Detailed=true")));
1220                                 replyWithProtocolError();
1221                                 assertThat(pluginInfo.get(), is(Optional.empty()));
1222                         }
1223
1224                         private Matcher<List<String>> matchGetPluginInfoMessage() {
1225                                 return matchesFcpMessage(
1226                                         "GetPluginInfo",
1227                                         "Identifier=" + identifier,
1228                                         "PluginName=" + CLASS_NAME
1229                                 );
1230                         }
1231
1232                 }
1233
1234         }
1235
1236         public class UskSubscriptionCommands {
1237
1238                 private static final String URI = "USK@some,uri/file.txt";
1239
1240                 @Test
1241                 public void subscriptionWorks() throws InterruptedException, ExecutionException, IOException {
1242                         Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1243                         connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1244                         replyWithSubscribed();
1245                         assertThat(uskSubscription.get().get().getUri(), is(URI));
1246                         AtomicInteger edition = new AtomicInteger();
1247                         CountDownLatch updated = new CountDownLatch(2);
1248                         uskSubscription.get().get().onUpdate(e -> {
1249                                 edition.set(e);
1250                                 updated.countDown();
1251                         });
1252                         sendUpdateNotification(23);
1253                         sendUpdateNotification(24);
1254                         assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true));
1255                         assertThat(edition.get(), is(24));
1256                 }
1257
1258                 @Test
1259                 public void subscriptionUpdatesMultipleTimes() throws InterruptedException, ExecutionException, IOException {
1260                         Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1261                         connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1262                         replyWithSubscribed();
1263                         assertThat(uskSubscription.get().get().getUri(), is(URI));
1264                         AtomicInteger edition = new AtomicInteger();
1265                         CountDownLatch updated = new CountDownLatch(2);
1266                         uskSubscription.get().get().onUpdate(e -> {
1267                                 edition.set(e);
1268                                 updated.countDown();
1269                         });
1270                         uskSubscription.get().get().onUpdate(e -> updated.countDown());
1271                         sendUpdateNotification(23);
1272                         assertThat("updated in time", updated.await(5, TimeUnit.SECONDS), is(true));
1273                         assertThat(edition.get(), is(23));
1274                 }
1275
1276                 @Test
1277                 public void subscriptionCanBeCancelled() throws InterruptedException, ExecutionException, IOException {
1278                         Future<Optional<UskSubscription>> uskSubscription = fcpClient.subscribeUsk().uri(URI).execute();
1279                         connectAndAssert(() -> matchesFcpMessage("SubscribeUSK", "URI=" + URI));
1280                         replyWithSubscribed();
1281                         assertThat(uskSubscription.get().get().getUri(), is(URI));
1282                         AtomicBoolean updated = new AtomicBoolean();
1283                         uskSubscription.get().get().onUpdate(e -> updated.set(true));
1284                         uskSubscription.get().get().cancel();
1285                         readMessage(() -> matchesFcpMessage("UnsubscribeUSK", "Identifier=" + identifier));
1286                         sendUpdateNotification(23);
1287                         assertThat(updated.get(), is(false));
1288                 }
1289
1290                 private void replyWithSubscribed() throws IOException {
1291                         fcpServer.writeLine(
1292                                 "SubscribedUSK",
1293                                 "Identifier=" + identifier,
1294                                 "URI=" + URI,
1295                                 "DontPoll=false",
1296                                 "EndMessage"
1297                         );
1298                 }
1299
1300                 private void sendUpdateNotification(int edition, String... additionalLines) throws IOException {
1301                         fcpServer.writeLine(
1302                                 "SubscribedUSKUpdate",
1303                                 "Identifier=" + identifier,
1304                                 "URI=" + URI,
1305                                 "Edition=" + edition
1306                         );
1307                         fcpServer.writeLine(additionalLines);
1308                         fcpServer.writeLine("EndMessage");
1309                 }
1310
1311         }
1312
1313         public class ClientGet {
1314
1315                 @Test
1316                 public void works() throws InterruptedException, ExecutionException, IOException {
1317                         Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1318                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "ReturnType=direct"));
1319                         replyWithAllData("not-test", "Hello World", "text/plain;charset=latin-9");
1320                         replyWithAllData(identifier, "Hello", "text/plain;charset=utf-8");
1321                         Optional<Data> data = dataFuture.get();
1322                         verifyData(data);
1323                 }
1324
1325                 @Test
1326                 public void getFailedIsRecognized() throws InterruptedException, ExecutionException, IOException {
1327                         Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1328                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1329                         replyWithGetFailed("not-test");
1330                         replyWithGetFailed(identifier);
1331                         Optional<Data> data = dataFuture.get();
1332                         assertThat(data.isPresent(), is(false));
1333                 }
1334
1335                 @Test
1336                 public void getFailedForDifferentIdentifierIsIgnored()
1337                 throws InterruptedException, ExecutionException, IOException {
1338                         Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1339                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1340                         replyWithGetFailed("not-test");
1341                         replyWithAllData(identifier, "Hello", "text/plain;charset=utf-8");
1342                         Optional<Data> data = dataFuture.get();
1343                         verifyData(data);
1344                 }
1345
1346                 @Test(expected = ExecutionException.class)
1347                 public void connectionClosedIsRecognized() throws InterruptedException, ExecutionException, IOException {
1348                         Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt").execute();
1349                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
1350                         fcpServer.close();
1351                         dataFuture.get();
1352                 }
1353
1354                 @Test
1355                 public void withIgnoreData() throws InterruptedException, ExecutionException, IOException {
1356                         fcpClient.clientGet().ignoreDataStore().uri("KSK@foo.txt").execute();
1357                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "IgnoreDS=true"));
1358                 }
1359
1360                 @Test
1361                 public void withDataStoreOnly() throws InterruptedException, ExecutionException, IOException {
1362                         fcpClient.clientGet().dataStoreOnly().uri("KSK@foo.txt").execute();
1363                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "DSonly=true"));
1364                 }
1365
1366                 @Test
1367                 public void clientGetWithMaxSizeSettingSendsCorrectCommands()
1368                 throws InterruptedException, ExecutionException, IOException {
1369                         fcpClient.clientGet().maxSize(1048576).uri("KSK@foo.txt").execute();
1370                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "MaxSize=1048576"));
1371                 }
1372
1373                 @Test
1374                 public void clientGetWithPrioritySettingSendsCorrectCommands()
1375                 throws InterruptedException, ExecutionException, IOException {
1376                         fcpClient.clientGet().priority(Priority.interactive).uri("KSK@foo.txt").execute();
1377                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "PriorityClass=1"));
1378                 }
1379
1380                 @Test
1381                 public void clientGetWithRealTimeSettingSendsCorrectCommands()
1382                 throws InterruptedException, ExecutionException, IOException {
1383                         fcpClient.clientGet().realTime().uri("KSK@foo.txt").execute();
1384                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "RealTimeFlag=true"));
1385                 }
1386
1387                 @Test
1388                 public void clientGetWithGlobalSettingSendsCorrectCommands()
1389                 throws InterruptedException, ExecutionException, IOException {
1390                         fcpClient.clientGet().global().uri("KSK@foo.txt").execute();
1391                         connectAndAssert(() -> matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "Global=true"));
1392                 }
1393
1394                 private void replyWithGetFailed(String identifier) throws IOException {
1395                         fcpServer.writeLine(
1396                                 "GetFailed",
1397                                 "Identifier=" + identifier,
1398                                 "Code=3",
1399                                 "EndMessage"
1400                         );
1401                 }
1402
1403                 private void replyWithAllData(String identifier, String text, String contentType) throws IOException {
1404                         fcpServer.writeLine(
1405                                 "AllData",
1406                                 "Identifier=" + identifier,
1407                                 "DataLength=" + (text.length() + 1),
1408                                 "StartupTime=1435610539000",
1409                                 "CompletionTime=1435610540000",
1410                                 "Metadata.ContentType=" + contentType,
1411                                 "Data",
1412                                 text
1413                         );
1414                 }
1415
1416                 private void verifyData(Optional<Data> data) throws IOException {
1417                         assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
1418                         assertThat(data.get().size(), is(6L));
1419                         assertThat(ByteStreams.toByteArray(data.get().getInputStream()),
1420                                 is("Hello\n".getBytes(StandardCharsets.UTF_8)));
1421                 }
1422
1423         }
1424
1425         public class ClientPut {
1426
1427                 @Test
1428                 public void sendsCorrectCommand() throws IOException, ExecutionException, InterruptedException {
1429                         fcpClient.clientPut()
1430                                 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1431                                 .length(6)
1432                                 .uri("KSK@foo.txt")
1433                                 .execute();
1434                         connectNode();
1435                         readMessage("Hello", this::matchesDirectClientPut);
1436                 }
1437
1438                 @Test
1439                 public void succeedsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
1440                         Future<Optional<Key>> key = fcpClient.clientPut()
1441                                 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1442                                 .length(6)
1443                                 .uri("KSK@foo.txt")
1444                                 .execute();
1445                         connectNode();
1446                         readMessage("Hello", this::matchesDirectClientPut);
1447                         replyWithPutFailed("not-the-right-one");
1448                         replyWithPutSuccessful(identifier);
1449                         assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1450                 }
1451
1452                 @Test
1453                 public void failsOnCorrectIdentifier() throws InterruptedException, ExecutionException, IOException {
1454                         Future<Optional<Key>> key = fcpClient.clientPut()
1455                                 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1456                                 .length(6)
1457                                 .uri("KSK@foo.txt")
1458                                 .execute();
1459                         connectNode();
1460                         readMessage("Hello", this::matchesDirectClientPut);
1461                         replyWithPutSuccessful("not-the-right-one");
1462                         replyWithPutFailed(identifier);
1463                         assertThat(key.get().isPresent(), is(false));
1464                 }
1465
1466                 @Test
1467                 public void renameIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
1468                         fcpClient.clientPut()
1469                                 .named("otherName.txt")
1470                                 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1471                                 .length(6)
1472                                 .uri("KSK@foo.txt")
1473                                 .execute();
1474                         connectNode();
1475                         readMessage("Hello", () -> allOf(
1476                                 hasHead("ClientPut"),
1477                                 hasParameters(1, 2, "TargetFilename=otherName.txt", "UploadFrom=direct", "DataLength=6",
1478                                         "URI=KSK@foo.txt"),
1479                                 hasTail("EndMessage", "Hello")
1480                         ));
1481                 }
1482
1483                 @Test
1484                 public void redirectIsSentCorrecly() throws IOException, ExecutionException, InterruptedException {
1485                         fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt").execute();
1486                         connectAndAssert(() ->
1487                                 matchesFcpMessage("ClientPut", "UploadFrom=redirect", "URI=KSK@foo.txt", "TargetURI=KSK@bar.txt"));
1488                 }
1489
1490                 @Test
1491                 public void withFileIsSentCorrectly() throws InterruptedException, ExecutionException, IOException {
1492                         fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
1493                         connectAndAssert(() ->
1494                                 matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt"));
1495                 }
1496
1497                 public class DDA {
1498
1499                         private final File ddaFile;
1500                         private final File fileToUpload;
1501
1502                         public DDA() throws IOException {
1503                                 ddaFile = createDdaFile();
1504                                 fileToUpload = new File(ddaFile.getParent(), "test.dat");
1505                         }
1506
1507                         private Matcher<List<String>> matchesFileClientPut(File file) {
1508                                 return matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=" + file);
1509                         }
1510
1511                         @Test
1512                         public void completeDda() throws IOException, ExecutionException, InterruptedException {
1513                                 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1514                                 connectAndAssert(() -> matchesFileClientPut(fileToUpload));
1515                                 sendDdaRequired(identifier);
1516                                 readMessage(() -> matchesTestDDARequest(ddaFile));
1517                                 sendTestDDAReply(ddaFile.getParent(), ddaFile);
1518                                 readMessage(() -> matchesTestDDAResponse(ddaFile));
1519                                 writeTestDDAComplete(ddaFile);
1520                                 readMessage(() -> matchesFileClientPut(fileToUpload));
1521                         }
1522
1523                         @Test
1524                         public void ignoreOtherDda() throws IOException, ExecutionException, InterruptedException {
1525                                 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1526                                 connectAndAssert(() -> matchesFileClientPut(fileToUpload));
1527                                 sendDdaRequired(identifier);
1528                                 readMessage(() -> matchesTestDDARequest(ddaFile));
1529                                 sendTestDDAReply("/some-other-directory", ddaFile);
1530                                 sendTestDDAReply(ddaFile.getParent(), ddaFile);
1531                                 readMessage(() -> matchesTestDDAResponse(ddaFile));
1532                         }
1533
1534                         @Test
1535                         public void sendResponseIfFileUnreadable() throws IOException, ExecutionException, InterruptedException {
1536                                 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1537                                 connectAndAssert(() -> matchesFileClientPut(fileToUpload));
1538                                 sendDdaRequired(identifier);
1539                                 readMessage(() -> matchesTestDDARequest(ddaFile));
1540                                 sendTestDDAReply(ddaFile.getParent(), new File(ddaFile + ".foo"));
1541                                 readMessage(this::matchesFailedToReadResponse);
1542                         }
1543
1544                         @Test
1545                         public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory()
1546                         throws IOException, ExecutionException, InterruptedException {
1547                                 fcpClient.clientPut().from(fileToUpload).uri("KSK@foo.txt").execute();
1548                                 connectNode();
1549                                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1550                                 String identifier = extractIdentifier(lines);
1551                                 fcpServer.writeLine(
1552                                         "TestDDAComplete",
1553                                         "Directory=/some-other-directory",
1554                                         "EndMessage"
1555                                 );
1556                                 sendDdaRequired(identifier);
1557                                 lines = fcpServer.collectUntil(is("EndMessage"));
1558                                 assertThat(lines, matchesFcpMessage(
1559                                         "TestDDARequest",
1560                                         "Directory=" + ddaFile.getParent(),
1561                                         "WantReadDirectory=true",
1562                                         "WantWriteDirectory=false"
1563                                 ));
1564                         }
1565
1566                         private Matcher<List<String>> matchesFailedToReadResponse() {
1567                                 return matchesFcpMessage(
1568                                         "TestDDAResponse",
1569                                         "Directory=" + ddaFile.getParent(),
1570                                         "ReadContent=failed-to-read"
1571                                 );
1572                         }
1573
1574                         private void writeTestDDAComplete(File tempFile) throws IOException {
1575                                 fcpServer.writeLine(
1576                                         "TestDDAComplete",
1577                                         "Directory=" + tempFile.getParent(),
1578                                         "ReadDirectoryAllowed=true",
1579                                         "EndMessage"
1580                                 );
1581                         }
1582
1583                         private Matcher<List<String>> matchesTestDDAResponse(File tempFile) {
1584                                 return matchesFcpMessage(
1585                                         "TestDDAResponse",
1586                                         "Directory=" + tempFile.getParent(),
1587                                         "ReadContent=test-content"
1588                                 );
1589                         }
1590
1591                         private void sendTestDDAReply(String directory, File tempFile) throws IOException {
1592                                 fcpServer.writeLine(
1593                                         "TestDDAReply",
1594                                         "Directory=" + directory,
1595                                         "ReadFilename=" + tempFile,
1596                                         "EndMessage"
1597                                 );
1598                         }
1599
1600                         private Matcher<List<String>> matchesTestDDARequest(File tempFile) {
1601                                 return matchesFcpMessage(
1602                                         "TestDDARequest",
1603                                         "Directory=" + tempFile.getParent(),
1604                                         "WantReadDirectory=true",
1605                                         "WantWriteDirectory=false"
1606                                 );
1607                         }
1608
1609                         private void sendDdaRequired(String identifier) throws IOException {
1610                                 fcpServer.writeLine(
1611                                         "ProtocolError",
1612                                         "Identifier=" + identifier,
1613                                         "Code=25",
1614                                         "EndMessage"
1615                                 );
1616                         }
1617
1618                 }
1619
1620                 private void replyWithPutSuccessful(String identifier) throws IOException {
1621                         fcpServer.writeLine(
1622                                 "PutSuccessful",
1623                                 "URI=KSK@foo.txt",
1624                                 "Identifier=" + identifier,
1625                                 "EndMessage"
1626                         );
1627                 }
1628
1629                 private void replyWithPutFailed(String identifier) throws IOException {
1630                         fcpServer.writeLine(
1631                                 "PutFailed",
1632                                 "Identifier=" + identifier,
1633                                 "EndMessage"
1634                         );
1635                 }
1636
1637                 private Matcher<List<String>> matchesDirectClientPut() {
1638                         return allOf(
1639                                 hasHead("ClientPut"),
1640                                 hasParameters(1, 2, "UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"),
1641                                 hasTail("EndMessage", "Hello")
1642                         );
1643                 }
1644
1645                 private File createDdaFile() throws IOException {
1646                         File tempFile = File.createTempFile("test-dda-", ".dat");
1647                         tempFile.deleteOnExit();
1648                         Files.write("test-content", tempFile, StandardCharsets.UTF_8);
1649                         return tempFile;
1650                 }
1651
1652                 @Test
1653                 public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier()
1654                 throws InterruptedException, ExecutionException, IOException {
1655                         Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
1656                         connectNode();
1657                         List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1658                         String identifier = extractIdentifier(lines);
1659                         fcpServer.writeLine(
1660                                 "ProtocolError",
1661                                 "Identifier=not-the-right-one",
1662                                 "Code=25",
1663                                 "EndMessage"
1664                         );
1665                         fcpServer.writeLine(
1666                                 "PutSuccessful",
1667                                 "Identifier=" + identifier,
1668                                 "URI=KSK@foo.txt",
1669                                 "EndMessage"
1670                         );
1671                         assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1672                 }
1673
1674                 @Test
1675                 public void clientPutAbortsOnProtocolErrorOtherThan25()
1676                 throws InterruptedException, ExecutionException, IOException {
1677                         Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt").execute();
1678                         connectNode();
1679                         List<String> lines = fcpServer.collectUntil(is("EndMessage"));
1680                         String identifier = extractIdentifier(lines);
1681                         fcpServer.writeLine(
1682                                 "ProtocolError",
1683                                 "Identifier=" + identifier,
1684                                 "Code=1",
1685                                 "EndMessage"
1686                         );
1687                         assertThat(key.get().isPresent(), is(false));
1688                 }
1689
1690                 @Test
1691                 public void clientPutSendsNotificationsForGeneratedKeys()
1692                 throws InterruptedException, ExecutionException, IOException {
1693                         List<String> generatedKeys = new CopyOnWriteArrayList<>();
1694                         Future<Optional<Key>> key = fcpClient.clientPut()
1695                                 .onKeyGenerated(generatedKeys::add)
1696                                 .from(new ByteArrayInputStream("Hello\n".getBytes()))
1697                                 .length(6)
1698                                 .uri("KSK@foo.txt")
1699                                 .execute();
1700                         connectNode();
1701                         List<String> lines = fcpServer.collectUntil(is("Hello"));
1702                         String identifier = extractIdentifier(lines);
1703                         fcpServer.writeLine(
1704                                 "URIGenerated",
1705                                 "Identifier=" + identifier,
1706                                 "URI=KSK@foo.txt",
1707                                 "EndMessage"
1708                         );
1709                         replyWithPutSuccessful(identifier);
1710                         assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
1711                         assertThat(generatedKeys, contains("KSK@foo.txt"));
1712                 }
1713
1714         }
1715
1716         public class ConfigCommand {
1717
1718                 public class GetConfig {
1719
1720                         @Test
1721                         public void defaultFcpClientCanGetConfigWithoutDetails()
1722                         throws InterruptedException, ExecutionException, IOException {
1723                                 Future<ConfigData> configData = fcpClient.getConfig().execute();
1724                                 connectAndAssert(() -> matchesFcpMessage("GetConfig", "Identifier=" + identifier));
1725                                 replyWithConfigData();
1726                                 assertThat(configData.get(), notNullValue());
1727                         }
1728
1729                         @Test
1730                         public void defaultFcpClientCanGetConfigWithCurrent()
1731                         throws InterruptedException, ExecutionException, IOException {
1732                                 Future<ConfigData> configData = fcpClient.getConfig().withCurrent().execute();
1733                                 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithCurrent"));
1734                                 replyWithConfigData("current.foo=bar");
1735                                 assertThat(configData.get().getCurrent("foo"), is("bar"));
1736                         }
1737
1738                         @Test
1739                         public void defaultFcpClientCanGetConfigWithDefaults()
1740                         throws InterruptedException, ExecutionException, IOException {
1741                                 Future<ConfigData> configData = fcpClient.getConfig().withDefaults().execute();
1742                                 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithDefaults"));
1743                                 replyWithConfigData("default.foo=bar");
1744                                 assertThat(configData.get().getDefault("foo"), is("bar"));
1745                         }
1746
1747                         @Test
1748                         public void defaultFcpClientCanGetConfigWithSortOrder()
1749                         throws InterruptedException, ExecutionException, IOException {
1750                                 Future<ConfigData> configData = fcpClient.getConfig().withSortOrder().execute();
1751                                 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithSortOrder"));
1752                                 replyWithConfigData("sortOrder.foo=17");
1753                                 assertThat(configData.get().getSortOrder("foo"), is(17));
1754                         }
1755
1756                         @Test
1757                         public void defaultFcpClientCanGetConfigWithExpertFlag()
1758                         throws InterruptedException, ExecutionException, IOException {
1759                                 Future<ConfigData> configData = fcpClient.getConfig().withExpertFlag().execute();
1760                                 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithExpertFlag"));
1761                                 replyWithConfigData("expertFlag.foo=true");
1762                                 assertThat(configData.get().getExpertFlag("foo"), is(true));
1763                         }
1764
1765                         @Test
1766                         public void defaultFcpClientCanGetConfigWithForceWriteFlag()
1767                         throws InterruptedException, ExecutionException, IOException {
1768                                 Future<ConfigData> configData = fcpClient.getConfig().withForceWriteFlag().execute();
1769                                 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithForceWriteFlag"));
1770                                 replyWithConfigData("forceWriteFlag.foo=true");
1771                                 assertThat(configData.get().getForceWriteFlag("foo"), is(true));
1772                         }
1773
1774                         @Test
1775                         public void defaultFcpClientCanGetConfigWithShortDescription()
1776                         throws InterruptedException, ExecutionException, IOException {
1777                                 Future<ConfigData> configData = fcpClient.getConfig().withShortDescription().execute();
1778                                 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithShortDescription"));
1779                                 replyWithConfigData("shortDescription.foo=bar");
1780                                 assertThat(configData.get().getShortDescription("foo"), is("bar"));
1781                         }
1782
1783                         @Test
1784                         public void defaultFcpClientCanGetConfigWithLongDescription()
1785                         throws InterruptedException, ExecutionException, IOException {
1786                                 Future<ConfigData> configData = fcpClient.getConfig().withLongDescription().execute();
1787                                 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithLongDescription"));
1788                                 replyWithConfigData("longDescription.foo=bar");
1789                                 assertThat(configData.get().getLongDescription("foo"), is("bar"));
1790                         }
1791
1792                         @Test
1793                         public void defaultFcpClientCanGetConfigWithDataTypes()
1794                         throws InterruptedException, ExecutionException, IOException {
1795                                 Future<ConfigData> configData = fcpClient.getConfig().withDataTypes().execute();
1796                                 connectAndAssert(() -> matchesGetConfigWithAdditionalParameter("WithDataTypes"));
1797                                 replyWithConfigData("dataType.foo=number");
1798                                 assertThat(configData.get().getDataType("foo"), is("number"));
1799                         }
1800
1801                         private Matcher<List<String>> matchesGetConfigWithAdditionalParameter(String additionalParameter) {
1802                                 return matchesFcpMessage(
1803                                         "GetConfig",
1804                                         "Identifier=" + identifier,
1805                                         additionalParameter + "=true"
1806                                 );
1807                         }
1808
1809                 }
1810
1811                 public class ModifyConfig {
1812
1813                         @Test
1814                         public void defaultFcpClientCanModifyConfigData()
1815                         throws InterruptedException, ExecutionException, IOException {
1816                                 Future<ConfigData> newConfigData = fcpClient.modifyConfig().set("foo.bar").to("baz").execute();
1817                                 connectAndAssert(() -> matchesFcpMessage(
1818                                         "ModifyConfig",
1819                                         "Identifier=" + identifier,
1820                                         "foo.bar=baz"
1821                                 ));
1822                                 replyWithConfigData("current.foo.bar=baz");
1823                                 assertThat(newConfigData.get().getCurrent("foo.bar"), is("baz"));
1824                         }
1825
1826                 }
1827
1828                 private void replyWithConfigData(String... additionalLines) throws IOException {
1829                         fcpServer.writeLine("ConfigData", "Identifier=" + identifier);
1830                         fcpServer.writeLine(additionalLines);
1831                         fcpServer.writeLine("EndMessage");
1832                 }
1833
1834         }
1835
1836 }