Don’t let clients handle connection failures
[jFCPlib.git] / src / test / java / net / pterodactylus / fcp / quelaton / DefaultFcpClientTest.java
1 package net.pterodactylus.fcp.quelaton;
2
3 import static org.hamcrest.MatcherAssert.assertThat;
4 import static org.hamcrest.Matchers.containsInAnyOrder;
5 import static org.hamcrest.Matchers.hasSize;
6 import static org.hamcrest.Matchers.is;
7
8 import java.io.ByteArrayInputStream;
9 import java.io.File;
10 import java.io.IOException;
11 import java.nio.charset.StandardCharsets;
12 import java.util.Collection;
13 import java.util.List;
14 import java.util.Optional;
15 import java.util.concurrent.ExecutionException;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.Future;
19 import java.util.logging.Level;
20 import java.util.logging.Logger;
21 import java.util.stream.Collectors;
22
23 import net.pterodactylus.fcp.FcpKeyPair;
24 import net.pterodactylus.fcp.Key;
25 import net.pterodactylus.fcp.Peer;
26 import net.pterodactylus.fcp.Priority;
27 import net.pterodactylus.fcp.fake.FakeTcpServer;
28 import net.pterodactylus.fcp.quelaton.ClientGetCommand.Data;
29
30 import com.google.common.io.ByteStreams;
31 import com.google.common.io.Files;
32 import org.hamcrest.Description;
33 import org.hamcrest.Matcher;
34 import org.hamcrest.TypeSafeDiagnosingMatcher;
35 import org.junit.After;
36 import org.junit.Test;
37
38 /**
39  * Unit test for {@link DefaultFcpClient}.
40  *
41  * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
42  */
43 public class DefaultFcpClientTest {
44
45         private static final String INSERT_URI =
46                 "SSK@RVCHbJdkkyTCeNN9AYukEg76eyqmiosSaNKgE3U9zUw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQECAAE/";
47         private static final String REQUEST_URI =
48                 "SSK@wtbgd2loNcJCXvtQVOftl2tuWBomDQHfqS6ytpPRhfw,7SHH53gletBVb9JD7nBsyClbLQsBubDPEIcwg908r7Y,AQACAAE/";
49
50         private static int threadCounter = 0;
51         private final ExecutorService threadPool =
52                 Executors.newCachedThreadPool(r -> new Thread(r, "Test-Thread-" + threadCounter++));
53         private final FakeTcpServer fcpServer;
54         private final DefaultFcpClient fcpClient;
55
56         public DefaultFcpClientTest() throws IOException {
57                 fcpServer = new FakeTcpServer(threadPool);
58                 fcpClient = new DefaultFcpClient(threadPool, "localhost", fcpServer.getPort(), () -> "Test");
59         }
60
61         @After
62         public void tearDown() throws IOException {
63                 fcpServer.close();
64         }
65
66         @Test(expected = ExecutionException.class)
67         public void defaultFcpClientThrowsExceptionIfItCanNotConnect()
68         throws IOException, ExecutionException, InterruptedException {
69                 Logger.getAnonymousLogger().getParent().setLevel(Level.FINEST);
70                 Logger.getAnonymousLogger().getParent().getHandlers()[0].setLevel(Level.FINEST);
71                 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
72                 fcpServer.connect().get();
73                 fcpServer.collectUntil(is("EndMessage"));
74                 fcpServer.writeLine(
75                         "CloseConnectionDuplicateClientName",
76                         "EndMessage"
77                 );
78                 keyPairFuture.get();
79         }
80
81         @Test(expected = ExecutionException.class)
82         public void defaultFcpClientThrowsExceptionIfConnectionIsClosed()
83         throws IOException, ExecutionException, InterruptedException {
84                 Logger.getAnonymousLogger().getParent().setLevel(Level.FINEST);
85                 Logger.getAnonymousLogger().getParent().getHandlers()[0].setLevel(Level.FINEST);
86                 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
87                 fcpServer.connect().get();
88                 fcpServer.collectUntil(is("EndMessage"));
89                 fcpServer.close();
90                 keyPairFuture.get();
91         }
92
93         @Test
94         public void defaultFcpClientCanGenerateKeypair() throws ExecutionException, InterruptedException, IOException {
95                 Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
96                 connectNode();
97                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
98                 String identifier = extractIdentifier(lines);
99                 fcpServer.writeLine("SSKKeypair",
100                         "InsertURI=" + INSERT_URI + "",
101                         "RequestURI=" + REQUEST_URI + "",
102                         "Identifier=" + identifier,
103                         "EndMessage");
104                 FcpKeyPair keyPair = keyPairFuture.get();
105                 assertThat(keyPair.getPublicKey(), is(REQUEST_URI));
106                 assertThat(keyPair.getPrivateKey(), is(INSERT_URI));
107         }
108
109         private void connectNode() throws InterruptedException, ExecutionException, IOException {
110                 fcpServer.connect().get();
111                 fcpServer.collectUntil(is("EndMessage"));
112                 fcpServer.writeLine("NodeHello",
113                         "CompressionCodecs=4 - GZIP(0), BZIP2(1), LZMA(2), LZMA_NEW(3)",
114                         "Revision=build01466",
115                         "Testnet=false",
116                         "Version=Fred,0.7,1.0,1466",
117                         "Build=1466",
118                         "ConnectionIdentifier=14318898267048452a81b36e7f13a3f0",
119                         "Node=Fred",
120                         "ExtBuild=29",
121                         "FCPVersion=2.0",
122                         "NodeLanguage=ENGLISH",
123                         "ExtRevision=v29",
124                         "EndMessage"
125                 );
126         }
127
128         @Test
129         public void clientGetCanDownloadData() throws InterruptedException, ExecutionException, IOException {
130                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
131                 connectNode();
132                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
133                 assertThat(lines, matchesFcpMessage("ClientGet", "ReturnType=direct", "URI=KSK@foo.txt"));
134                 String identifier = extractIdentifier(lines);
135                 fcpServer.writeLine(
136                         "AllData",
137                         "Identifier=" + identifier,
138                         "DataLength=6",
139                         "StartupTime=1435610539000",
140                         "CompletionTime=1435610540000",
141                         "Metadata.ContentType=text/plain;charset=utf-8",
142                         "Data",
143                         "Hello"
144                 );
145                 Optional<Data> data = dataFuture.get();
146                 assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
147                 assertThat(data.get().size(), is(6L));
148                 assertThat(ByteStreams.toByteArray(data.get().getInputStream()),
149                         is("Hello\n".getBytes(StandardCharsets.UTF_8)));
150         }
151
152         private String extractIdentifier(List<String> lines) {
153                 return lines.stream()
154                         .filter(s -> s.startsWith("Identifier="))
155                         .map(s -> s.substring(s.indexOf('=') + 1))
156                         .findFirst()
157                         .orElse("");
158         }
159
160         @Test
161         public void clientGetDownloadsDataForCorrectIdentifier()
162         throws InterruptedException, ExecutionException, IOException {
163                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
164                 connectNode();
165                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
166                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
167                 String identifier = extractIdentifier(lines);
168                 fcpServer.writeLine(
169                         "AllData",
170                         "Identifier=not-test",
171                         "DataLength=12",
172                         "StartupTime=1435610539000",
173                         "CompletionTime=1435610540000",
174                         "Metadata.ContentType=text/plain;charset=latin-9",
175                         "Data",
176                         "Hello World"
177                 );
178                 fcpServer.writeLine(
179                         "AllData",
180                         "Identifier=" + identifier,
181                         "DataLength=6",
182                         "StartupTime=1435610539000",
183                         "CompletionTime=1435610540000",
184                         "Metadata.ContentType=text/plain;charset=utf-8",
185                         "Data",
186                         "Hello"
187                 );
188                 Optional<Data> data = dataFuture.get();
189                 assertThat(data.get().getMimeType(), is("text/plain;charset=utf-8"));
190                 assertThat(data.get().size(), is(6L));
191                 assertThat(ByteStreams.toByteArray(data.get().getInputStream()),
192                         is("Hello\n".getBytes(StandardCharsets.UTF_8)));
193         }
194
195         @Test
196         public void clientGetRecognizesGetFailed() throws InterruptedException, ExecutionException, IOException {
197                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
198                 connectNode();
199                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
200                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
201                 String identifier = extractIdentifier(lines);
202                 fcpServer.writeLine(
203                         "GetFailed",
204                         "Identifier=" + identifier,
205                         "Code=3",
206                         "EndMessage"
207                 );
208                 Optional<Data> data = dataFuture.get();
209                 assertThat(data.isPresent(), is(false));
210         }
211
212         @Test
213         public void clientGetRecognizesGetFailedForCorrectIdentifier()
214         throws InterruptedException, ExecutionException, IOException {
215                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
216                 connectNode();
217                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
218                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
219                 String identifier = extractIdentifier(lines);
220                 fcpServer.writeLine(
221                         "GetFailed",
222                         "Identifier=not-test",
223                         "Code=3",
224                         "EndMessage"
225                 );
226                 fcpServer.writeLine(
227                         "GetFailed",
228                         "Identifier=" + identifier,
229                         "Code=3",
230                         "EndMessage"
231                 );
232                 Optional<Data> data = dataFuture.get();
233                 assertThat(data.isPresent(), is(false));
234         }
235
236         @Test(expected = ExecutionException.class)
237         public void clientGetRecognizesConnectionClosed() throws InterruptedException, ExecutionException, IOException {
238                 Future<Optional<Data>> dataFuture = fcpClient.clientGet().uri("KSK@foo.txt");
239                 connectNode();
240                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
241                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt"));
242                 fcpServer.close();
243                 dataFuture.get();
244         }
245
246         @Test
247         public void clientGetWithIgnoreDataStoreSettingSendsCorrectCommands()
248         throws InterruptedException, ExecutionException, IOException {
249                 fcpClient.clientGet().ignoreDataStore().uri("KSK@foo.txt");
250                 connectNode();
251                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
252                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "IgnoreDS=true"));
253         }
254
255         @Test
256         public void clientGetWithDataStoreOnlySettingSendsCorrectCommands()
257         throws InterruptedException, ExecutionException, IOException {
258                 fcpClient.clientGet().dataStoreOnly().uri("KSK@foo.txt");
259                 connectNode();
260                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
261                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "DSonly=true"));
262         }
263
264         @Test
265         public void clientGetWithMaxSizeSettingSendsCorrectCommands()
266         throws InterruptedException, ExecutionException, IOException {
267                 fcpClient.clientGet().maxSize(1048576).uri("KSK@foo.txt");
268                 connectNode();
269                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
270                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "MaxSize=1048576"));
271         }
272
273         @Test
274         public void clientGetWithPrioritySettingSendsCorrectCommands()
275         throws InterruptedException, ExecutionException, IOException {
276                 fcpClient.clientGet().priority(Priority.interactive).uri("KSK@foo.txt");
277                 connectNode();
278                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
279                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "PriorityClass=1"));
280         }
281
282         @Test
283         public void clientGetWithRealTimeSettingSendsCorrectCommands()
284         throws InterruptedException, ExecutionException, IOException {
285                 fcpClient.clientGet().realTime().uri("KSK@foo.txt");
286                 connectNode();
287                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
288                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "RealTimeFlag=true"));
289         }
290
291         @Test
292         public void clientGetWithGlobalSettingSendsCorrectCommands()
293         throws InterruptedException, ExecutionException, IOException {
294                 fcpClient.clientGet().global().uri("KSK@foo.txt");
295                 connectNode();
296                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
297                 assertThat(lines, matchesFcpMessage("ClientGet", "URI=KSK@foo.txt", "Global=true"));
298         }
299
300         private Matcher<List<String>> matchesFcpMessage(String name, String... requiredLines) {
301                 return new TypeSafeDiagnosingMatcher<List<String>>() {
302                         @Override
303                         protected boolean matchesSafely(List<String> item, Description mismatchDescription) {
304                                 if (!item.get(0).equals(name)) {
305                                         mismatchDescription.appendText("FCP message is named ").appendValue(item.get(0));
306                                         return false;
307                                 }
308                                 for (String requiredLine : requiredLines) {
309                                         if (item.indexOf(requiredLine) < 1) {
310                                                 mismatchDescription.appendText("FCP message does not contain ").appendValue(requiredLine);
311                                                 return false;
312                                         }
313                                 }
314                                 return true;
315                         }
316
317                         @Override
318                         public void describeTo(Description description) {
319                                 description.appendText("FCP message named ").appendValue(name);
320                                 description.appendValueList(", containing the lines ", ", ", "", requiredLines);
321                         }
322                 };
323         }
324
325         @Test
326         public void clientPutWithDirectDataSendsCorrectCommand()
327         throws IOException, ExecutionException, InterruptedException {
328                 fcpClient.clientPut()
329                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
330                         .length(6)
331                         .uri("KSK@foo.txt");
332                 connectNode();
333                 List<String> lines = fcpServer.collectUntil(is("Hello"));
334                 assertThat(lines, matchesFcpMessage("ClientPut", "UploadFrom=direct", "DataLength=6", "URI=KSK@foo.txt"));
335         }
336
337         @Test
338         public void clientPutWithDirectDataSucceedsOnCorrectIdentifier()
339         throws InterruptedException, ExecutionException, IOException {
340                 Future<Optional<Key>> key = fcpClient.clientPut()
341                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
342                         .length(6)
343                         .uri("KSK@foo.txt");
344                 connectNode();
345                 List<String> lines = fcpServer.collectUntil(is("Hello"));
346                 String identifier = extractIdentifier(lines);
347                 fcpServer.writeLine(
348                         "PutFailed",
349                         "Identifier=not-the-right-one",
350                         "EndMessage"
351                 );
352                 fcpServer.writeLine(
353                         "PutSuccessful",
354                         "URI=KSK@foo.txt",
355                         "Identifier=" + identifier,
356                         "EndMessage"
357                 );
358                 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
359         }
360
361         @Test
362         public void clientPutWithDirectDataFailsOnCorrectIdentifier()
363         throws InterruptedException, ExecutionException, IOException {
364                 Future<Optional<Key>> key = fcpClient.clientPut()
365                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
366                         .length(6)
367                         .uri("KSK@foo.txt");
368                 connectNode();
369                 List<String> lines = fcpServer.collectUntil(is("Hello"));
370                 String identifier = extractIdentifier(lines);
371                 fcpServer.writeLine(
372                         "PutSuccessful",
373                         "Identifier=not-the-right-one",
374                         "URI=KSK@foo.txt",
375                         "EndMessage"
376                 );
377                 fcpServer.writeLine(
378                         "PutFailed",
379                         "Identifier=" + identifier,
380                         "EndMessage"
381                 );
382                 assertThat(key.get().isPresent(), is(false));
383         }
384
385         @Test
386         public void clientPutWithRenamedDirectDataSendsCorrectCommand()
387         throws InterruptedException, ExecutionException, IOException {
388                 fcpClient.clientPut()
389                         .named("otherName.txt")
390                         .from(new ByteArrayInputStream("Hello\n".getBytes()))
391                         .length(6)
392                         .uri("KSK@foo.txt");
393                 connectNode();
394                 List<String> lines = fcpServer.collectUntil(is("Hello"));
395                 assertThat(lines, matchesFcpMessage("ClientPut", "TargetFilename=otherName.txt", "UploadFrom=direct",
396                         "DataLength=6", "URI=KSK@foo.txt"));
397         }
398
399         @Test
400         public void clientPutWithRedirectSendsCorrectCommand()
401         throws IOException, ExecutionException, InterruptedException {
402                 fcpClient.clientPut().redirectTo("KSK@bar.txt").uri("KSK@foo.txt");
403                 connectNode();
404                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
405                 assertThat(lines,
406                         matchesFcpMessage("ClientPut", "UploadFrom=redirect", "URI=KSK@foo.txt", "TargetURI=KSK@bar.txt"));
407         }
408
409         @Test
410         public void clientPutWithFileSendsCorrectCommand() throws InterruptedException, ExecutionException, IOException {
411                 fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt");
412                 connectNode();
413                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
414                 assertThat(lines,
415                         matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt", "Filename=/tmp/data.txt"));
416         }
417
418         @Test
419         public void clientPutWithFileCanCompleteTestDdaSequence()
420         throws IOException, ExecutionException, InterruptedException {
421                 File tempFile = createTempFile();
422                 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt");
423                 connectNode();
424                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
425                 String identifier = extractIdentifier(lines);
426                 fcpServer.writeLine(
427                         "ProtocolError",
428                         "Identifier=" + identifier,
429                         "Code=25",
430                         "EndMessage"
431                 );
432                 lines = fcpServer.collectUntil(is("EndMessage"));
433                 assertThat(lines, matchesFcpMessage(
434                         "TestDDARequest",
435                         "Directory=" + tempFile.getParent(),
436                         "WantReadDirectory=true",
437                         "WantWriteDirectory=false",
438                         "EndMessage"
439                 ));
440                 fcpServer.writeLine(
441                         "TestDDAReply",
442                         "Directory=" + tempFile.getParent(),
443                         "ReadFilename=" + tempFile,
444                         "EndMessage"
445                 );
446                 lines = fcpServer.collectUntil(is("EndMessage"));
447                 assertThat(lines, matchesFcpMessage(
448                         "TestDDAResponse",
449                         "Directory=" + tempFile.getParent(),
450                         "ReadContent=test-content",
451                         "EndMessage"
452                 ));
453                 fcpServer.writeLine(
454                         "TestDDAComplete",
455                         "Directory=" + tempFile.getParent(),
456                         "ReadDirectoryAllowed=true",
457                         "EndMessage"
458                 );
459                 lines = fcpServer.collectUntil(is("EndMessage"));
460                 assertThat(lines,
461                         matchesFcpMessage("ClientPut", "UploadFrom=disk", "URI=KSK@foo.txt",
462                                 "Filename=" + new File(tempFile.getParent(), "test.dat")));
463         }
464
465         private File createTempFile() throws IOException {
466                 File tempFile = File.createTempFile("test-dda-", ".dat");
467                 tempFile.deleteOnExit();
468                 Files.write("test-content", tempFile, StandardCharsets.UTF_8);
469                 return tempFile;
470         }
471
472         @Test
473         public void clientPutDoesNotReactToProtocolErrorForDifferentIdentifier()
474         throws InterruptedException, ExecutionException, IOException {
475                 Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt");
476                 connectNode();
477                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
478                 String identifier = extractIdentifier(lines);
479                 fcpServer.writeLine(
480                         "ProtocolError",
481                         "Identifier=not-the-right-one",
482                         "Code=25",
483                         "EndMessage"
484                 );
485                 fcpServer.writeLine(
486                         "PutSuccessful",
487                         "Identifier=" + identifier,
488                         "URI=KSK@foo.txt",
489                         "EndMessage"
490                 );
491                 assertThat(key.get().get().getKey(), is("KSK@foo.txt"));
492         }
493
494         @Test
495         public void clientPutAbortsOnProtocolErrorOtherThan25()
496         throws InterruptedException, ExecutionException, IOException {
497                 Future<Optional<Key>> key = fcpClient.clientPut().from(new File("/tmp/data.txt")).uri("KSK@foo.txt");
498                 connectNode();
499                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
500                 String identifier = extractIdentifier(lines);
501                 fcpServer.writeLine(
502                         "ProtocolError",
503                         "Identifier=" + identifier,
504                         "Code=1",
505                         "EndMessage"
506                 );
507                 assertThat(key.get().isPresent(), is(false));
508         }
509
510         @Test
511         public void clientPutDoesNotReplyToWrongTestDdaReply() throws IOException, ExecutionException,
512         InterruptedException {
513                 File tempFile = createTempFile();
514                 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt");
515                 connectNode();
516                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
517                 String identifier = extractIdentifier(lines);
518                 fcpServer.writeLine(
519                         "ProtocolError",
520                         "Identifier=" + identifier,
521                         "Code=25",
522                         "EndMessage"
523                 );
524                 lines = fcpServer.collectUntil(is("EndMessage"));
525                 assertThat(lines, matchesFcpMessage(
526                         "TestDDARequest",
527                         "Directory=" + tempFile.getParent(),
528                         "WantReadDirectory=true",
529                         "WantWriteDirectory=false",
530                         "EndMessage"
531                 ));
532                 fcpServer.writeLine(
533                         "TestDDAReply",
534                         "Directory=/some-other-directory",
535                         "ReadFilename=" + tempFile,
536                         "EndMessage"
537                 );
538                 fcpServer.writeLine(
539                         "TestDDAReply",
540                         "Directory=" + tempFile.getParent(),
541                         "ReadFilename=" + tempFile,
542                         "EndMessage"
543                 );
544                 lines = fcpServer.collectUntil(is("EndMessage"));
545                 assertThat(lines, matchesFcpMessage(
546                         "TestDDAResponse",
547                         "Directory=" + tempFile.getParent(),
548                         "ReadContent=test-content",
549                         "EndMessage"
550                 ));
551         }
552
553         @Test
554         public void clientPutSendsResponseEvenIfFileCanNotBeRead()
555         throws IOException, ExecutionException, InterruptedException {
556                 File tempFile = createTempFile();
557                 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt");
558                 connectNode();
559                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
560                 String identifier = extractIdentifier(lines);
561                 fcpServer.writeLine(
562                         "ProtocolError",
563                         "Identifier=" + identifier,
564                         "Code=25",
565                         "EndMessage"
566                 );
567                 lines = fcpServer.collectUntil(is("EndMessage"));
568                 assertThat(lines, matchesFcpMessage(
569                         "TestDDARequest",
570                         "Directory=" + tempFile.getParent(),
571                         "WantReadDirectory=true",
572                         "WantWriteDirectory=false",
573                         "EndMessage"
574                 ));
575                 fcpServer.writeLine(
576                         "TestDDAReply",
577                         "Directory=" + tempFile.getParent(),
578                         "ReadFilename=" + tempFile + ".foo",
579                         "EndMessage"
580                 );
581                 lines = fcpServer.collectUntil(is("EndMessage"));
582                 assertThat(lines, matchesFcpMessage(
583                         "TestDDAResponse",
584                         "Directory=" + tempFile.getParent(),
585                         "ReadContent=failed-to-read",
586                         "EndMessage"
587                 ));
588         }
589
590         @Test
591         public void clientPutDoesNotResendOriginalClientPutOnTestDDACompleteWithWrongDirectory()
592         throws IOException, ExecutionException, InterruptedException {
593                 File tempFile = createTempFile();
594                 fcpClient.clientPut().from(new File(tempFile.getParent(), "test.dat")).uri("KSK@foo.txt");
595                 connectNode();
596                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
597                 String identifier = extractIdentifier(lines);
598                 fcpServer.writeLine(
599                         "TestDDAComplete",
600                         "Directory=/some-other-directory",
601                         "EndMessage"
602                 );
603                 fcpServer.writeLine(
604                         "ProtocolError",
605                         "Identifier=" + identifier,
606                         "Code=25",
607                         "EndMessage"
608                 );
609                 lines = fcpServer.collectUntil(is("EndMessage"));
610                 assertThat(lines, matchesFcpMessage(
611                         "TestDDARequest",
612                         "Directory=" + tempFile.getParent(),
613                         "WantReadDirectory=true",
614                         "WantWriteDirectory=false",
615                         "EndMessage"
616                 ));
617         }
618
619         @Test
620         public void clientCanListPeers() throws IOException, ExecutionException, InterruptedException {
621                 Future<Collection<Peer>> peers = fcpClient.listPeers().execute();
622                 connectNode();
623                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
624                 assertThat(lines, matchesFcpMessage(
625                         "ListPeers",
626                         "WithVolatile=false",
627                         "WithMetadata=false",
628                         "EndMessage"
629                 ));
630                 String identifier = extractIdentifier(lines);
631                 fcpServer.writeLine(
632                         "Peer",
633                         "Identifier=" + identifier,
634                         "identity=id1",
635                         "EndMessage"
636                 );
637                 fcpServer.writeLine(
638                         "Peer",
639                         "Identifier=" + identifier,
640                         "identity=id2",
641                         "EndMessage"
642                 );
643                 fcpServer.writeLine(
644                         "EndListPeers",
645                         "Identifier=" + identifier,
646                         "EndMessage"
647                 );
648                 assertThat(peers.get(), hasSize(2));
649                 assertThat(peers.get().stream().map(Peer::getIdentity).collect(Collectors.toList()),
650                         containsInAnyOrder("id1", "id2"));
651         }
652
653         @Test
654         public void clientCanListPeersWithMetadata() throws IOException, ExecutionException, InterruptedException {
655                 Future<Collection<Peer>> peers = fcpClient.listPeers().includeMetadata().execute();
656                 connectNode();
657                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
658                 assertThat(lines, matchesFcpMessage(
659                         "ListPeers",
660                         "WithVolatile=false",
661                         "WithMetadata=true",
662                         "EndMessage"
663                 ));
664                 String identifier = extractIdentifier(lines);
665                 fcpServer.writeLine(
666                         "Peer",
667                         "Identifier=" + identifier,
668                         "identity=id1",
669                         "metadata.foo=bar1",
670                         "EndMessage"
671                 );
672                 fcpServer.writeLine(
673                         "Peer",
674                         "Identifier=" + identifier,
675                         "identity=id2",
676                         "metadata.foo=bar2",
677                         "EndMessage"
678                 );
679                 fcpServer.writeLine(
680                         "EndListPeers",
681                         "Identifier=" + identifier,
682                         "EndMessage"
683                 );
684                 assertThat(peers.get(), hasSize(2));
685                 assertThat(peers.get().stream().map(peer -> peer.getMetadata("foo")).collect(Collectors.toList()),
686                         containsInAnyOrder("bar1", "bar2"));
687         }
688
689         @Test
690         public void clientCanListPeersWithVolatiles() throws IOException, ExecutionException, InterruptedException {
691                 Future<Collection<Peer>> peers = fcpClient.listPeers().includeVolatile().execute();
692                 connectNode();
693                 List<String> lines = fcpServer.collectUntil(is("EndMessage"));
694                 assertThat(lines, matchesFcpMessage(
695                         "ListPeers",
696                         "WithVolatile=true",
697                         "WithMetadata=false",
698                         "EndMessage"
699                 ));
700                 String identifier = extractIdentifier(lines);
701                 fcpServer.writeLine(
702                         "Peer",
703                         "Identifier=" + identifier,
704                         "identity=id1",
705                         "volatile.foo=bar1",
706                         "EndMessage"
707                 );
708                 fcpServer.writeLine(
709                         "Peer",
710                         "Identifier=" + identifier,
711                         "identity=id2",
712                         "volatile.foo=bar2",
713                         "EndMessage"
714                 );
715                 fcpServer.writeLine(
716                         "EndListPeers",
717                         "Identifier=" + identifier,
718                         "EndMessage"
719                 );
720                 assertThat(peers.get(), hasSize(2));
721                 assertThat(peers.get().stream().map(peer -> peer.getVolatile("foo")).collect(Collectors.toList()),
722                         containsInAnyOrder("bar1", "bar2"));
723         }
724
725 }