Add test for CloseConnectionDuplicateClientName handling
[jFCPlib.git] / src / test / java / net / pterodactylus / fcp / quelaton / DefaultFcpClientTest.java
1 package net.pterodactylus.fcp.quelaton;
2
3 import static org.hamcrest.MatcherAssert.assertThat;
4 import static org.hamcrest.Matchers.containsInAnyOrder;
5 import static org.hamcrest.Matchers.hasSize;
6 import static org.hamcrest.Matchers.is;
7
8 import java.io.ByteArrayInputStream;
9 import java.io.File;
10 import java.io.IOException;
11 import java.nio.charset.StandardCharsets;
12 import java.util.Collection;
13 import java.util.List;
14 import java.util.Optional;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.Future;
19 import java.util.logging.Level;
20 import java.util.logging.Logger;
21 import java.util.stream.Collectors;
22
23 import net.pterodactylus.fcp.FcpKeyPair;
24 import net.pterodactylus.fcp.Key;
25 import net.pterodactylus.fcp.Peer;
26 import net.pterodactylus.fcp.Priority;
27 import net.pterodactylus.fcp.fake.FakeTcpServer;
28 import net.pterodactylus.fcp.quelaton.ClientGetCommand.Data;
29
30 import com.google.common.io.ByteStreams;
31 import com.google.common.io.Files;
32 import org.hamcrest.Description;
33 import org.hamcrest.Matcher;
34 import org.hamcrest.TypeSafeDiagnosingMatcher;
35 import org.junit.After;
36 import org.junit.Test;
37
38 /**
39  * Unit test for {@link DefaultFcpClient}.
40  *
41  * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
42  */
43 public class DefaultFcpClientTest {
44
45         private static final String INSERT_URI =
46                 "SSK@RVCHbJdkkyTCeNN9AYukEg76eyqmiosSaNKgE3U9zUw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQECAAE/";
47         private static final String REQUEST_URI =
48                 "SSK@wtbgd2loNcJCXvtQVOftl2tuWBomDQHfqS6ytpPRhfw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQACAAE/";
49
50         private static int threadCounter = 0;
51         private final ExecutorService threadPool =
52                 Executors.newCachedThreadPool(r -> new Thread(r, "Test-Thread-" + threadCounter++));
53         private final FakeTcpServer fcpServer;
54         private final DefaultFcpClient fcpClient;
55
56         public DefaultFcpClientTest() throws IOException {
57                 fcpServer = new FakeTcpServer(threadPool);
58                 fcpClient = new DefaultFcpClient(threadPool, "localhost", fcpServer.getPort(), () -> "Test");
59         }
60
61         @After
62         public void tearDown() throws IOException {
63                 fcpServer.close();
64         }
65
66         @Test(expected = ExecutionException.class)
67         public void defaultFcpClientThrowsExceptionIfItCanNotConnect()
68         throws IOException, ExecutionException, InterruptedException {
69                 Logger.getAnonymousLogger().getParent().setLevel(Level.FINEST);
70                 Logger.getAnonymousLogger().getParent().getHandlers()[0].setLevel(Level.FINEST);
71                 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
72                 fcpServer.connect().get();
73                 fcpServer.collectUntil(is("EndMessage"));
74                 fcpServer.writeLine(
75                         "CloseConnectionDuplicateClientName",
76                         "EndMessage"
77                 );
78                 keyPairFuture.get();
79         }
80
81         @Test
82         public void defaultFcpClientCanGenerateKeypair() throws ExecutionException, InterruptedException, IOException {
83                 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
84                 connectNode();
85                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
86                 String identifier = extractIdentifier(lines);
87                 fcpServer.writeLine("SSKKeypair",
88                         "InsertURI=" + INSERT_URI + "",
89                         "RequestURI=" + REQUEST_URI + "",
90                         "Identifier=" + identifier,
91                         "EndMessage");
92                 FcpKeyPair keyPair = keyPairFuture.get();
93                 assertThat(keyPair.getPublicKey(), is(REQUEST_URI));
94                 assertThat(keyPair.getPrivateKey(), is(INSERT_URI));
95         }
96
97         private void connectNode() throws InterruptedException, ExecutionException, IOException {
98                 fcpServer.connect().get();
99                 fcpServer.collectUntil(is("EndMessage"));
100                 fcpServer.writeLine("NodeHello",
101                         "CompressionCodecs=4 - GZIP(0), BZIP2(1), LZMA(2), LZMA_NEW(3)",
102                         "Revision=build01466",
103                         "Testnet=false",
104                         "Version=Fred,0.7,1.0,1466",
105                         "Build=1466",
106                         "ConnectionIdentifier=14318898267048452a81b36e7f13a3f0",
107                         "Node=Fred",
108                         "ExtBuild=29",
109                         "FCPVersion=2.0",
110                         "NodeLanguage=ENGLISH",
111                         "ExtRevision=v29",
112                         "EndMessage"
113                 );
114         }
115
116         @Test
117         public void clientGetCanDownloadData() throws InterruptedException, ExecutionException, IOException {
118                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
119                 connectNode();
120                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
121                 assertThat(lines, matchesFcpMessage("ClientGet", "ReturnType=direct", "URI=KSK@foo.txt"));
122                 String identifier = extractIdentifier(lines);
123                 fcpServer.writeLine(
124                         "AllData",
125                         "Identifier=" + identifier,
126                         "DataLength=6",
127                         "StartupTime=1435610539000",
128                         "CompletionTime=1435610540000",
129                         "Metadata.ContentType=text/plain;charset=utf-8",
130                         "Data",
131                         "Hello"
132                 );
133                 Optional<Data> data = dataFuture.get();
134                 assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
135                 assertThat(data.get().size(), is(6L));
136                 assertThat(ByteStreams.toByteArray(data.get().getInputStream()),
137                         is("Hello\n".getBytes(StandardCharsets.UTF_8)));
138         }
139
140         private String extractIdentifier(List<String> lines) {
141                 return lines.stream()
142                         .filter(s -> s.startsWith("Identifier="))
143                         .map(s -> s.substring(s.indexOf('=') + 1))
144                         .findFirst()
145                         .orElse("");
146         }
147
148         @Test
149         public void clientGetDownloadsDataForCorrectIdentifier()
150         throws InterruptedException, ExecutionException, IOException {
151                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
152                 connectNode();
153                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
154                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
155                 String identifier = extractIdentifier(lines);
156                 fcpServer.writeLine(
157                         "AllData",
158                         "Identifier=not-test",
159                         "DataLength=12",
160                         "StartupTime=1435610539000",
161                         "CompletionTime=1435610540000",
162                         "Metadata.ContentType=text/plain;charset=latin-9",
163                         "Data",
164                         "Hello World"
165                 );
166                 fcpServer.writeLine(
167                         "AllData",
168                         "Identifier=" + identifier,
169                         "DataLength=6",
170                         "StartupTime=1435610539000",
171                         "CompletionTime=1435610540000",
172                         "Metadata.ContentType=text/plain;charset=utf-8",
173                         "Data",
174                         "Hello"
175                 );
176                 Optional<Data> data = dataFuture.get();
177                 assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
178                 assertThat(data.get().size(), is(6L));
179                 assertThat(ByteStreams.toByteArray(data.get().getInputStream()),
180                         is("Hello\n".getBytes(StandardCharsets.UTF_8)));
181         }
182
183         @Test
184         public void clientGetRecognizesGetFailed() throws InterruptedException, ExecutionException, IOException {
185                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
186                 connectNode();
187                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
188                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
189                 String identifier = extractIdentifier(lines);
190                 fcpServer.writeLine(
191                         "GetFailed",
192                         "Identifier=" + identifier,
193                         "Code=3",
194                         "EndMessage"
195                 );
196                 Optional<Data> data = dataFuture.get();
197                 assertThat(data.isPresent(), is(false));
198         }
199
200         @Test
201         public void clientGetRecognizesGetFailedForCorrectIdentifier()
202         throws InterruptedException, ExecutionException, IOException {
203                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
204                 connectNode();
205                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
206                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
207                 String identifier = extractIdentifier(lines);
208                 fcpServer.writeLine(
209                         "GetFailed",
210                         "Identifier=not-test",
211                         "Code=3",
212                         "EndMessage"
213                 );
214                 fcpServer.writeLine(
215                         "GetFailed",
216                         "Identifier=" + identifier,
217                         "Code=3",
218                         "EndMessage"
219                 );
220                 Optional<Data> data = dataFuture.get();
221                 assertThat(data.isPresent(), is(false));
222         }
223
224         @Test
225         public void clientGetRecognizesConnectionClosed() throws InterruptedException, ExecutionException, IOException {
226                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
227                 connectNode();
228                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
229                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
230                 fcpServer.close();
231                 Optional<Data> data = dataFuture.get();
232                 assertThat(data.isPresent(), is(false));
233         }
234
235         @Test
236         public void clientGetWithIgnoreDataStoreSettingSendsCorrectCommands()
237         throws InterruptedException, ExecutionException, IOException {
238                 fcpClient.clientGet().ignoreDataStore().uri("KSK@foo.txt");
239                 connectNode();
240                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
241                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "IgnoreDS=true"));
242         }
243
244         @Test
245         public void clientGetWithDataStoreOnlySettingSendsCorrectCommands()
246         throws InterruptedException, ExecutionException, IOException {
247                 fcpClient.clientGet().dataStoreOnly().uri("KSK@foo.txt");
248                 connectNode();
249                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
250                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "DSonly=true"));
251         }
252
253         @Test
254         public void clientGetWithMaxSizeSettingSendsCorrectCommands()
255         throws InterruptedException, ExecutionException, IOException {
256                 fcpClient.clientGet().maxSize(1048576).uri("KSK@foo.txt");
257                 connectNode();
258                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
259                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "MaxSize=1048576"));
260         }
261
262         @Test
263         public void clientGetWithPrioritySettingSendsCorrectCommands()
264         throws InterruptedException, ExecutionException, IOException {
265                 fcpClient.clientGet().priority(Priority.interactive).uri("KSK@foo.txt");
266                 connectNode();
267                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
268                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "PriorityClass=1"));
269         }
270
271         @Test
272         public void clientGetWithRealTimeSettingSendsCorrectCommands()
273         throws InterruptedException, ExecutionException, IOException {
274                 fcpClient.clientGet().realTime().uri("KSK@foo.txt");
275                 connectNode();
276                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
277                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "RealTimeFlag=true"));
278         }
279
280         @Test
281         public void clientGetWithGlobalSettingSendsCorrectCommands()
282         throws InterruptedException, ExecutionException, IOException {
283                 fcpClient.clientGet().global().uri("KSK@foo.txt");
284                 connectNode();
285                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
286                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "Global=true"));
287         }
288
289         private Matcher<List<String>> matchesFcpMessage(String name, String... requiredLines) {
290                 return new TypeSafeDiagnosingMatcher<List<String>>() {
291                         @Override
292                         protected boolean matchesSafely(List<String> item, Description mismatchDescription) {
293                                 if (!item.get(0).equals(name)) {
294                                         mismatchDescription.appendText("FCP message is named ").appendValue(item.get(0));
295                                         return false;
296                                 }
297                                 for (String requiredLine : requiredLines) {
298                                         if (item.indexOf(requiredLine) < 1) {
299                                                 mismatchDescription.appendText("FCP message does not contain ").appendValue(requiredLine);
300                                                 return false;
301                                         }
302                                 }
303                                 return true;
304                         }
305
306                         @Override
307                         public void describeTo(Description description) {
308                                 description.appendText("FCP message named ").appendValue(name);
309                                 description.appendValueList(", containing the lines ", ", ", "", requiredLines);
310                         }
311                 };
312         }
313
314         @Test
315         public void clientPutWithDirectDataSendsCorrectCommand()
316         throws IOException, ExecutionException, InterruptedException {
317                 fcpClient.clientPut()
318                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
319                         .length(6)
320                         .uri("KSK@foo.txt");
321                 connectNode();
322                 List<String> lines = fcpServer.collectUntil(is("Hello"));
323                 assertThat(lines, matchesFcpMessage("ClientPut", "UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"));
324         }
325
326         @Test
327         public void clientPutWithDirectDataSucceedsOnCorrectIdentifier()
328         throws InterruptedException, ExecutionException, IOException {
329                 Future<Optional<Key>> key = fcpClient.clientPut()
330                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
331                         .length(6)
332                         .uri("KSK@foo.txt");
333                 connectNode();
334                 List<String> lines = fcpServer.collectUntil(is("Hello"));
335                 String identifier = extractIdentifier(lines);
336                 fcpServer.writeLine(
337                         "PutFailed",
338                         "Identifier=not-the-right-one",
339                         "EndMessage"
340                 );
341                 fcpServer.writeLine(
342                         "PutSuccessful",
343                         "URI=KSK@foo.txt",
344                         "Identifier=" + identifier,
345                         "EndMessage"
346                 );
347                 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
348         }
349
350         @Test
351         public void clientPutWithDirectDataFailsOnCorrectIdentifier()
352         throws InterruptedException, ExecutionException, IOException {
353                 Future<Optional<Key>> key = fcpClient.clientPut()
354                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
355                         .length(6)
356                         .uri("KSK@foo.txt");
357                 connectNode();
358                 List<String> lines = fcpServer.collectUntil(is("Hello"));
359                 String identifier = extractIdentifier(lines);
360                 fcpServer.writeLine(
361                         "PutSuccessful",
362                         "Identifier=not-the-right-one",
363                         "URI=KSK@foo.txt",
364                         "EndMessage"
365                 );
366                 fcpServer.writeLine(
367                         "PutFailed",
368                         "Identifier=" + identifier,
369                         "EndMessage"
370                 );
371                 assertThat(key.get().isPresent(), is(false));
372         }
373
374         @Test
375         public void clientPutWithRenamedDirectDataSendsCorrectCommand()
376         throws InterruptedException, ExecutionException, IOException {
377                 fcpClient.clientPut()
378                         .named("otherName.txt")
379                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
380                         .length(6)
381                         .uri("KSK@foo.txt");
382                 connectNode();
383                 List<String> lines = fcpServer.collectUntil(is("Hello"));
384                 assertThat(lines, matchesFcpMessage("ClientPut", "TargetFilename=otherName.txt", "UploadFrom=direct",
385                         "DataLength=6", "URI=KSK@foo.txt"));
386         }
387
388         @Test
389         public void clientPutWithRedirectSendsCorrectCommand()
390         throws IOException, ExecutionException, InterruptedException {
391                 fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt");
392                 connectNode();
393                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
394                 assertThat(lines,
395                         matchesFcpMessage("ClientPut", "UploadFrom=redirect", "URI=KSK@foo.txt", "TargetURI=KSK@bar.txt"));
396         }
397
398         @Test
399         public void clientPutWithFileSendsCorrectCommand() throws InterruptedException, ExecutionException, IOException {
400                 fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt");
401                 connectNode();
402                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
403                 assertThat(lines,
404                         matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt"));
405         }
406
407         @Test
408         public void clientPutWithFileCanCompleteTestDdaSequence()
409         throws IOException, ExecutionException, InterruptedException {
410                 File tempFile = createTempFile();
411                 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt");
412                 connectNode();
413                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
414                 String identifier = extractIdentifier(lines);
415                 fcpServer.writeLine(
416                         "ProtocolError",
417                         "Identifier=" + identifier,
418                         "Code=25",
419                         "EndMessage"
420                 );
421                 lines = fcpServer.collectUntil(is("EndMessage"));
422                 assertThat(lines, matchesFcpMessage(
423                         "TestDDARequest",
424                         "Directory=" + tempFile.getParent(),
425                         "WantReadDirectory=true",
426                         "WantWriteDirectory=false",
427                         "EndMessage"
428                 ));
429                 fcpServer.writeLine(
430                         "TestDDAReply",
431                         "Directory=" + tempFile.getParent(),
432                         "ReadFilename=" + tempFile,
433                         "EndMessage"
434                 );
435                 lines = fcpServer.collectUntil(is("EndMessage"));
436                 assertThat(lines, matchesFcpMessage(
437                         "TestDDAResponse",
438                         "Directory=" + tempFile.getParent(),
439                         "ReadContent=test-content",
440                         "EndMessage"
441                 ));
442                 fcpServer.writeLine(
443                         "TestDDAComplete",
444                         "Directory=" + tempFile.getParent(),
445                         "ReadDirectoryAllowed=true",
446                         "EndMessage"
447                 );
448                 lines = fcpServer.collectUntil(is("EndMessage"));
449                 assertThat(lines,
450                         matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt",
451                                 "Filename=" + new File(tempFile.getParent(), "test.dat")));
452         }
453
454         private File createTempFile() throws IOException {
455                 File tempFile = File.createTempFile("test-dda-", ".dat");
456                 tempFile.deleteOnExit();
457                 Files.write("test-content", tempFile, StandardCharsets.UTF_8);
458                 return tempFile;
459         }
460
461         @Test
462         public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier()
463         throws InterruptedException, ExecutionException, IOException {
464                 Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt");
465                 connectNode();
466                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
467                 String identifier = extractIdentifier(lines);
468                 fcpServer.writeLine(
469                         "ProtocolError",
470                         "Identifier=not-the-right-one",
471                         "Code=25",
472                         "EndMessage"
473                 );
474                 fcpServer.writeLine(
475                         "PutSuccessful",
476                         "Identifier=" + identifier,
477                         "URI=KSK@foo.txt",
478                         "EndMessage"
479                 );
480                 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
481         }
482
483         @Test
484         public void clientPutAbortsOnProtocolErrorOtherThan25()
485         throws InterruptedException, ExecutionException, IOException {
486                 Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt");
487                 connectNode();
488                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
489                 String identifier = extractIdentifier(lines);
490                 fcpServer.writeLine(
491                         "ProtocolError",
492                         "Identifier=" + identifier,
493                         "Code=1",
494                         "EndMessage"
495                 );
496                 assertThat(key.get().isPresent(), is(false));
497         }
498
499         @Test
500         public void clientPutDoesNotReplyToWrongTestDdaReply() throws IOException, ExecutionException,
501         InterruptedException {
502                 File tempFile = createTempFile();
503                 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt");
504                 connectNode();
505                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
506                 String identifier = extractIdentifier(lines);
507                 fcpServer.writeLine(
508                         "ProtocolError",
509                         "Identifier=" + identifier,
510                         "Code=25",
511                         "EndMessage"
512                 );
513                 lines = fcpServer.collectUntil(is("EndMessage"));
514                 assertThat(lines, matchesFcpMessage(
515                         "TestDDARequest",
516                         "Directory=" + tempFile.getParent(),
517                         "WantReadDirectory=true",
518                         "WantWriteDirectory=false",
519                         "EndMessage"
520                 ));
521                 fcpServer.writeLine(
522                         "TestDDAReply",
523                         "Directory=/some-other-directory",
524                         "ReadFilename=" + tempFile,
525                         "EndMessage"
526                 );
527                 fcpServer.writeLine(
528                         "TestDDAReply",
529                         "Directory=" + tempFile.getParent(),
530                         "ReadFilename=" + tempFile,
531                         "EndMessage"
532                 );
533                 lines = fcpServer.collectUntil(is("EndMessage"));
534                 assertThat(lines, matchesFcpMessage(
535                         "TestDDAResponse",
536                         "Directory=" + tempFile.getParent(),
537                         "ReadContent=test-content",
538                         "EndMessage"
539                 ));
540         }
541
542         @Test
543         public void clientPutSendsResponseEvenIfFileCanNotBeRead()
544         throws IOException, ExecutionException, InterruptedException {
545                 File tempFile = createTempFile();
546                 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt");
547                 connectNode();
548                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
549                 String identifier = extractIdentifier(lines);
550                 fcpServer.writeLine(
551                         "ProtocolError",
552                         "Identifier=" + identifier,
553                         "Code=25",
554                         "EndMessage"
555                 );
556                 lines = fcpServer.collectUntil(is("EndMessage"));
557                 assertThat(lines, matchesFcpMessage(
558                         "TestDDARequest",
559                         "Directory=" + tempFile.getParent(),
560                         "WantReadDirectory=true",
561                         "WantWriteDirectory=false",
562                         "EndMessage"
563                 ));
564                 fcpServer.writeLine(
565                         "TestDDAReply",
566                         "Directory=" + tempFile.getParent(),
567                         "ReadFilename=" + tempFile + ".foo",
568                         "EndMessage"
569                 );
570                 lines = fcpServer.collectUntil(is("EndMessage"));
571                 assertThat(lines, matchesFcpMessage(
572                         "TestDDAResponse",
573                         "Directory=" + tempFile.getParent(),
574                         "ReadContent=failed-to-read",
575                         "EndMessage"
576                 ));
577         }
578
579         @Test
580         public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory()
581         throws IOException, ExecutionException, InterruptedException {
582                 File tempFile = createTempFile();
583                 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt");
584                 connectNode();
585                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
586                 String identifier = extractIdentifier(lines);
587                 fcpServer.writeLine(
588                         "TestDDAComplete",
589                         "Directory=/some-other-directory",
590                         "EndMessage"
591                 );
592                 fcpServer.writeLine(
593                         "ProtocolError",
594                         "Identifier=" + identifier,
595                         "Code=25",
596                         "EndMessage"
597                 );
598                 lines = fcpServer.collectUntil(is("EndMessage"));
599                 assertThat(lines, matchesFcpMessage(
600                         "TestDDARequest",
601                         "Directory=" + tempFile.getParent(),
602                         "WantReadDirectory=true",
603                         "WantWriteDirectory=false",
604                         "EndMessage"
605                 ));
606         }
607
608         @Test
609         public void clientCanListPeers() throws IOException, ExecutionException, InterruptedException {
610                 Future<Collection<Peer>> peers = fcpClient.listPeers().execute();
611                 connectNode();
612                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
613                 assertThat(lines, matchesFcpMessage(
614                         "ListPeers",
615                         "WithVolatile=false",
616                         "WithMetadata=false",
617                         "EndMessage"
618                 ));
619                 String identifier = extractIdentifier(lines);
620                 fcpServer.writeLine(
621                         "Peer",
622                         "Identifier=" + identifier,
623                         "identity=id1",
624                         "EndMessage"
625                 );
626                 fcpServer.writeLine(
627                         "Peer",
628                         "Identifier=" + identifier,
629                         "identity=id2",
630                         "EndMessage"
631                 );
632                 fcpServer.writeLine(
633                         "EndListPeers",
634                         "Identifier=" + identifier,
635                         "EndMessage"
636                 );
637                 assertThat(peers.get(), hasSize(2));
638                 assertThat(peers.get().stream().map(Peer::getIdentity).collect(Collectors.toList()),
639                         containsInAnyOrder("id1", "id2"));
640         }
641
642         @Test
643         public void clientCanListPeersWithMetadata() throws IOException, ExecutionException, InterruptedException {
644                 Future<Collection<Peer>> peers = fcpClient.listPeers().includeMetadata().execute();
645                 connectNode();
646                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
647                 assertThat(lines, matchesFcpMessage(
648                         "ListPeers",
649                         "WithVolatile=false",
650                         "WithMetadata=true",
651                         "EndMessage"
652                 ));
653                 String identifier = extractIdentifier(lines);
654                 fcpServer.writeLine(
655                         "Peer",
656                         "Identifier=" + identifier,
657                         "identity=id1",
658                         "metadata.foo=bar1",
659                         "EndMessage"
660                 );
661                 fcpServer.writeLine(
662                         "Peer",
663                         "Identifier=" + identifier,
664                         "identity=id2",
665                         "metadata.foo=bar2",
666                         "EndMessage"
667                 );
668                 fcpServer.writeLine(
669                         "EndListPeers",
670                         "Identifier=" + identifier,
671                         "EndMessage"
672                 );
673                 assertThat(peers.get(), hasSize(2));
674                 assertThat(peers.get().stream().map(peer -> peer.getMetadata("foo")).collect(Collectors.toList()),
675                         containsInAnyOrder("bar1", "bar2"));
676         }
677
678         @Test
679         public void clientCanListPeersWithVolatiles() throws IOException, ExecutionException, InterruptedException {
680                 Future<Collection<Peer>> peers = fcpClient.listPeers().includeVolatile().execute();
681                 connectNode();
682                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
683                 assertThat(lines, matchesFcpMessage(
684                         "ListPeers",
685                         "WithVolatile=true",
686                         "WithMetadata=false",
687                         "EndMessage"
688                 ));
689                 String identifier = extractIdentifier(lines);
690                 fcpServer.writeLine(
691                         "Peer",
692                         "Identifier=" + identifier,
693                         "identity=id1",
694                         "volatile.foo=bar1",
695                         "EndMessage"
696                 );
697                 fcpServer.writeLine(
698                         "Peer",
699                         "Identifier=" + identifier,
700                         "identity=id2",
701                         "volatile.foo=bar2",
702                         "EndMessage"
703                 );
704                 fcpServer.writeLine(
705                         "EndListPeers",
706                         "Identifier=" + identifier,
707                         "EndMessage"
708                 );
709                 assertThat(peers.get(), hasSize(2));
710                 assertThat(peers.get().stream().map(peer -> peer.getVolatile("foo")).collect(Collectors.toList()),
711                         containsInAnyOrder("bar1", "bar2"));
712         }
713
714 }