<version>1.3</version>
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>1.10.19</version>
+ </dependency>
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>16.0.1</version>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
- <source>1.6</source>
- <target>1.6</target>
+ <source>1.8</source>
+ <target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
--- /dev/null
+package net.pterodactylus.fcp.quelaton;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import net.pterodactylus.fcp.ClientHello;
+import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
+import net.pterodactylus.fcp.FcpConnection;
+import net.pterodactylus.fcp.FcpKeyPair;
+import net.pterodactylus.fcp.GenerateSSK;
+import net.pterodactylus.fcp.NodeHello;
+import net.pterodactylus.fcp.SSKKeypair;
+
+/**
+ * Default {@link FcpClient} implementation.
+ *
+ * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+public class DefaultFcpClient implements FcpClient {
+
+ private final ExecutorService threadPool;
+ private final String hostname;
+ private final int port;
+ private final AtomicReference<FcpConnection> fcpConnection = new AtomicReference<>();
+ private final Supplier<String> clientName;
+ private final Supplier<String> expectedVersion;
+
+ public DefaultFcpClient(ExecutorService threadPool, String hostname, int port, Supplier<String> clientName,
+ Supplier<String> expectedVersion) {
+ this.threadPool = threadPool;
+ this.hostname = hostname;
+ this.port = port;
+ this.clientName = clientName;
+ this.expectedVersion = expectedVersion;
+ }
+
+ private void connect() throws IOException {
+ if (fcpConnection.get() != null) {
+ return;
+ }
+ fcpConnection.compareAndSet(null, createConnection());
+ }
+
+ private FcpConnection createConnection() throws IOException {
+ FcpConnection connection = new FcpConnection(hostname, port);
+ connection.connect();
+ AtomicReference<NodeHello> receivedNodeHello = new AtomicReference<>();
+ AtomicBoolean receivedClosed = new AtomicBoolean();
+ FcpReplySequence nodeHelloSequence = new FcpReplySequence(threadPool, connection);
+ nodeHelloSequence
+ .handle(NodeHello.class)
+ .with((nodeHello) -> receivedNodeHello.set(nodeHello));
+ nodeHelloSequence
+ .handle(CloseConnectionDuplicateClientName.class)
+ .with((closeConnection) -> receivedClosed.set(true));
+ nodeHelloSequence.waitFor(() -> receivedNodeHello.get() != null || receivedClosed.get());
+ ClientHello clientHello = new ClientHello(clientName.get(), expectedVersion.get());
+ try {
+ nodeHelloSequence.send(clientHello).get();
+ } catch (InterruptedException | ExecutionException e) {
+ connection.close();
+ throw new IOException(String.format("Could not connect to %s:%d.", hostname, port), e);
+ }
+ return connection;
+ }
+
+ @Override
+ public GenerateKeypairCommand generateKeypair() {
+ return new GenerateKeypairCommandImpl();
+ }
+
+ private class GenerateKeypairCommandImpl implements GenerateKeypairCommand {
+
+ @Override
+ public Future<FcpKeyPair> execute() {
+ return threadPool.submit(() -> {
+ connect();
+ GenerateSSK generateSSK = new GenerateSSK();
+ AtomicReference<FcpKeyPair> keyPair = new AtomicReference<>();
+ FcpReplySequence replySequence = new FcpReplySequence(threadPool, fcpConnection.get());
+ replySequence.handle(SSKKeypair.class)
+ .with((message) -> keyPair.set(
+ new FcpKeyPair(message.getRequestURI(), message.getInsertURI())));
+ replySequence.waitFor(() -> keyPair.get() != null);
+ replySequence.send(generateSSK).get();
+ return keyPair.get();
+ });
+ }
+
+ }
+
+}
--- /dev/null
+package net.pterodactylus.fcp.quelaton;
+
+/**
+ * FCP client used to communicate with a Freenet node.
+ *
+ * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+public interface FcpClient {
+
+ GenerateKeypairCommand generateKeypair();
+
+}
--- /dev/null
+package net.pterodactylus.fcp.quelaton;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import net.pterodactylus.fcp.AllData;
+import net.pterodactylus.fcp.BaseMessage;
+import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
+import net.pterodactylus.fcp.ConfigData;
+import net.pterodactylus.fcp.DataFound;
+import net.pterodactylus.fcp.EndListPeerNotes;
+import net.pterodactylus.fcp.EndListPeers;
+import net.pterodactylus.fcp.EndListPersistentRequests;
+import net.pterodactylus.fcp.FCPPluginReply;
+import net.pterodactylus.fcp.FcpConnection;
+import net.pterodactylus.fcp.FcpListener;
+import net.pterodactylus.fcp.FcpMessage;
+import net.pterodactylus.fcp.FinishedCompression;
+import net.pterodactylus.fcp.GetFailed;
+import net.pterodactylus.fcp.IdentifierCollision;
+import net.pterodactylus.fcp.NodeData;
+import net.pterodactylus.fcp.NodeHello;
+import net.pterodactylus.fcp.Peer;
+import net.pterodactylus.fcp.PeerNote;
+import net.pterodactylus.fcp.PeerRemoved;
+import net.pterodactylus.fcp.PersistentGet;
+import net.pterodactylus.fcp.PersistentPut;
+import net.pterodactylus.fcp.PersistentPutDir;
+import net.pterodactylus.fcp.PersistentRequestModified;
+import net.pterodactylus.fcp.PersistentRequestRemoved;
+import net.pterodactylus.fcp.PluginInfo;
+import net.pterodactylus.fcp.ProtocolError;
+import net.pterodactylus.fcp.PutFailed;
+import net.pterodactylus.fcp.PutFetchable;
+import net.pterodactylus.fcp.PutSuccessful;
+import net.pterodactylus.fcp.ReceivedBookmarkFeed;
+import net.pterodactylus.fcp.SSKKeypair;
+import net.pterodactylus.fcp.SentFeed;
+import net.pterodactylus.fcp.SimpleProgress;
+import net.pterodactylus.fcp.StartedCompression;
+import net.pterodactylus.fcp.SubscribedUSKUpdate;
+import net.pterodactylus.fcp.TestDDAComplete;
+import net.pterodactylus.fcp.TestDDAReply;
+import net.pterodactylus.fcp.URIGenerated;
+import net.pterodactylus.fcp.UnknownNodeIdentifier;
+import net.pterodactylus.fcp.UnknownPeerNoteType;
+
+/**
+ * An FCP reply sequence enables you to conveniently wait for a specific set of FCP replies.
+ *
+ * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+public class FcpReplySequence implements AutoCloseable, FcpListener {
+
+ private final ExecutorService executorService;
+ private final FcpConnection fcpConnection;
+ private final Map<Class<? extends BaseMessage>, Consumer<BaseMessage>> expectedMessageActions = new HashMap<>();
+ private final List<Consumer<FcpMessage>> unknownMessageHandlers = new ArrayList<>();
+ private final List<Consumer<Throwable>> closeHandlers = new ArrayList<>();
+ private Supplier<Boolean> endPredicate;
+
+ public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) {
+ this.executorService = executorService;
+ this.fcpConnection = fcpConnection;
+ }
+
+ public <M extends BaseMessage> $1<M> handle(Class<M> messageClass) {
+ return new $1<>(messageClass);
+ }
+
+ public class $1<M extends BaseMessage> {
+
+ private Class<M> messageClass;
+
+ private $1(Class<M> messageClass) {
+ this.messageClass = messageClass;
+ }
+
+ public FcpReplySequence with(Consumer<M> action) {
+ expectedMessageActions.put(messageClass, (Consumer<BaseMessage>) action);
+ return FcpReplySequence.this;
+ }
+
+ }
+
+ public $2 handleUnknown() {
+ return new $2();
+ }
+
+ public class $2 {
+
+ public FcpReplySequence with(Consumer<FcpMessage> consumer) {
+ unknownMessageHandlers.add(consumer);
+ return FcpReplySequence.this;
+ }
+
+ }
+
+ public $3 handleClose() {
+ return new $3();
+ }
+
+ public class $3 {
+
+ public FcpReplySequence with(Consumer<Throwable> consumer) {
+ closeHandlers.add(consumer);
+ return FcpReplySequence.this;
+ }
+
+ }
+
+ public void waitFor(Supplier<Boolean> endPredicate) {
+ this.endPredicate = endPredicate;
+ }
+
+ public Future<?> send(FcpMessage fcpMessage) throws IOException {
+ fcpConnection.addFcpListener(this);
+ fcpConnection.sendMessage(fcpMessage);
+ return executorService.submit(() -> {
+ synchronized (endPredicate) {
+ while (!endPredicate.get()) {
+ endPredicate.wait();
+ }
+ }
+ return null;
+ });
+ }
+
+ @Override
+ public void close() {
+ fcpConnection.removeFcpListener(this);
+ }
+
+ private <M extends BaseMessage> void consume(Class<M> fcpMessageClass, BaseMessage fcpMessage) {
+ if (expectedMessageActions.containsKey(fcpMessageClass)) {
+ expectedMessageActions.get(fcpMessageClass).accept(fcpMessage);
+ }
+ synchronized (endPredicate) {
+ endPredicate.notifyAll();
+ }
+ }
+
+ private void consumeUnknown(FcpMessage fcpMessage) {
+ for (Consumer<FcpMessage> unknownMessageHandler : unknownMessageHandlers) {
+ unknownMessageHandler.accept(fcpMessage);
+ }
+ synchronized (endPredicate) {
+ endPredicate.notifyAll();
+ }
+ }
+
+ private void consumeClose(Throwable throwable) {
+ for (Consumer<Throwable> closeHandler : closeHandlers) {
+ closeHandler.accept(throwable);
+ }
+ synchronized (endPredicate) {
+ endPredicate.notifyAll();
+ }
+ }
+
+ @Override
+ public void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) {
+ consume(NodeHello.class, nodeHello);
+ }
+
+ @Override
+ public void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection,
+ CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
+ consume(CloseConnectionDuplicateClientName.class, closeConnectionDuplicateClientName);
+ }
+
+ @Override
+ public void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) {
+ consume(SSKKeypair.class, sskKeypair);
+ }
+
+ @Override
+ public void receivedPeer(FcpConnection fcpConnection, Peer peer) {
+ consume(Peer.class, peer);
+ }
+
+ @Override
+ public void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) {
+ consume(EndListPeers.class, endListPeers);
+ }
+
+ @Override
+ public void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) {
+ consume(PeerNote.class, peerNote);
+ }
+
+ @Override
+ public void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) {
+ consume(EndListPeerNotes.class, endListPeerNotes);
+ }
+
+ @Override
+ public void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) {
+ consume(PeerRemoved.class, peerRemoved);
+ }
+
+ @Override
+ public void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) {
+ consume(NodeData.class, nodeData);
+ }
+
+ @Override
+ public void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) {
+ consume(TestDDAReply.class, testDDAReply);
+ }
+
+ @Override
+ public void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) {
+ consume(TestDDAComplete.class, testDDAComplete);
+ }
+
+ @Override
+ public void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) {
+ consume(PersistentGet.class, persistentGet);
+ }
+
+ @Override
+ public void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) {
+ consume(PersistentPut.class, persistentPut);
+ }
+
+ @Override
+ public void receivedEndListPersistentRequests(FcpConnection fcpConnection,
+ EndListPersistentRequests endListPersistentRequests) {
+ consume(EndListPersistentRequests.class, endListPersistentRequests);
+ }
+
+ @Override
+ public void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) {
+ consume(URIGenerated.class, uriGenerated);
+ }
+
+ @Override
+ public void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) {
+ consume(DataFound.class, dataFound);
+ }
+
+ @Override
+ public void receivedAllData(FcpConnection fcpConnection, AllData allData) {
+ consume(AllData.class, allData);
+ }
+
+ @Override
+ public void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) {
+ consume(SimpleProgress.class, simpleProgress);
+ }
+
+ @Override
+ public void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) {
+ consume(StartedCompression.class, startedCompression);
+ }
+
+ @Override
+ public void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) {
+ consume(FinishedCompression.class, finishedCompression);
+ }
+
+ @Override
+ public void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) {
+ consume(UnknownPeerNoteType.class, unknownPeerNoteType);
+ }
+
+ @Override
+ public void receivedUnknownNodeIdentifier(FcpConnection fcpConnection,
+ UnknownNodeIdentifier unknownNodeIdentifier) {
+ consume(UnknownNodeIdentifier.class, unknownNodeIdentifier);
+ }
+
+ @Override
+ public void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) {
+ consume(ConfigData.class, configData);
+ }
+
+ @Override
+ public void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) {
+ consume(GetFailed.class, getFailed);
+ }
+
+ @Override
+ public void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) {
+ consume(PutFailed.class, putFailed);
+ }
+
+ @Override
+ public void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) {
+ consume(IdentifierCollision.class, identifierCollision);
+ }
+
+ @Override
+ public void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) {
+ consume(PersistentPutDir.class, persistentPutDir);
+ }
+
+ @Override
+ public void receivedPersistentRequestRemoved(FcpConnection fcpConnection,
+ PersistentRequestRemoved persistentRequestRemoved) {
+ consume(PersistentRequestRemoved.class, persistentRequestRemoved);
+ }
+
+ @Override
+ public void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) {
+ consume(SubscribedUSKUpdate.class, subscribedUSKUpdate);
+ }
+
+ @Override
+ public void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) {
+ consume(PluginInfo.class, pluginInfo);
+ }
+
+ @Override
+ public void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) {
+ consume(FCPPluginReply.class, fcpPluginReply);
+ }
+
+ @Override
+ public void receivedPersistentRequestModified(FcpConnection fcpConnection,
+ PersistentRequestModified persistentRequestModified) {
+ consume(PersistentRequestModified.class, persistentRequestModified);
+ }
+
+ @Override
+ public void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) {
+ consume(PutSuccessful.class, putSuccessful);
+ }
+
+ @Override
+ public void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) {
+ consume(PutFetchable.class, putFetchable);
+ }
+
+ @Override
+ public void receivedSentFeed(FcpConnection source, SentFeed sentFeed) {
+ consume(SentFeed.class, sentFeed);
+ }
+
+ @Override
+ public void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) {
+ consume(ReceivedBookmarkFeed.class, receivedBookmarkFeed);
+ }
+
+ @Override
+ public void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) {
+ consume(ProtocolError.class, protocolError);
+ }
+
+ @Override
+ public void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) {
+ consumeUnknown(fcpMessage);
+ }
+
+ @Override
+ public void connectionClosed(FcpConnection fcpConnection, Throwable throwable) {
+ consumeClose(throwable);
+ }
+
+}
--- /dev/null
+package net.pterodactylus.fcp.quelaton;
+
+import net.pterodactylus.fcp.FcpKeyPair;
+
+import java.util.concurrent.Future;
+
+/**
+ * Command to generate an SSK key pair.
+ *
+ * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+public interface GenerateKeypairCommand {
+
+ Future<FcpKeyPair> execute();
+
+}
--- /dev/null
+package net.pterodactylus.fcp.fake;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.hamcrest.Matcher;
+
+/**
+ * TODO
+ *
+ * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+public class FakeTcpServer {
+
+ private final ServerSocket serverSocket;
+ private final ExecutorService executorService;
+ private final AtomicReference<TextSocket> clientSocket = new AtomicReference<>();
+
+ public FakeTcpServer(ExecutorService executorService) throws IOException {
+ this.executorService = executorService;
+ this.serverSocket = new ServerSocket(0);
+ }
+
+ public int getPort() {
+ return serverSocket.getLocalPort();
+ }
+
+ public Future<?> connect() throws IOException {
+ return executorService.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ clientSocket.set(new TextSocket(serverSocket.accept()));
+ return null;
+ }
+ });
+ }
+
+ public List<String> collectUntil(Matcher<String> lineMatcher) throws IOException {
+ return clientSocket.get().collectUntil(lineMatcher);
+ }
+
+ public void writeLine(String line) throws IOException {
+ clientSocket.get().writeLine(line);
+ }
+
+ public String readLine() throws IOException {
+ return clientSocket.get().readLine();
+ }
+
+}
--- /dev/null
+package net.pterodactylus.fcp.fake;
+
+import static java.util.Arrays.asList;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import java.io.IOException;
+import java.net.Proxy;
+import java.net.ProxySelector;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.URI;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.junit.Test;
+
+/**
+ * TODO
+ *
+ * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+public class FakeTcpServerTest {
+
+ private final ExecutorService sameThread = Executors.newSingleThreadExecutor();
+ private final FakeTcpServer tcpServer;
+
+ public FakeTcpServerTest() throws IOException {
+ this.tcpServer = new FakeTcpServer(sameThread);
+ }
+
+ @Test
+ public void testConnect() throws IOException, ExecutionException, InterruptedException {
+ ProxySelector.setDefault(new ProxySelector() {
+ @Override
+ public List<Proxy> select(URI uri) {
+ return asList(Proxy.NO_PROXY);
+ }
+
+ @Override
+ public void connectFailed(URI uri, SocketAddress sa, IOException ioe) {
+ }
+ });
+ tcpServer.connect();
+ try (TextSocket clientSocket = new TextSocket(new Socket("127.0.0.1", tcpServer.getPort()))) {
+ clientSocket.writeLine("Hello");
+ clientSocket.writeLine("Bye");
+ List<String> receivedLines = tcpServer.collectUntil(is("Bye"));
+ assertThat(receivedLines, contains("Hello", "Bye"));
+ tcpServer.writeLine("Yes");
+ tcpServer.writeLine("Quit");
+ receivedLines = clientSocket.collectUntil(is("Quit"));
+ assertThat(receivedLines, contains("Yes", "Quit"));
+ }
+ }
+
+}
--- /dev/null
+package net.pterodactylus.fcp.fake;
+
+import java.io.BufferedReader;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.net.Socket;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.hamcrest.Matcher;
+
+/**
+ * TODO
+ *
+ * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+class TextSocket implements Closeable {
+
+ private final Socket socket;
+ private final InputStream socketInput;
+ private final OutputStream socketOutput;
+ private final BufferedReader inputReader;
+ private final Writer outputWriter;
+
+ TextSocket(Socket socket) throws IOException {
+ this.socket = socket;
+ this.socketInput = socket.getInputStream();
+ this.socketOutput = socket.getOutputStream();
+ this.inputReader = new BufferedReader(new InputStreamReader(socketInput, "UTF-8"));
+ this.outputWriter = new OutputStreamWriter(socketOutput, "UTF-8");
+ }
+
+ public String readLine() throws IOException {
+ return inputReader.readLine();
+ }
+
+ public void writeLine(String line) throws IOException {
+ outputWriter.write(line + "\n");
+ outputWriter.flush();
+ }
+
+ public List<String> collectUntil(Matcher<String> lineMatcher) throws IOException {
+ List<String> collectedLines = new ArrayList<String>();
+ while (true) {
+ String line = readLine();
+ if (line == null) {
+ throw new EOFException();
+ }
+ collectedLines.add(line);
+ if (lineMatcher.matches(line)) {
+ break;
+ }
+ }
+ return collectedLines;
+ }
+
+ @Override
+ public void close() throws IOException {
+ outputWriter.close();
+ inputReader.close();
+ socketOutput.close();
+ socketInput.close();
+ socket.close();
+ }
+
+}
--- /dev/null
+package net.pterodactylus.fcp.quelaton;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import net.pterodactylus.fcp.FcpKeyPair;
+import net.pterodactylus.fcp.fake.FakeTcpServer;
+
+import org.junit.Test;
+
+/**
+ * Unit test for {@link DefaultFcpClient}.
+ *
+ * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+public class DefaultFcpClientTest {
+
+ private final ExecutorService threadPool = Executors.newCachedThreadPool();
+ private final FakeTcpServer fcpServer;
+ private final DefaultFcpClient fcpClient;
+
+ public DefaultFcpClientTest() throws IOException {
+ fcpServer = new FakeTcpServer(threadPool);
+ fcpClient = new DefaultFcpClient(threadPool, "localhost", fcpServer.getPort(), () -> "Test", () -> "2.0");
+ }
+
+ @Test
+ public void defaultFcpClientCanGenerateKeypair() throws ExecutionException, InterruptedException, IOException {
+ Future<FcpKeyPair> keyPairFuture = fcpClient.generateKeypair().execute();
+ connectNode();
+ fcpServer.collectUntil(is("EndMessage"));
+ fcpServer.writeLine("SSKKeypair\n"
+ + "InsertURI=freenet:SSK@AKTTKG6YwjrHzWo67laRcoPqibyiTdyYufjVg54fBlWr,AwUSJG5ZS-FDZTqnt6skTzhxQe08T-fbKXj8aEHZsXM/\n"
+ + "RequestURI=freenet:SSK@BnHXXv3Fa43w~~iz1tNUd~cj4OpUuDjVouOWZ5XlpX0,AwUSJG5ZS-FDZTqnt6skTzhxQe08T-fbKXj8aEHZsXM,AQABAAE/\n"
+ + "Identifier=My Identifier from GenerateSSK\n"
+ + "EndMessage");
+ FcpKeyPair keyPair = keyPairFuture.get();
+ assertThat(keyPair.getPublicKey(),
+ is("freenet:SSK@BnHXXv3Fa43w~~iz1tNUd~cj4OpUuDjVouOWZ5XlpX0,AwUSJG5ZS-FDZTqnt6skTzhxQe08T-fbKXj8aEHZsXM,AQABAAE/"));
+ assertThat(keyPair.getPrivateKey(), is(
+ "freenet:SSK@AKTTKG6YwjrHzWo67laRcoPqibyiTdyYufjVg54fBlWr,AwUSJG5ZS-FDZTqnt6skTzhxQe08T-fbKXj8aEHZsXM/"));
+ }
+
+ private void connectNode() throws InterruptedException, ExecutionException, IOException {
+ fcpServer.connect().get();
+ fcpServer.collectUntil(is("EndMessage"));
+ fcpServer.writeLine("NodeHello\n"
+ + "FCPVersion=2.0\n"
+ + "ConnectionIdentifier=754595fc35701d76096d8279d15c57e6\n"
+ + "Version=Fred,0.7,1.0,1231\n"
+ + "Node=Fred\n"
+ + "NodeLanguage=ENGLISH\n"
+ + "ExtRevision=23771\n"
+ + "Build=1231\n"
+ + "Testnet=false\n"
+ + "ExtBuild=26\n"
+ + "CompressionCodecs=3 - GZIP(0), BZIP2(1), LZMA(2)\n"
+ + "Revision=@custom@\n"
+ + "EndMessage");
+ }
+
+}
--- /dev/null
+package net.pterodactylus.fcp.quelaton;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+import net.pterodactylus.fcp.AllData;
+import net.pterodactylus.fcp.BaseMessage;
+import net.pterodactylus.fcp.CloseConnectionDuplicateClientName;
+import net.pterodactylus.fcp.ConfigData;
+import net.pterodactylus.fcp.DataFound;
+import net.pterodactylus.fcp.EndListPeerNotes;
+import net.pterodactylus.fcp.EndListPeers;
+import net.pterodactylus.fcp.EndListPersistentRequests;
+import net.pterodactylus.fcp.FCPPluginReply;
+import net.pterodactylus.fcp.FcpConnection;
+import net.pterodactylus.fcp.FcpMessage;
+import net.pterodactylus.fcp.FinishedCompression;
+import net.pterodactylus.fcp.GetFailed;
+import net.pterodactylus.fcp.IdentifierCollision;
+import net.pterodactylus.fcp.NodeData;
+import net.pterodactylus.fcp.NodeHello;
+import net.pterodactylus.fcp.Peer;
+import net.pterodactylus.fcp.PeerNote;
+import net.pterodactylus.fcp.PeerRemoved;
+import net.pterodactylus.fcp.PersistentGet;
+import net.pterodactylus.fcp.PersistentPut;
+import net.pterodactylus.fcp.PersistentPutDir;
+import net.pterodactylus.fcp.PersistentRequestModified;
+import net.pterodactylus.fcp.PersistentRequestRemoved;
+import net.pterodactylus.fcp.PluginInfo;
+import net.pterodactylus.fcp.ProtocolError;
+import net.pterodactylus.fcp.PutFailed;
+import net.pterodactylus.fcp.PutFetchable;
+import net.pterodactylus.fcp.PutSuccessful;
+import net.pterodactylus.fcp.ReceivedBookmarkFeed;
+import net.pterodactylus.fcp.SSKKeypair;
+import net.pterodactylus.fcp.SentFeed;
+import net.pterodactylus.fcp.SimpleProgress;
+import net.pterodactylus.fcp.StartedCompression;
+import net.pterodactylus.fcp.SubscribedUSKUpdate;
+import net.pterodactylus.fcp.TestDDAComplete;
+import net.pterodactylus.fcp.TestDDAReply;
+import net.pterodactylus.fcp.URIGenerated;
+import net.pterodactylus.fcp.UnknownNodeIdentifier;
+import net.pterodactylus.fcp.UnknownPeerNoteType;
+
+import org.junit.Test;
+
+/**
+ * Unit test for {@link FcpReplySequence}.
+ *
+ * @author <a href="bombe@freenetproject.org">David ‘Bombe’ Roden</a>
+ */
+public class FcpReplySequenceTest {
+
+ private final FcpConnection fcpConnection = mock(FcpConnection.class);
+ private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+ private final FcpReplySequence replyWaiter = new FcpReplySequence(executorService, fcpConnection);
+ private final FcpMessage fcpMessage = new FcpMessage("Test");
+
+ @Test
+ public void canSendMessage() throws IOException {
+ replyWaiter.send(fcpMessage);
+ verify(fcpConnection).sendMessage(fcpMessage);
+ }
+
+ @Test
+ public void sendingAMessageRegistersTheWaiterAsFcpListener() throws IOException {
+ replyWaiter.send(fcpMessage);
+ verify(fcpConnection).addFcpListener(replyWaiter);
+ }
+
+ @Test
+ public void closingTheReplyWaiterRemovesTheFcpListener() throws IOException {
+ replyWaiter.send(fcpMessage);
+ replyWaiter.close();
+ verify(fcpConnection).removeFcpListener(replyWaiter);
+ }
+
+ private <M extends BaseMessage> void waitForASpecificMessage(MessageReceiver<M> messageReceiver,
+ Class<M> messageClass, Supplier<M> message) throws IOException, InterruptedException, ExecutionException {
+ AtomicBoolean gotMessage = setupMessage(messageClass);
+ Future<?> result = replyWaiter.send(fcpMessage);
+ sendMessage(messageReceiver, message.get());
+ result.get();
+ assertThat(gotMessage.get(), is(true));
+ }
+
+ private <M extends BaseMessage> void sendMessage(MessageReceiver<M> messageReceiver, M message) {
+ messageReceiver.receive(fcpConnection, message);
+ }
+
+ private interface MessageReceiver<M extends BaseMessage> {
+
+ void receive(FcpConnection fcpConnection, M message);
+ }
+
+ private <M extends BaseMessage> AtomicBoolean setupMessage(Class<M> messageClass) {
+ AtomicBoolean gotMessage = new AtomicBoolean();
+ replyWaiter.handle(messageClass).with((message) -> gotMessage.set(true));
+ replyWaiter.waitFor(() -> gotMessage.get());
+ return gotMessage;
+ }
+
+ @Test
+ public void waitingForNodeHelloWorks() throws IOException, ExecutionException, InterruptedException {
+ waitForASpecificMessage(replyWaiter::receivedNodeHello, NodeHello.class,
+ () -> new NodeHello(new FcpMessage("NodeHello")));
+ }
+
+ @Test
+ public void waitingForConnectionClosedDuplicateClientNameWorks()
+ throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedCloseConnectionDuplicateClientName,
+ CloseConnectionDuplicateClientName.class,
+ () -> new CloseConnectionDuplicateClientName(new FcpMessage("CloseConnectionDuplicateClientName")));
+ }
+
+ @Test
+ public void waitingForSSKKeypairWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedSSKKeypair, SSKKeypair.class,
+ () -> new SSKKeypair(new FcpMessage("SSKKeypair")));
+ }
+
+ @Test
+ public void waitForPeerWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedPeer, Peer.class, () -> new Peer(new FcpMessage("Peer")));
+ }
+
+ @Test
+ public void waitForEndListPeersWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedEndListPeers, EndListPeers.class,
+ () -> new EndListPeers(new FcpMessage("EndListPeers")));
+ }
+
+ @Test
+ public void waitForPeerNoteWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedPeerNote, PeerNote.class,
+ () -> new PeerNote(new FcpMessage("PeerNote")));
+ }
+
+ @Test
+ public void waitForEndListPeerNotesWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedEndListPeerNotes, EndListPeerNotes.class,
+ () -> new EndListPeerNotes(new FcpMessage("EndListPeerNotes")));
+ }
+
+ @Test
+ public void waitForPeerRemovedWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedPeerRemoved, PeerRemoved.class,
+ () -> new PeerRemoved(new FcpMessage("PeerRemoved")));
+ }
+
+ @Test
+ public void waitForNodeDataWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedNodeData, NodeData.class,
+ () -> new NodeData(new FcpMessage("NodeData").put("ark.pubURI", "")
+ .put(
+ "ark.number", "0")
+ .put("auth.negTypes", "")
+ .put("version", "0,0,0,0")
+ .put("lastGoodVersion", "0,0,0,0")));
+ }
+
+ @Test
+ public void waitForTestDDAReplyWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedTestDDAReply, TestDDAReply.class,
+ () -> new TestDDAReply(new FcpMessage("TestDDAReply")));
+ }
+
+ @Test
+ public void waitForTestDDACompleteWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedTestDDAComplete, TestDDAComplete.class,
+ () -> new TestDDAComplete(new FcpMessage("TestDDAComplete")));
+ }
+
+ @Test
+ public void waitForPersistentGetWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedPersistentGet, PersistentGet.class,
+ () -> new PersistentGet(new FcpMessage("PersistentGet")));
+ }
+
+ @Test
+ public void waitForPersistentPutWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedPersistentPut, PersistentPut.class,
+ () -> new PersistentPut(new FcpMessage("PersistentPut")));
+ }
+
+ @Test
+ public void waitForEndListPersistentRequestsWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedEndListPersistentRequests, EndListPersistentRequests.class,
+ () -> new EndListPersistentRequests(new FcpMessage("EndListPersistentRequests")));
+ }
+
+ @Test
+ public void waitForURIGeneratedWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedURIGenerated, URIGenerated.class,
+ () -> new URIGenerated(new FcpMessage("URIGenerated")));
+ }
+
+ @Test
+ public void waitForDataFoundWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedDataFound, DataFound.class,
+ () -> new DataFound(new FcpMessage("DataFound")));
+ }
+
+ @Test
+ public void waitForAllDataWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedAllData, AllData.class,
+ () -> new AllData(new FcpMessage("AllData"), null));
+ }
+
+ @Test
+ public void waitForSimpleProgressWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedSimpleProgress, SimpleProgress.class,
+ () -> new SimpleProgress(new FcpMessage("SimpleProgress")));
+ }
+
+ @Test
+ public void waitForStartedCompressionWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedStartedCompression, StartedCompression.class,
+ () -> new StartedCompression(new FcpMessage("StartedCompression")));
+ }
+
+ @Test
+ public void waitForFinishedCompressionWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedFinishedCompression, FinishedCompression.class,
+ () -> new FinishedCompression(new FcpMessage("FinishedCompression")));
+ }
+
+ @Test
+ public void waitForUnknownPeerNoteTypeWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedUnknownPeerNoteType, UnknownPeerNoteType.class,
+ () -> new UnknownPeerNoteType(new FcpMessage("UnknownPeerNoteType")));
+ }
+
+ @Test
+ public void waitForUnknownNodeIdentifierWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedUnknownNodeIdentifier, UnknownNodeIdentifier.class,
+ () -> new UnknownNodeIdentifier(new FcpMessage("UnknownNodeIdentifier")));
+ }
+
+ @Test
+ public void waitForConfigDataWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedConfigData, ConfigData.class,
+ () -> new ConfigData(new FcpMessage("ConfigData")));
+ }
+
+ @Test
+ public void waitForGetFailedWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedGetFailed, GetFailed.class,
+ () -> new GetFailed(new FcpMessage("GetFailed")));
+ }
+
+ @Test
+ public void waitForPutFailedWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedPutFailed, PutFailed.class,
+ () -> new PutFailed(new FcpMessage("PutFailed")));
+ }
+
+ @Test
+ public void waitForIdentifierCollisionWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedIdentifierCollision, IdentifierCollision.class,
+ () -> new IdentifierCollision(new FcpMessage("IdentifierCollision")));
+ }
+
+ @Test
+ public void waitForPersistentPutDirWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedPersistentPutDir, PersistentPutDir.class,
+ () -> new PersistentPutDir(new FcpMessage("PersistentPutDir")));
+ }
+
+ @Test
+ public void waitForPersistentRequestRemovedWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedPersistentRequestRemoved, PersistentRequestRemoved.class,
+ () -> new PersistentRequestRemoved(new FcpMessage("PersistentRequestRemoved")));
+ }
+
+ @Test
+ public void waitForSubscribedUSKUpdateWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedSubscribedUSKUpdate, SubscribedUSKUpdate.class,
+ () -> new SubscribedUSKUpdate(new FcpMessage("SubscribedUSKUpdate")));
+ }
+
+ @Test
+ public void waitForPluginInfoWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedPluginInfo, PluginInfo.class,
+ () -> new PluginInfo(new FcpMessage("PluginInfo")));
+ }
+
+ @Test
+ public void waitForFCPPluginReply() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedFCPPluginReply, FCPPluginReply.class,
+ () -> new FCPPluginReply(new FcpMessage("FCPPluginReply"), null));
+ }
+
+ @Test
+ public void waitForPersistentRequestModifiedWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedPersistentRequestModified, PersistentRequestModified.class,
+ () -> new PersistentRequestModified(new FcpMessage("PersistentRequestModified")));
+ }
+
+ @Test
+ public void waitForPutSuccessfulWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedPutSuccessful, PutSuccessful.class,
+ () -> new PutSuccessful(new FcpMessage("PutSuccessful")));
+ }
+
+ @Test
+ public void waitForPutFetchableWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedPutFetchable, PutFetchable.class,
+ () -> new PutFetchable(new FcpMessage("PutFetchable")));
+ }
+
+ @Test
+ public void waitForSentFeedWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedSentFeed, SentFeed.class,
+ () -> new SentFeed(new FcpMessage("SentFeed")));
+ }
+
+ @Test
+ public void waitForReceivedBookmarkFeedWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedBookmarkFeed, ReceivedBookmarkFeed.class,
+ () -> new ReceivedBookmarkFeed(new FcpMessage("ReceivedBookmarkFeed")));
+ }
+
+ @Test
+ public void waitForProtocolErrorWorks() throws InterruptedException, ExecutionException, IOException {
+ waitForASpecificMessage(replyWaiter::receivedProtocolError, ProtocolError.class,
+ () -> new ProtocolError(new FcpMessage("ProtocolError")));
+ }
+
+ @Test
+ public void waitForUnknownMessageWorks() throws IOException, ExecutionException, InterruptedException {
+ AtomicReference<FcpMessage> receivedMessage = new AtomicReference<>();
+ replyWaiter.handleUnknown().with((message) -> receivedMessage.set(message));
+ replyWaiter.waitFor(() -> receivedMessage.get() != null);
+ Future<?> result = replyWaiter.send(fcpMessage);
+ replyWaiter.receivedMessage(fcpConnection, fcpMessage);
+ result.get();
+ assertThat(receivedMessage.get(), is(fcpMessage));
+ }
+
+ @Test
+ public void waitingForMultipleMessagesWorks() throws IOException, ExecutionException, InterruptedException {
+ AtomicBoolean gotPutFailed = new AtomicBoolean();
+ replyWaiter.handle(PutFailed.class).with((getFailed) -> gotPutFailed.set(true));
+ AtomicBoolean gotGetFailed = new AtomicBoolean();
+ replyWaiter.handle(GetFailed.class).with((getFailed) -> gotGetFailed.set(true));
+ replyWaiter.waitFor(() -> gotGetFailed.get() && gotPutFailed.get());
+ Future<?> result = replyWaiter.send(fcpMessage);
+ assertThat(result.isDone(), is(false));
+ replyWaiter.receivedGetFailed(fcpConnection, new GetFailed(new FcpMessage("GetFailed")));
+ assertThat(result.isDone(), is(false));
+ replyWaiter.receivedPutFailed(fcpConnection, new PutFailed(new FcpMessage("PutFailed")));
+ result.get();
+ }
+
+ @Test
+ public void waitingForConnectionClosureWorks() throws IOException, ExecutionException, InterruptedException {
+ AtomicReference<Throwable> receivedThrowable = new AtomicReference<>();
+ replyWaiter.handleClose().with((e) -> receivedThrowable.set(e));
+ replyWaiter.waitFor(() -> receivedThrowable.get() != null);
+ Future<?> result = replyWaiter.send(fcpMessage);
+ Throwable throwable = new Throwable();
+ replyWaiter.connectionClosed(fcpConnection, throwable);
+ result.get();
+ assertThat(receivedThrowable.get(), is(throwable));
+ }
+
+}