Add “with metadata” and “with volatile” flags to ListPeer command
[jFCPlib.git] / src / test / java / net / pterodactylus / fcp / quelaton / DefaultFcpClientTest.java
1 package net.pterodactylus.fcp.quelaton;
2
3 import static org.hamcrest.MatcherAssert.assertThat;
4 import static org.hamcrest.Matchers.containsInAnyOrder;
5 import static org.hamcrest.Matchers.hasSize;
6 import static org.hamcrest.Matchers.is;
7
8 import java.io.ByteArrayInputStream;
9 import java.io.File;
10 import java.io.IOException;
11 import java.nio.charset.StandardCharsets;
12 import java.util.Collection;
13 import java.util.List;
14 import java.util.Optional;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.Future;
19 import java.util.stream.Collectors;
20
21 import net.pterodactylus.fcp.FcpKeyPair;
22 import net.pterodactylus.fcp.Key;
23 import net.pterodactylus.fcp.Peer;
24 import net.pterodactylus.fcp.Priority;
25 import net.pterodactylus.fcp.fake.FakeTcpServer;
26 import net.pterodactylus.fcp.quelaton.ClientGetCommand.Data;
27
28 import com.google.common.io.ByteStreams;
29 import com.google.common.io.Files;
30 import org.hamcrest.Description;
31 import org.hamcrest.Matcher;
32 import org.hamcrest.TypeSafeDiagnosingMatcher;
33 import org.junit.After;
34 import org.junit.Test;
35
36 /**
37  * Unit test for {@link DefaultFcpClient}.
38  *
39  * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
40  */
41 public class DefaultFcpClientTest {
42
43         private static final String INSERT_URI =
44                 "SSK@RVCHbJdkkyTCeNN9AYukEg76eyqmiosSaNKgE3U9zUw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQECAAE/";
45         private static final String REQUEST_URI =
46                 "SSK@wtbgd2loNcJCXvtQVOftl2tuWBomDQHfqS6ytpPRhfw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQACAAE/";
47
48         private static int threadCounter = 0;
49         private final ExecutorService threadPool =
50                 Executors.newCachedThreadPool(r -> new Thread(r, "Test-Thread-" + threadCounter++));
51         private final FakeTcpServer fcpServer;
52         private final DefaultFcpClient fcpClient;
53
54         public DefaultFcpClientTest() throws IOException {
55                 fcpServer = new FakeTcpServer(threadPool);
56                 fcpClient = new DefaultFcpClient(threadPool, "localhost", fcpServer.getPort(), () -> "Test");
57         }
58
59         @After
60         public void tearDown() throws IOException {
61                 fcpServer.close();
62         }
63
64         @Test
65         public void defaultFcpClientCanGenerateKeypair() throws ExecutionException, InterruptedException, IOException {
66                 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
67                 connectNode();
68                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
69                 String identifier = extractIdentifier(lines);
70                 fcpServer.writeLine("SSKKeypair",
71                         "InsertURI=" + INSERT_URI + "",
72                         "RequestURI=" + REQUEST_URI + "",
73                         "Identifier=" + identifier,
74                         "EndMessage");
75                 FcpKeyPair keyPair = keyPairFuture.get();
76                 assertThat(keyPair.getPublicKey(), is(REQUEST_URI));
77                 assertThat(keyPair.getPrivateKey(), is(INSERT_URI));
78         }
79
80         private void connectNode() throws InterruptedException, ExecutionException, IOException {
81                 fcpServer.connect().get();
82                 fcpServer.collectUntil(is("EndMessage"));
83                 fcpServer.writeLine("NodeHello",
84                         "CompressionCodecs=4 - GZIP(0), BZIP2(1), LZMA(2), LZMA_NEW(3)",
85                         "Revision=build01466",
86                         "Testnet=false",
87                         "Version=Fred,0.7,1.0,1466",
88                         "Build=1466",
89                         "ConnectionIdentifier=14318898267048452a81b36e7f13a3f0",
90                         "Node=Fred",
91                         "ExtBuild=29",
92                         "FCPVersion=2.0",
93                         "NodeLanguage=ENGLISH",
94                         "ExtRevision=v29",
95                         "EndMessage"
96                 );
97         }
98
99         @Test
100         public void clientGetCanDownloadData() throws InterruptedException, ExecutionException, IOException {
101                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
102                 connectNode();
103                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
104                 assertThat(lines, matchesFcpMessage("ClientGet", "ReturnType=direct", "URI=KSK@foo.txt"));
105                 String identifier = extractIdentifier(lines);
106                 fcpServer.writeLine(
107                         "AllData",
108                         "Identifier=" + identifier,
109                         "DataLength=6",
110                         "StartupTime=1435610539000",
111                         "CompletionTime=1435610540000",
112                         "Metadata.ContentType=text/plain;charset=utf-8",
113                         "Data",
114                         "Hello"
115                 );
116                 Optional<Data> data = dataFuture.get();
117                 assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
118                 assertThat(data.get().size(), is(6L));
119                 assertThat(ByteStreams.toByteArray(data.get().getInputStream()),
120                         is("Hello\n".getBytes(StandardCharsets.UTF_8)));
121         }
122
123         private String extractIdentifier(List<String> lines) {
124                 return lines.stream()
125                         .filter(s -> s.startsWith("Identifier="))
126                         .map(s -> s.substring(s.indexOf('=') + 1))
127                         .findFirst()
128                         .orElse("");
129         }
130
131         @Test
132         public void clientGetDownloadsDataForCorrectIdentifier()
133         throws InterruptedException, ExecutionException, IOException {
134                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
135                 connectNode();
136                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
137                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
138                 String identifier = extractIdentifier(lines);
139                 fcpServer.writeLine(
140                         "AllData",
141                         "Identifier=not-test",
142                         "DataLength=12",
143                         "StartupTime=1435610539000",
144                         "CompletionTime=1435610540000",
145                         "Metadata.ContentType=text/plain;charset=latin-9",
146                         "Data",
147                         "Hello World"
148                 );
149                 fcpServer.writeLine(
150                         "AllData",
151                         "Identifier=" + identifier,
152                         "DataLength=6",
153                         "StartupTime=1435610539000",
154                         "CompletionTime=1435610540000",
155                         "Metadata.ContentType=text/plain;charset=utf-8",
156                         "Data",
157                         "Hello"
158                 );
159                 Optional<Data> data = dataFuture.get();
160                 assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
161                 assertThat(data.get().size(), is(6L));
162                 assertThat(ByteStreams.toByteArray(data.get().getInputStream()),
163                         is("Hello\n".getBytes(StandardCharsets.UTF_8)));
164         }
165
166         @Test
167         public void clientGetRecognizesGetFailed() throws InterruptedException, ExecutionException, IOException {
168                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
169                 connectNode();
170                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
171                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
172                 String identifier = extractIdentifier(lines);
173                 fcpServer.writeLine(
174                         "GetFailed",
175                         "Identifier=" + identifier,
176                         "Code=3",
177                         "EndMessage"
178                 );
179                 Optional<Data> data = dataFuture.get();
180                 assertThat(data.isPresent(), is(false));
181         }
182
183         @Test
184         public void clientGetRecognizesGetFailedForCorrectIdentifier()
185         throws InterruptedException, ExecutionException, IOException {
186                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
187                 connectNode();
188                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
189                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
190                 String identifier = extractIdentifier(lines);
191                 fcpServer.writeLine(
192                         "GetFailed",
193                         "Identifier=not-test",
194                         "Code=3",
195                         "EndMessage"
196                 );
197                 fcpServer.writeLine(
198                         "GetFailed",
199                         "Identifier=" + identifier,
200                         "Code=3",
201                         "EndMessage"
202                 );
203                 Optional<Data> data = dataFuture.get();
204                 assertThat(data.isPresent(), is(false));
205         }
206
207         @Test
208         public void clientGetRecognizesConnectionClosed() throws InterruptedException, ExecutionException, IOException {
209                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
210                 connectNode();
211                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
212                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
213                 fcpServer.close();
214                 Optional<Data> data = dataFuture.get();
215                 assertThat(data.isPresent(), is(false));
216         }
217
218         @Test
219         public void clientGetWithIgnoreDataStoreSettingSendsCorrectCommands()
220         throws InterruptedException, ExecutionException, IOException {
221                 fcpClient.clientGet().ignoreDataStore().uri("KSK@foo.txt");
222                 connectNode();
223                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
224                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "IgnoreDS=true"));
225         }
226
227         @Test
228         public void clientGetWithDataStoreOnlySettingSendsCorrectCommands()
229         throws InterruptedException, ExecutionException, IOException {
230                 fcpClient.clientGet().dataStoreOnly().uri("KSK@foo.txt");
231                 connectNode();
232                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
233                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "DSonly=true"));
234         }
235
236         @Test
237         public void clientGetWithMaxSizeSettingSendsCorrectCommands()
238         throws InterruptedException, ExecutionException, IOException {
239                 fcpClient.clientGet().maxSize(1048576).uri("KSK@foo.txt");
240                 connectNode();
241                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
242                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "MaxSize=1048576"));
243         }
244
245         @Test
246         public void clientGetWithPrioritySettingSendsCorrectCommands()
247         throws InterruptedException, ExecutionException, IOException {
248                 fcpClient.clientGet().priority(Priority.interactive).uri("KSK@foo.txt");
249                 connectNode();
250                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
251                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "PriorityClass=1"));
252         }
253
254         @Test
255         public void clientGetWithRealTimeSettingSendsCorrectCommands()
256         throws InterruptedException, ExecutionException, IOException {
257                 fcpClient.clientGet().realTime().uri("KSK@foo.txt");
258                 connectNode();
259                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
260                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "RealTimeFlag=true"));
261         }
262
263         @Test
264         public void clientGetWithGlobalSettingSendsCorrectCommands()
265         throws InterruptedException, ExecutionException, IOException {
266                 fcpClient.clientGet().global().uri("KSK@foo.txt");
267                 connectNode();
268                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
269                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "Global=true"));
270         }
271
272         private Matcher<List<String>> matchesFcpMessage(String name, String... requiredLines) {
273                 return new TypeSafeDiagnosingMatcher<List<String>>() {
274                         @Override
275                         protected boolean matchesSafely(List<String> item, Description mismatchDescription) {
276                                 if (!item.get(0).equals(name)) {
277                                         mismatchDescription.appendText("FCP message is named ").appendValue(item.get(0));
278                                         return false;
279                                 }
280                                 for (String requiredLine : requiredLines) {
281                                         if (item.indexOf(requiredLine) < 1) {
282                                                 mismatchDescription.appendText("FCP message does not contain ").appendValue(requiredLine);
283                                                 return false;
284                                         }
285                                 }
286                                 return true;
287                         }
288
289                         @Override
290                         public void describeTo(Description description) {
291                                 description.appendText("FCP message named ").appendValue(name);
292                                 description.appendValueList(", containing the lines ", ", ", "", requiredLines);
293                         }
294                 };
295         }
296
297         @Test
298         public void clientPutWithDirectDataSendsCorrectCommand()
299         throws IOException, ExecutionException, InterruptedException {
300                 fcpClient.clientPut()
301                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
302                         .length(6)
303                         .uri("KSK@foo.txt");
304                 connectNode();
305                 List<String> lines = fcpServer.collectUntil(is("Hello"));
306                 assertThat(lines, matchesFcpMessage("ClientPut", "UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"));
307         }
308
309         @Test
310         public void clientPutWithDirectDataSucceedsOnCorrectIdentifier()
311         throws InterruptedException, ExecutionException, IOException {
312                 Future<Optional<Key>> key = fcpClient.clientPut()
313                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
314                         .length(6)
315                         .uri("KSK@foo.txt");
316                 connectNode();
317                 List<String> lines = fcpServer.collectUntil(is("Hello"));
318                 String identifier = extractIdentifier(lines);
319                 fcpServer.writeLine(
320                         "PutFailed",
321                         "Identifier=not-the-right-one",
322                         "EndMessage"
323                 );
324                 fcpServer.writeLine(
325                         "PutSuccessful",
326                         "URI=KSK@foo.txt",
327                         "Identifier=" + identifier,
328                         "EndMessage"
329                 );
330                 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
331         }
332
333         @Test
334         public void clientPutWithDirectDataFailsOnCorrectIdentifier()
335         throws InterruptedException, ExecutionException, IOException {
336                 Future<Optional<Key>> key = fcpClient.clientPut()
337                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
338                         .length(6)
339                         .uri("KSK@foo.txt");
340                 connectNode();
341                 List<String> lines = fcpServer.collectUntil(is("Hello"));
342                 String identifier = extractIdentifier(lines);
343                 fcpServer.writeLine(
344                         "PutSuccessful",
345                         "Identifier=not-the-right-one",
346                         "URI=KSK@foo.txt",
347                         "EndMessage"
348                 );
349                 fcpServer.writeLine(
350                         "PutFailed",
351                         "Identifier=" + identifier,
352                         "EndMessage"
353                 );
354                 assertThat(key.get().isPresent(), is(false));
355         }
356
357         @Test
358         public void clientPutWithRenamedDirectDataSendsCorrectCommand()
359         throws InterruptedException, ExecutionException, IOException {
360                 fcpClient.clientPut()
361                         .named("otherName.txt")
362                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
363                         .length(6)
364                         .uri("KSK@foo.txt");
365                 connectNode();
366                 List<String> lines = fcpServer.collectUntil(is("Hello"));
367                 assertThat(lines, matchesFcpMessage("ClientPut", "TargetFilename=otherName.txt", "UploadFrom=direct",
368                         "DataLength=6", "URI=KSK@foo.txt"));
369         }
370
371         @Test
372         public void clientPutWithRedirectSendsCorrectCommand()
373         throws IOException, ExecutionException, InterruptedException {
374                 fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt");
375                 connectNode();
376                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
377                 assertThat(lines,
378                         matchesFcpMessage("ClientPut", "UploadFrom=redirect", "URI=KSK@foo.txt", "TargetURI=KSK@bar.txt"));
379         }
380
381         @Test
382         public void clientPutWithFileSendsCorrectCommand() throws InterruptedException, ExecutionException, IOException {
383                 fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt");
384                 connectNode();
385                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
386                 assertThat(lines,
387                         matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt"));
388         }
389
390         @Test
391         public void clientPutWithFileCanCompleteTestDdaSequence()
392         throws IOException, ExecutionException, InterruptedException {
393                 File tempFile = createTempFile();
394                 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt");
395                 connectNode();
396                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
397                 String identifier = extractIdentifier(lines);
398                 fcpServer.writeLine(
399                         "ProtocolError",
400                         "Identifier=" + identifier,
401                         "Code=25",
402                         "EndMessage"
403                 );
404                 lines = fcpServer.collectUntil(is("EndMessage"));
405                 assertThat(lines, matchesFcpMessage(
406                         "TestDDARequest",
407                         "Directory=" + tempFile.getParent(),
408                         "WantReadDirectory=true",
409                         "WantWriteDirectory=false",
410                         "EndMessage"
411                 ));
412                 fcpServer.writeLine(
413                         "TestDDAReply",
414                         "Directory=" + tempFile.getParent(),
415                         "ReadFilename=" + tempFile,
416                         "EndMessage"
417                 );
418                 lines = fcpServer.collectUntil(is("EndMessage"));
419                 assertThat(lines, matchesFcpMessage(
420                         "TestDDAResponse",
421                         "Directory=" + tempFile.getParent(),
422                         "ReadContent=test-content",
423                         "EndMessage"
424                 ));
425                 fcpServer.writeLine(
426                         "TestDDAComplete",
427                         "Directory=" + tempFile.getParent(),
428                         "ReadDirectoryAllowed=true",
429                         "EndMessage"
430                 );
431                 lines = fcpServer.collectUntil(is("EndMessage"));
432                 assertThat(lines,
433                         matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt",
434                                 "Filename=" + new File(tempFile.getParent(), "test.dat")));
435         }
436
437         private File createTempFile() throws IOException {
438                 File tempFile = File.createTempFile("test-dda-", ".dat");
439                 tempFile.deleteOnExit();
440                 Files.write("test-content", tempFile, StandardCharsets.UTF_8);
441                 return tempFile;
442         }
443
444         @Test
445         public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier()
446         throws InterruptedException, ExecutionException, IOException {
447                 Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt");
448                 connectNode();
449                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
450                 String identifier = extractIdentifier(lines);
451                 fcpServer.writeLine(
452                         "ProtocolError",
453                         "Identifier=not-the-right-one",
454                         "Code=25",
455                         "EndMessage"
456                 );
457                 fcpServer.writeLine(
458                         "PutSuccessful",
459                         "Identifier=" + identifier,
460                         "URI=KSK@foo.txt",
461                         "EndMessage"
462                 );
463                 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
464         }
465
466         @Test
467         public void clientPutAbortsOnProtocolErrorOtherThan25()
468         throws InterruptedException, ExecutionException, IOException {
469                 Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt");
470                 connectNode();
471                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
472                 String identifier = extractIdentifier(lines);
473                 fcpServer.writeLine(
474                         "ProtocolError",
475                         "Identifier=" + identifier,
476                         "Code=1",
477                         "EndMessage"
478                 );
479                 assertThat(key.get().isPresent(), is(false));
480         }
481
482         @Test
483         public void clientPutDoesNotReplyToWrongTestDdaReply() throws IOException, ExecutionException,
484         InterruptedException {
485                 File tempFile = createTempFile();
486                 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt");
487                 connectNode();
488                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
489                 String identifier = extractIdentifier(lines);
490                 fcpServer.writeLine(
491                         "ProtocolError",
492                         "Identifier=" + identifier,
493                         "Code=25",
494                         "EndMessage"
495                 );
496                 lines = fcpServer.collectUntil(is("EndMessage"));
497                 assertThat(lines, matchesFcpMessage(
498                         "TestDDARequest",
499                         "Directory=" + tempFile.getParent(),
500                         "WantReadDirectory=true",
501                         "WantWriteDirectory=false",
502                         "EndMessage"
503                 ));
504                 fcpServer.writeLine(
505                         "TestDDAReply",
506                         "Directory=/some-other-directory",
507                         "ReadFilename=" + tempFile,
508                         "EndMessage"
509                 );
510                 fcpServer.writeLine(
511                         "TestDDAReply",
512                         "Directory=" + tempFile.getParent(),
513                         "ReadFilename=" + tempFile,
514                         "EndMessage"
515                 );
516                 lines = fcpServer.collectUntil(is("EndMessage"));
517                 assertThat(lines, matchesFcpMessage(
518                         "TestDDAResponse",
519                         "Directory=" + tempFile.getParent(),
520                         "ReadContent=test-content",
521                         "EndMessage"
522                 ));
523         }
524
525         @Test
526         public void clientPutSendsResponseEvenIfFileCanNotBeRead()
527         throws IOException, ExecutionException, InterruptedException {
528                 File tempFile = createTempFile();
529                 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt");
530                 connectNode();
531                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
532                 String identifier = extractIdentifier(lines);
533                 fcpServer.writeLine(
534                         "ProtocolError",
535                         "Identifier=" + identifier,
536                         "Code=25",
537                         "EndMessage"
538                 );
539                 lines = fcpServer.collectUntil(is("EndMessage"));
540                 assertThat(lines, matchesFcpMessage(
541                         "TestDDARequest",
542                         "Directory=" + tempFile.getParent(),
543                         "WantReadDirectory=true",
544                         "WantWriteDirectory=false",
545                         "EndMessage"
546                 ));
547                 fcpServer.writeLine(
548                         "TestDDAReply",
549                         "Directory=" + tempFile.getParent(),
550                         "ReadFilename=" + tempFile + ".foo",
551                         "EndMessage"
552                 );
553                 lines = fcpServer.collectUntil(is("EndMessage"));
554                 assertThat(lines, matchesFcpMessage(
555                         "TestDDAResponse",
556                         "Directory=" + tempFile.getParent(),
557                         "ReadContent=failed-to-read",
558                         "EndMessage"
559                 ));
560         }
561
562         @Test
563         public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory()
564         throws IOException, ExecutionException, InterruptedException {
565                 File tempFile = createTempFile();
566                 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt");
567                 connectNode();
568                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
569                 String identifier = extractIdentifier(lines);
570                 fcpServer.writeLine(
571                         "TestDDAComplete",
572                         "Directory=/some-other-directory",
573                         "EndMessage"
574                 );
575                 fcpServer.writeLine(
576                         "ProtocolError",
577                         "Identifier=" + identifier,
578                         "Code=25",
579                         "EndMessage"
580                 );
581                 lines = fcpServer.collectUntil(is("EndMessage"));
582                 assertThat(lines, matchesFcpMessage(
583                         "TestDDARequest",
584                         "Directory=" + tempFile.getParent(),
585                         "WantReadDirectory=true",
586                         "WantWriteDirectory=false",
587                         "EndMessage"
588                 ));
589         }
590
591         @Test
592         public void clientCanListPeers() throws IOException, ExecutionException, InterruptedException {
593                 Future<Collection<Peer>> peers = fcpClient.listPeers().execute();
594                 connectNode();
595                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
596                 assertThat(lines, matchesFcpMessage(
597                         "ListPeers",
598                         "WithVolatile=false",
599                         "WithMetadata=false",
600                         "EndMessage"
601                 ));
602                 String identifier = extractIdentifier(lines);
603                 fcpServer.writeLine(
604                         "Peer",
605                         "Identifier=" + identifier,
606                         "identity=id1",
607                         "EndMessage"
608                 );
609                 fcpServer.writeLine(
610                         "Peer",
611                         "Identifier=" + identifier,
612                         "identity=id2",
613                         "EndMessage"
614                 );
615                 fcpServer.writeLine(
616                         "EndListPeers",
617                         "Identifier=" + identifier,
618                         "EndMessage"
619                 );
620                 assertThat(peers.get(), hasSize(2));
621                 assertThat(peers.get().stream().map(Peer::getIdentity).collect(Collectors.toList()),
622                         containsInAnyOrder("id1", "id2"));
623         }
624
625         @Test
626         public void clientCanListPeersWithMetadata() throws IOException, ExecutionException, InterruptedException {
627                 Future<Collection<Peer>> peers = fcpClient.listPeers().includeMetadata().execute();
628                 connectNode();
629                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
630                 assertThat(lines, matchesFcpMessage(
631                         "ListPeers",
632                         "WithVolatile=false",
633                         "WithMetadata=true",
634                         "EndMessage"
635                 ));
636                 String identifier = extractIdentifier(lines);
637                 fcpServer.writeLine(
638                         "Peer",
639                         "Identifier=" + identifier,
640                         "identity=id1",
641                         "metadata.foo=bar1",
642                         "EndMessage"
643                 );
644                 fcpServer.writeLine(
645                         "Peer",
646                         "Identifier=" + identifier,
647                         "identity=id2",
648                         "metadata.foo=bar2",
649                         "EndMessage"
650                 );
651                 fcpServer.writeLine(
652                         "EndListPeers",
653                         "Identifier=" + identifier,
654                         "EndMessage"
655                 );
656                 assertThat(peers.get(), hasSize(2));
657                 assertThat(peers.get().stream().map(peer -> peer.getMetadata("foo")).collect(Collectors.toList()),
658                         containsInAnyOrder("bar1", "bar2"));
659         }
660
661         @Test
662         public void clientCanListPeersWithVolatiles() throws IOException, ExecutionException, InterruptedException {
663                 Future<Collection<Peer>> peers = fcpClient.listPeers().includeVolatile().execute();
664                 connectNode();
665                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
666                 assertThat(lines, matchesFcpMessage(
667                         "ListPeers",
668                         "WithVolatile=true",
669                         "WithMetadata=false",
670                         "EndMessage"
671                 ));
672                 String identifier = extractIdentifier(lines);
673                 fcpServer.writeLine(
674                         "Peer",
675                         "Identifier=" + identifier,
676                         "identity=id1",
677                         "volatile.foo=bar1",
678                         "EndMessage"
679                 );
680                 fcpServer.writeLine(
681                         "Peer",
682                         "Identifier=" + identifier,
683                         "identity=id2",
684                         "volatile.foo=bar2",
685                         "EndMessage"
686                 );
687                 fcpServer.writeLine(
688                         "EndListPeers",
689                         "Identifier=" + identifier,
690                         "EndMessage"
691                 );
692                 assertThat(peers.get(), hasSize(2));
693                 assertThat(peers.get().stream().map(peer -> peer.getVolatile("foo")).collect(Collectors.toList()),
694                         containsInAnyOrder("bar1", "bar2"));
695         }
696
697 }