private final ListeningExecutorService threadPool;
private final ConnectionSupplier connectionSupplier;
+ private final AtomicBoolean includeMetadata = new AtomicBoolean(false);
+ private final AtomicBoolean includeVolatile = new AtomicBoolean(false);
public ListPeersCommandImpl(ExecutorService threadPool, ConnectionSupplier connectionSupplier) {
this.threadPool = MoreExecutors.listeningDecorator(threadPool);
}
@Override
+ public ListPeersCommand includeMetadata() {
+ includeMetadata.set(true);
+ return this;
+ }
+
+ @Override
+ public ListPeersCommand includeVolatile() {
+ includeVolatile.set(true);
+ return this;
+ }
+
+ @Override
public Future<Collection<Peer>> execute() {
String identifier = new RandomIdentifierGenerator().generate();
- ListPeers listPeers = new ListPeers(identifier);
+ ListPeers listPeers = new ListPeers(identifier, includeMetadata.get(), includeVolatile.get());
return threadPool.submit(() -> new ListPeersReplySequence().send(listPeers).get());
}
containsInAnyOrder("id1", "id2"));
}
+ @Test
+ public void clientCanListPeersWithMetadata() throws IOException, ExecutionException, InterruptedException {
+ Future<Collection<Peer>> peers = fcpClient.listPeers().includeMetadata().execute();
+ connectNode();
+ List<String> lines = fcpServer.collectUntil(is("EndMessage"));
+ assertThat(lines, matchesFcpMessage(
+ "ListPeers",
+ "WithVolatile=false",
+ "WithMetadata=true",
+ "EndMessage"
+ ));
+ String identifier = extractIdentifier(lines);
+ fcpServer.writeLine(
+ "Peer",
+ "Identifier=" + identifier,
+ "identity=id1",
+ "metadata.foo=bar1",
+ "EndMessage"
+ );
+ fcpServer.writeLine(
+ "Peer",
+ "Identifier=" + identifier,
+ "identity=id2",
+ "metadata.foo=bar2",
+ "EndMessage"
+ );
+ fcpServer.writeLine(
+ "EndListPeers",
+ "Identifier=" + identifier,
+ "EndMessage"
+ );
+ assertThat(peers.get(), hasSize(2));
+ assertThat(peers.get().stream().map(peer -> peer.getMetadata("foo")).collect(Collectors.toList()),
+ containsInAnyOrder("bar1", "bar2"));
+ }
+
+ @Test
+ public void clientCanListPeersWithVolatiles() throws IOException, ExecutionException, InterruptedException {
+ Future<Collection<Peer>> peers = fcpClient.listPeers().includeVolatile().execute();
+ connectNode();
+ List<String> lines = fcpServer.collectUntil(is("EndMessage"));
+ assertThat(lines, matchesFcpMessage(
+ "ListPeers",
+ "WithVolatile=true",
+ "WithMetadata=false",
+ "EndMessage"
+ ));
+ String identifier = extractIdentifier(lines);
+ fcpServer.writeLine(
+ "Peer",
+ "Identifier=" + identifier,
+ "identity=id1",
+ "volatile.foo=bar1",
+ "EndMessage"
+ );
+ fcpServer.writeLine(
+ "Peer",
+ "Identifier=" + identifier,
+ "identity=id2",
+ "volatile.foo=bar2",
+ "EndMessage"
+ );
+ fcpServer.writeLine(
+ "EndListPeers",
+ "Identifier=" + identifier,
+ "EndMessage"
+ );
+ assertThat(peers.get(), hasSize(2));
+ assertThat(peers.get().stream().map(peer -> peer.getVolatile("foo")).collect(Collectors.toList()),
+ containsInAnyOrder("bar1", "bar2"));
+ }
+
}