From 075f351f114b8a58f82a8e2d2e295b987237d66e Mon Sep 17 00:00:00 2001 From: =?utf8?q?David=20=E2=80=98Bombe=E2=80=99=20Roden?= Date: Sun, 28 Jun 2015 00:01:03 +0200 Subject: [PATCH] =?utf8?q?Add=20new=20=E2=80=9Cquelaton=E2=80=9D=20FCP=20c?= =?utf8?q?lient=20API?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- pom.xml | 9 +- .../fcp/quelaton/DefaultFcpClient.java | 97 ++++++ .../net/pterodactylus/fcp/quelaton/FcpClient.java | 12 + .../fcp/quelaton/FcpReplySequence.java | 368 ++++++++++++++++++++ .../fcp/quelaton/GenerateKeypairCommand.java | 16 + .../net/pterodactylus/fcp/fake/FakeTcpServer.java | 55 +++ .../pterodactylus/fcp/fake/FakeTcpServerTest.java | 61 ++++ .../net/pterodactylus/fcp/fake/TextSocket.java | 72 ++++ .../fcp/quelaton/DefaultFcpClientTest.java | 68 ++++ .../fcp/quelaton/FcpReplySequenceTest.java | 382 +++++++++++++++++++++ 10 files changed, 1138 insertions(+), 2 deletions(-) create mode 100644 src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java create mode 100644 src/main/java/net/pterodactylus/fcp/quelaton/FcpClient.java create mode 100644 src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java create mode 100644 src/main/java/net/pterodactylus/fcp/quelaton/GenerateKeypairCommand.java create mode 100644 src/test/java/net/pterodactylus/fcp/fake/FakeTcpServer.java create mode 100644 src/test/java/net/pterodactylus/fcp/fake/FakeTcpServerTest.java create mode 100644 src/test/java/net/pterodactylus/fcp/fake/TextSocket.java create mode 100644 src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java create mode 100644 src/test/java/net/pterodactylus/fcp/quelaton/FcpReplySequenceTest.java diff --git a/pom.xml b/pom.xml index 0e832fa..4c9da2a 100644 --- a/pom.xml +++ b/pom.xml @@ -25,6 +25,11 @@ 1.3 + org.mockito + mockito-all + 1.10.19 + + com.google.guava guava 16.0.1 @@ -46,8 +51,8 @@ org.apache.maven.plugins maven-compiler-plugin - 1.6 - 1.6 + 1.8 + 1.8 UTF-8 diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java b/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java new file mode 100644 index 0000000..e0c6e12 --- /dev/null +++ b/src/main/java/net/pterodactylus/fcp/quelaton/DefaultFcpClient.java @@ -0,0 +1,97 @@ +package net.pterodactylus.fcp.quelaton; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +import net.pterodactylus.fcp.ClientHello; +import net.pterodactylus.fcp.CloseConnectionDuplicateClientName; +import net.pterodactylus.fcp.FcpConnection; +import net.pterodactylus.fcp.FcpKeyPair; +import net.pterodactylus.fcp.GenerateSSK; +import net.pterodactylus.fcp.NodeHello; +import net.pterodactylus.fcp.SSKKeypair; + +/** + * Default {@link FcpClient} implementation. + * + * @author David ‘Bombe’ Roden + */ +public class DefaultFcpClient implements FcpClient { + + private final ExecutorService threadPool; + private final String hostname; + private final int port; + private final AtomicReference fcpConnection = new AtomicReference<>(); + private final Supplier clientName; + private final Supplier expectedVersion; + + public DefaultFcpClient(ExecutorService threadPool, String hostname, int port, Supplier clientName, + Supplier expectedVersion) { + this.threadPool = threadPool; + this.hostname = hostname; + this.port = port; + this.clientName = clientName; + this.expectedVersion = expectedVersion; + } + + private void connect() throws IOException { + if (fcpConnection.get() != null) { + return; + } + fcpConnection.compareAndSet(null, createConnection()); + } + + private FcpConnection createConnection() throws IOException { + FcpConnection connection = new FcpConnection(hostname, port); + connection.connect(); + AtomicReference receivedNodeHello = new AtomicReference<>(); + AtomicBoolean receivedClosed = new AtomicBoolean(); + FcpReplySequence nodeHelloSequence = new FcpReplySequence(threadPool, connection); + nodeHelloSequence + .handle(NodeHello.class) + .with((nodeHello) -> receivedNodeHello.set(nodeHello)); + nodeHelloSequence + .handle(CloseConnectionDuplicateClientName.class) + .with((closeConnection) -> receivedClosed.set(true)); + nodeHelloSequence.waitFor(() -> receivedNodeHello.get() != null || receivedClosed.get()); + ClientHello clientHello = new ClientHello(clientName.get(), expectedVersion.get()); + try { + nodeHelloSequence.send(clientHello).get(); + } catch (InterruptedException | ExecutionException e) { + connection.close(); + throw new IOException(String.format("Could not connect to %s:%d.", hostname, port), e); + } + return connection; + } + + @Override + public GenerateKeypairCommand generateKeypair() { + return new GenerateKeypairCommandImpl(); + } + + private class GenerateKeypairCommandImpl implements GenerateKeypairCommand { + + @Override + public Future execute() { + return threadPool.submit(() -> { + connect(); + GenerateSSK generateSSK = new GenerateSSK(); + AtomicReference keyPair = new AtomicReference<>(); + FcpReplySequence replySequence = new FcpReplySequence(threadPool, fcpConnection.get()); + replySequence.handle(SSKKeypair.class) + .with((message) -> keyPair.set( + new FcpKeyPair(message.getRequestURI(), message.getInsertURI()))); + replySequence.waitFor(() -> keyPair.get() != null); + replySequence.send(generateSSK).get(); + return keyPair.get(); + }); + } + + } + +} diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/FcpClient.java b/src/main/java/net/pterodactylus/fcp/quelaton/FcpClient.java new file mode 100644 index 0000000..065f68a --- /dev/null +++ b/src/main/java/net/pterodactylus/fcp/quelaton/FcpClient.java @@ -0,0 +1,12 @@ +package net.pterodactylus.fcp.quelaton; + +/** + * FCP client used to communicate with a Freenet node. + * + * @author David ‘Bombe’ Roden + */ +public interface FcpClient { + + GenerateKeypairCommand generateKeypair(); + +} diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java b/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java new file mode 100644 index 0000000..0c54d7c --- /dev/null +++ b/src/main/java/net/pterodactylus/fcp/quelaton/FcpReplySequence.java @@ -0,0 +1,368 @@ +package net.pterodactylus.fcp.quelaton; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.function.Consumer; +import java.util.function.Supplier; + +import net.pterodactylus.fcp.AllData; +import net.pterodactylus.fcp.BaseMessage; +import net.pterodactylus.fcp.CloseConnectionDuplicateClientName; +import net.pterodactylus.fcp.ConfigData; +import net.pterodactylus.fcp.DataFound; +import net.pterodactylus.fcp.EndListPeerNotes; +import net.pterodactylus.fcp.EndListPeers; +import net.pterodactylus.fcp.EndListPersistentRequests; +import net.pterodactylus.fcp.FCPPluginReply; +import net.pterodactylus.fcp.FcpConnection; +import net.pterodactylus.fcp.FcpListener; +import net.pterodactylus.fcp.FcpMessage; +import net.pterodactylus.fcp.FinishedCompression; +import net.pterodactylus.fcp.GetFailed; +import net.pterodactylus.fcp.IdentifierCollision; +import net.pterodactylus.fcp.NodeData; +import net.pterodactylus.fcp.NodeHello; +import net.pterodactylus.fcp.Peer; +import net.pterodactylus.fcp.PeerNote; +import net.pterodactylus.fcp.PeerRemoved; +import net.pterodactylus.fcp.PersistentGet; +import net.pterodactylus.fcp.PersistentPut; +import net.pterodactylus.fcp.PersistentPutDir; +import net.pterodactylus.fcp.PersistentRequestModified; +import net.pterodactylus.fcp.PersistentRequestRemoved; +import net.pterodactylus.fcp.PluginInfo; +import net.pterodactylus.fcp.ProtocolError; +import net.pterodactylus.fcp.PutFailed; +import net.pterodactylus.fcp.PutFetchable; +import net.pterodactylus.fcp.PutSuccessful; +import net.pterodactylus.fcp.ReceivedBookmarkFeed; +import net.pterodactylus.fcp.SSKKeypair; +import net.pterodactylus.fcp.SentFeed; +import net.pterodactylus.fcp.SimpleProgress; +import net.pterodactylus.fcp.StartedCompression; +import net.pterodactylus.fcp.SubscribedUSKUpdate; +import net.pterodactylus.fcp.TestDDAComplete; +import net.pterodactylus.fcp.TestDDAReply; +import net.pterodactylus.fcp.URIGenerated; +import net.pterodactylus.fcp.UnknownNodeIdentifier; +import net.pterodactylus.fcp.UnknownPeerNoteType; + +/** + * An FCP reply sequence enables you to conveniently wait for a specific set of FCP replies. + * + * @author David ‘Bombe’ Roden + */ +public class FcpReplySequence implements AutoCloseable, FcpListener { + + private final ExecutorService executorService; + private final FcpConnection fcpConnection; + private final Map, Consumer> expectedMessageActions = new HashMap<>(); + private final List> unknownMessageHandlers = new ArrayList<>(); + private final List> closeHandlers = new ArrayList<>(); + private Supplier endPredicate; + + public FcpReplySequence(ExecutorService executorService, FcpConnection fcpConnection) { + this.executorService = executorService; + this.fcpConnection = fcpConnection; + } + + public $1 handle(Class messageClass) { + return new $1<>(messageClass); + } + + public class $1 { + + private Class messageClass; + + private $1(Class messageClass) { + this.messageClass = messageClass; + } + + public FcpReplySequence with(Consumer action) { + expectedMessageActions.put(messageClass, (Consumer) action); + return FcpReplySequence.this; + } + + } + + public $2 handleUnknown() { + return new $2(); + } + + public class $2 { + + public FcpReplySequence with(Consumer consumer) { + unknownMessageHandlers.add(consumer); + return FcpReplySequence.this; + } + + } + + public $3 handleClose() { + return new $3(); + } + + public class $3 { + + public FcpReplySequence with(Consumer consumer) { + closeHandlers.add(consumer); + return FcpReplySequence.this; + } + + } + + public void waitFor(Supplier endPredicate) { + this.endPredicate = endPredicate; + } + + public Future send(FcpMessage fcpMessage) throws IOException { + fcpConnection.addFcpListener(this); + fcpConnection.sendMessage(fcpMessage); + return executorService.submit(() -> { + synchronized (endPredicate) { + while (!endPredicate.get()) { + endPredicate.wait(); + } + } + return null; + }); + } + + @Override + public void close() { + fcpConnection.removeFcpListener(this); + } + + private void consume(Class fcpMessageClass, BaseMessage fcpMessage) { + if (expectedMessageActions.containsKey(fcpMessageClass)) { + expectedMessageActions.get(fcpMessageClass).accept(fcpMessage); + } + synchronized (endPredicate) { + endPredicate.notifyAll(); + } + } + + private void consumeUnknown(FcpMessage fcpMessage) { + for (Consumer unknownMessageHandler : unknownMessageHandlers) { + unknownMessageHandler.accept(fcpMessage); + } + synchronized (endPredicate) { + endPredicate.notifyAll(); + } + } + + private void consumeClose(Throwable throwable) { + for (Consumer closeHandler : closeHandlers) { + closeHandler.accept(throwable); + } + synchronized (endPredicate) { + endPredicate.notifyAll(); + } + } + + @Override + public void receivedNodeHello(FcpConnection fcpConnection, NodeHello nodeHello) { + consume(NodeHello.class, nodeHello); + } + + @Override + public void receivedCloseConnectionDuplicateClientName(FcpConnection fcpConnection, + CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { + consume(CloseConnectionDuplicateClientName.class, closeConnectionDuplicateClientName); + } + + @Override + public void receivedSSKKeypair(FcpConnection fcpConnection, SSKKeypair sskKeypair) { + consume(SSKKeypair.class, sskKeypair); + } + + @Override + public void receivedPeer(FcpConnection fcpConnection, Peer peer) { + consume(Peer.class, peer); + } + + @Override + public void receivedEndListPeers(FcpConnection fcpConnection, EndListPeers endListPeers) { + consume(EndListPeers.class, endListPeers); + } + + @Override + public void receivedPeerNote(FcpConnection fcpConnection, PeerNote peerNote) { + consume(PeerNote.class, peerNote); + } + + @Override + public void receivedEndListPeerNotes(FcpConnection fcpConnection, EndListPeerNotes endListPeerNotes) { + consume(EndListPeerNotes.class, endListPeerNotes); + } + + @Override + public void receivedPeerRemoved(FcpConnection fcpConnection, PeerRemoved peerRemoved) { + consume(PeerRemoved.class, peerRemoved); + } + + @Override + public void receivedNodeData(FcpConnection fcpConnection, NodeData nodeData) { + consume(NodeData.class, nodeData); + } + + @Override + public void receivedTestDDAReply(FcpConnection fcpConnection, TestDDAReply testDDAReply) { + consume(TestDDAReply.class, testDDAReply); + } + + @Override + public void receivedTestDDAComplete(FcpConnection fcpConnection, TestDDAComplete testDDAComplete) { + consume(TestDDAComplete.class, testDDAComplete); + } + + @Override + public void receivedPersistentGet(FcpConnection fcpConnection, PersistentGet persistentGet) { + consume(PersistentGet.class, persistentGet); + } + + @Override + public void receivedPersistentPut(FcpConnection fcpConnection, PersistentPut persistentPut) { + consume(PersistentPut.class, persistentPut); + } + + @Override + public void receivedEndListPersistentRequests(FcpConnection fcpConnection, + EndListPersistentRequests endListPersistentRequests) { + consume(EndListPersistentRequests.class, endListPersistentRequests); + } + + @Override + public void receivedURIGenerated(FcpConnection fcpConnection, URIGenerated uriGenerated) { + consume(URIGenerated.class, uriGenerated); + } + + @Override + public void receivedDataFound(FcpConnection fcpConnection, DataFound dataFound) { + consume(DataFound.class, dataFound); + } + + @Override + public void receivedAllData(FcpConnection fcpConnection, AllData allData) { + consume(AllData.class, allData); + } + + @Override + public void receivedSimpleProgress(FcpConnection fcpConnection, SimpleProgress simpleProgress) { + consume(SimpleProgress.class, simpleProgress); + } + + @Override + public void receivedStartedCompression(FcpConnection fcpConnection, StartedCompression startedCompression) { + consume(StartedCompression.class, startedCompression); + } + + @Override + public void receivedFinishedCompression(FcpConnection fcpConnection, FinishedCompression finishedCompression) { + consume(FinishedCompression.class, finishedCompression); + } + + @Override + public void receivedUnknownPeerNoteType(FcpConnection fcpConnection, UnknownPeerNoteType unknownPeerNoteType) { + consume(UnknownPeerNoteType.class, unknownPeerNoteType); + } + + @Override + public void receivedUnknownNodeIdentifier(FcpConnection fcpConnection, + UnknownNodeIdentifier unknownNodeIdentifier) { + consume(UnknownNodeIdentifier.class, unknownNodeIdentifier); + } + + @Override + public void receivedConfigData(FcpConnection fcpConnection, ConfigData configData) { + consume(ConfigData.class, configData); + } + + @Override + public void receivedGetFailed(FcpConnection fcpConnection, GetFailed getFailed) { + consume(GetFailed.class, getFailed); + } + + @Override + public void receivedPutFailed(FcpConnection fcpConnection, PutFailed putFailed) { + consume(PutFailed.class, putFailed); + } + + @Override + public void receivedIdentifierCollision(FcpConnection fcpConnection, IdentifierCollision identifierCollision) { + consume(IdentifierCollision.class, identifierCollision); + } + + @Override + public void receivedPersistentPutDir(FcpConnection fcpConnection, PersistentPutDir persistentPutDir) { + consume(PersistentPutDir.class, persistentPutDir); + } + + @Override + public void receivedPersistentRequestRemoved(FcpConnection fcpConnection, + PersistentRequestRemoved persistentRequestRemoved) { + consume(PersistentRequestRemoved.class, persistentRequestRemoved); + } + + @Override + public void receivedSubscribedUSKUpdate(FcpConnection fcpConnection, SubscribedUSKUpdate subscribedUSKUpdate) { + consume(SubscribedUSKUpdate.class, subscribedUSKUpdate); + } + + @Override + public void receivedPluginInfo(FcpConnection fcpConnection, PluginInfo pluginInfo) { + consume(PluginInfo.class, pluginInfo); + } + + @Override + public void receivedFCPPluginReply(FcpConnection fcpConnection, FCPPluginReply fcpPluginReply) { + consume(FCPPluginReply.class, fcpPluginReply); + } + + @Override + public void receivedPersistentRequestModified(FcpConnection fcpConnection, + PersistentRequestModified persistentRequestModified) { + consume(PersistentRequestModified.class, persistentRequestModified); + } + + @Override + public void receivedPutSuccessful(FcpConnection fcpConnection, PutSuccessful putSuccessful) { + consume(PutSuccessful.class, putSuccessful); + } + + @Override + public void receivedPutFetchable(FcpConnection fcpConnection, PutFetchable putFetchable) { + consume(PutFetchable.class, putFetchable); + } + + @Override + public void receivedSentFeed(FcpConnection source, SentFeed sentFeed) { + consume(SentFeed.class, sentFeed); + } + + @Override + public void receivedBookmarkFeed(FcpConnection fcpConnection, ReceivedBookmarkFeed receivedBookmarkFeed) { + consume(ReceivedBookmarkFeed.class, receivedBookmarkFeed); + } + + @Override + public void receivedProtocolError(FcpConnection fcpConnection, ProtocolError protocolError) { + consume(ProtocolError.class, protocolError); + } + + @Override + public void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) { + consumeUnknown(fcpMessage); + } + + @Override + public void connectionClosed(FcpConnection fcpConnection, Throwable throwable) { + consumeClose(throwable); + } + +} diff --git a/src/main/java/net/pterodactylus/fcp/quelaton/GenerateKeypairCommand.java b/src/main/java/net/pterodactylus/fcp/quelaton/GenerateKeypairCommand.java new file mode 100644 index 0000000..b919537 --- /dev/null +++ b/src/main/java/net/pterodactylus/fcp/quelaton/GenerateKeypairCommand.java @@ -0,0 +1,16 @@ +package net.pterodactylus.fcp.quelaton; + +import net.pterodactylus.fcp.FcpKeyPair; + +import java.util.concurrent.Future; + +/** + * Command to generate an SSK key pair. + * + * @author David ‘Bombe’ Roden + */ +public interface GenerateKeypairCommand { + + Future execute(); + +} diff --git a/src/test/java/net/pterodactylus/fcp/fake/FakeTcpServer.java b/src/test/java/net/pterodactylus/fcp/fake/FakeTcpServer.java new file mode 100644 index 0000000..07afb24 --- /dev/null +++ b/src/test/java/net/pterodactylus/fcp/fake/FakeTcpServer.java @@ -0,0 +1,55 @@ +package net.pterodactylus.fcp.fake; + +import java.io.IOException; +import java.net.ServerSocket; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; + +import org.hamcrest.Matcher; + +/** + * TODO + * + * @author David ‘Bombe’ Roden + */ +public class FakeTcpServer { + + private final ServerSocket serverSocket; + private final ExecutorService executorService; + private final AtomicReference clientSocket = new AtomicReference<>(); + + public FakeTcpServer(ExecutorService executorService) throws IOException { + this.executorService = executorService; + this.serverSocket = new ServerSocket(0); + } + + public int getPort() { + return serverSocket.getLocalPort(); + } + + public Future connect() throws IOException { + return executorService.submit(new Callable() { + @Override + public Void call() throws Exception { + clientSocket.set(new TextSocket(serverSocket.accept())); + return null; + } + }); + } + + public List collectUntil(Matcher lineMatcher) throws IOException { + return clientSocket.get().collectUntil(lineMatcher); + } + + public void writeLine(String line) throws IOException { + clientSocket.get().writeLine(line); + } + + public String readLine() throws IOException { + return clientSocket.get().readLine(); + } + +} diff --git a/src/test/java/net/pterodactylus/fcp/fake/FakeTcpServerTest.java b/src/test/java/net/pterodactylus/fcp/fake/FakeTcpServerTest.java new file mode 100644 index 0000000..976dcd6 --- /dev/null +++ b/src/test/java/net/pterodactylus/fcp/fake/FakeTcpServerTest.java @@ -0,0 +1,61 @@ +package net.pterodactylus.fcp.fake; + +import static java.util.Arrays.asList; +import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import java.io.IOException; +import java.net.Proxy; +import java.net.ProxySelector; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.URI; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import org.junit.Test; + +/** + * TODO + * + * @author David ‘Bombe’ Roden + */ +public class FakeTcpServerTest { + + private final ExecutorService sameThread = Executors.newSingleThreadExecutor(); + private final FakeTcpServer tcpServer; + + public FakeTcpServerTest() throws IOException { + this.tcpServer = new FakeTcpServer(sameThread); + } + + @Test + public void testConnect() throws IOException, ExecutionException, InterruptedException { + ProxySelector.setDefault(new ProxySelector() { + @Override + public List select(URI uri) { + return asList(Proxy.NO_PROXY); + } + + @Override + public void connectFailed(URI uri, SocketAddress sa, IOException ioe) { + } + }); + tcpServer.connect(); + try (TextSocket clientSocket = new TextSocket(new Socket("127.0.0.1", tcpServer.getPort()))) { + clientSocket.writeLine("Hello"); + clientSocket.writeLine("Bye"); + List receivedLines = tcpServer.collectUntil(is("Bye")); + assertThat(receivedLines, contains("Hello", "Bye")); + tcpServer.writeLine("Yes"); + tcpServer.writeLine("Quit"); + receivedLines = clientSocket.collectUntil(is("Quit")); + assertThat(receivedLines, contains("Yes", "Quit")); + } + } + +} diff --git a/src/test/java/net/pterodactylus/fcp/fake/TextSocket.java b/src/test/java/net/pterodactylus/fcp/fake/TextSocket.java new file mode 100644 index 0000000..c85f346 --- /dev/null +++ b/src/test/java/net/pterodactylus/fcp/fake/TextSocket.java @@ -0,0 +1,72 @@ +package net.pterodactylus.fcp.fake; + +import java.io.BufferedReader; +import java.io.Closeable; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.net.Socket; +import java.util.ArrayList; +import java.util.List; + +import org.hamcrest.Matcher; + +/** + * TODO + * + * @author David ‘Bombe’ Roden + */ +class TextSocket implements Closeable { + + private final Socket socket; + private final InputStream socketInput; + private final OutputStream socketOutput; + private final BufferedReader inputReader; + private final Writer outputWriter; + + TextSocket(Socket socket) throws IOException { + this.socket = socket; + this.socketInput = socket.getInputStream(); + this.socketOutput = socket.getOutputStream(); + this.inputReader = new BufferedReader(new InputStreamReader(socketInput, "UTF-8")); + this.outputWriter = new OutputStreamWriter(socketOutput, "UTF-8"); + } + + public String readLine() throws IOException { + return inputReader.readLine(); + } + + public void writeLine(String line) throws IOException { + outputWriter.write(line + "\n"); + outputWriter.flush(); + } + + public List collectUntil(Matcher lineMatcher) throws IOException { + List collectedLines = new ArrayList(); + while (true) { + String line = readLine(); + if (line == null) { + throw new EOFException(); + } + collectedLines.add(line); + if (lineMatcher.matches(line)) { + break; + } + } + return collectedLines; + } + + @Override + public void close() throws IOException { + outputWriter.close(); + inputReader.close(); + socketOutput.close(); + socketInput.close(); + socket.close(); + } + +} diff --git a/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java b/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java new file mode 100644 index 0000000..86e7fae --- /dev/null +++ b/src/test/java/net/pterodactylus/fcp/quelaton/DefaultFcpClientTest.java @@ -0,0 +1,68 @@ +package net.pterodactylus.fcp.quelaton; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import net.pterodactylus.fcp.FcpKeyPair; +import net.pterodactylus.fcp.fake.FakeTcpServer; + +import org.junit.Test; + +/** + * Unit test for {@link DefaultFcpClient}. + * + * @author David ‘Bombe’ Roden + */ +public class DefaultFcpClientTest { + + private final ExecutorService threadPool = Executors.newCachedThreadPool(); + private final FakeTcpServer fcpServer; + private final DefaultFcpClient fcpClient; + + public DefaultFcpClientTest() throws IOException { + fcpServer = new FakeTcpServer(threadPool); + fcpClient = new DefaultFcpClient(threadPool, "localhost", fcpServer.getPort(), () -> "Test", () -> "2.0"); + } + + @Test + public void defaultFcpClientCanGenerateKeypair() throws ExecutionException, InterruptedException, IOException { + Future keyPairFuture = fcpClient.generateKeypair().execute(); + connectNode(); + fcpServer.collectUntil(is("EndMessage")); + fcpServer.writeLine("SSKKeypair\n" + + "InsertURI=freenet:SSK@AKTTKG6YwjrHzWo67laRcoPqibyiTdyYufjVg54fBlWr,AwUSJG5ZS-FDZTqnt6skTzhxQe08T-fbKXj8aEHZsXM/\n" + + "RequestURI=freenet:SSK@BnHXXv3Fa43w~~iz1tNUd~cj4OpUuDjVouOWZ5XlpX0,AwUSJG5ZS-FDZTqnt6skTzhxQe08T-fbKXj8aEHZsXM,AQABAAE/\n" + + "Identifier=My Identifier from GenerateSSK\n" + + "EndMessage"); + FcpKeyPair keyPair = keyPairFuture.get(); + assertThat(keyPair.getPublicKey(), + is("freenet:SSK@BnHXXv3Fa43w~~iz1tNUd~cj4OpUuDjVouOWZ5XlpX0,AwUSJG5ZS-FDZTqnt6skTzhxQe08T-fbKXj8aEHZsXM,AQABAAE/")); + assertThat(keyPair.getPrivateKey(), is( + "freenet:SSK@AKTTKG6YwjrHzWo67laRcoPqibyiTdyYufjVg54fBlWr,AwUSJG5ZS-FDZTqnt6skTzhxQe08T-fbKXj8aEHZsXM/")); + } + + private void connectNode() throws InterruptedException, ExecutionException, IOException { + fcpServer.connect().get(); + fcpServer.collectUntil(is("EndMessage")); + fcpServer.writeLine("NodeHello\n" + + "FCPVersion=2.0\n" + + "ConnectionIdentifier=754595fc35701d76096d8279d15c57e6\n" + + "Version=Fred,0.7,1.0,1231\n" + + "Node=Fred\n" + + "NodeLanguage=ENGLISH\n" + + "ExtRevision=23771\n" + + "Build=1231\n" + + "Testnet=false\n" + + "ExtBuild=26\n" + + "CompressionCodecs=3 - GZIP(0), BZIP2(1), LZMA(2)\n" + + "Revision=@custom@\n" + + "EndMessage"); + } + +} diff --git a/src/test/java/net/pterodactylus/fcp/quelaton/FcpReplySequenceTest.java b/src/test/java/net/pterodactylus/fcp/quelaton/FcpReplySequenceTest.java new file mode 100644 index 0000000..fb849fe --- /dev/null +++ b/src/test/java/net/pterodactylus/fcp/quelaton/FcpReplySequenceTest.java @@ -0,0 +1,382 @@ +package net.pterodactylus.fcp.quelaton; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +import net.pterodactylus.fcp.AllData; +import net.pterodactylus.fcp.BaseMessage; +import net.pterodactylus.fcp.CloseConnectionDuplicateClientName; +import net.pterodactylus.fcp.ConfigData; +import net.pterodactylus.fcp.DataFound; +import net.pterodactylus.fcp.EndListPeerNotes; +import net.pterodactylus.fcp.EndListPeers; +import net.pterodactylus.fcp.EndListPersistentRequests; +import net.pterodactylus.fcp.FCPPluginReply; +import net.pterodactylus.fcp.FcpConnection; +import net.pterodactylus.fcp.FcpMessage; +import net.pterodactylus.fcp.FinishedCompression; +import net.pterodactylus.fcp.GetFailed; +import net.pterodactylus.fcp.IdentifierCollision; +import net.pterodactylus.fcp.NodeData; +import net.pterodactylus.fcp.NodeHello; +import net.pterodactylus.fcp.Peer; +import net.pterodactylus.fcp.PeerNote; +import net.pterodactylus.fcp.PeerRemoved; +import net.pterodactylus.fcp.PersistentGet; +import net.pterodactylus.fcp.PersistentPut; +import net.pterodactylus.fcp.PersistentPutDir; +import net.pterodactylus.fcp.PersistentRequestModified; +import net.pterodactylus.fcp.PersistentRequestRemoved; +import net.pterodactylus.fcp.PluginInfo; +import net.pterodactylus.fcp.ProtocolError; +import net.pterodactylus.fcp.PutFailed; +import net.pterodactylus.fcp.PutFetchable; +import net.pterodactylus.fcp.PutSuccessful; +import net.pterodactylus.fcp.ReceivedBookmarkFeed; +import net.pterodactylus.fcp.SSKKeypair; +import net.pterodactylus.fcp.SentFeed; +import net.pterodactylus.fcp.SimpleProgress; +import net.pterodactylus.fcp.StartedCompression; +import net.pterodactylus.fcp.SubscribedUSKUpdate; +import net.pterodactylus.fcp.TestDDAComplete; +import net.pterodactylus.fcp.TestDDAReply; +import net.pterodactylus.fcp.URIGenerated; +import net.pterodactylus.fcp.UnknownNodeIdentifier; +import net.pterodactylus.fcp.UnknownPeerNoteType; + +import org.junit.Test; + +/** + * Unit test for {@link FcpReplySequence}. + * + * @author David ‘Bombe’ Roden + */ +public class FcpReplySequenceTest { + + private final FcpConnection fcpConnection = mock(FcpConnection.class); + private final ExecutorService executorService = Executors.newSingleThreadExecutor(); + private final FcpReplySequence replyWaiter = new FcpReplySequence(executorService, fcpConnection); + private final FcpMessage fcpMessage = new FcpMessage("Test"); + + @Test + public void canSendMessage() throws IOException { + replyWaiter.send(fcpMessage); + verify(fcpConnection).sendMessage(fcpMessage); + } + + @Test + public void sendingAMessageRegistersTheWaiterAsFcpListener() throws IOException { + replyWaiter.send(fcpMessage); + verify(fcpConnection).addFcpListener(replyWaiter); + } + + @Test + public void closingTheReplyWaiterRemovesTheFcpListener() throws IOException { + replyWaiter.send(fcpMessage); + replyWaiter.close(); + verify(fcpConnection).removeFcpListener(replyWaiter); + } + + private void waitForASpecificMessage(MessageReceiver messageReceiver, + Class messageClass, Supplier message) throws IOException, InterruptedException, ExecutionException { + AtomicBoolean gotMessage = setupMessage(messageClass); + Future result = replyWaiter.send(fcpMessage); + sendMessage(messageReceiver, message.get()); + result.get(); + assertThat(gotMessage.get(), is(true)); + } + + private void sendMessage(MessageReceiver messageReceiver, M message) { + messageReceiver.receive(fcpConnection, message); + } + + private interface MessageReceiver { + + void receive(FcpConnection fcpConnection, M message); + } + + private AtomicBoolean setupMessage(Class messageClass) { + AtomicBoolean gotMessage = new AtomicBoolean(); + replyWaiter.handle(messageClass).with((message) -> gotMessage.set(true)); + replyWaiter.waitFor(() -> gotMessage.get()); + return gotMessage; + } + + @Test + public void waitingForNodeHelloWorks() throws IOException, ExecutionException, InterruptedException { + waitForASpecificMessage(replyWaiter::receivedNodeHello, NodeHello.class, + () -> new NodeHello(new FcpMessage("NodeHello"))); + } + + @Test + public void waitingForConnectionClosedDuplicateClientNameWorks() + throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedCloseConnectionDuplicateClientName, + CloseConnectionDuplicateClientName.class, + () -> new CloseConnectionDuplicateClientName(new FcpMessage("CloseConnectionDuplicateClientName"))); + } + + @Test + public void waitingForSSKKeypairWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedSSKKeypair, SSKKeypair.class, + () -> new SSKKeypair(new FcpMessage("SSKKeypair"))); + } + + @Test + public void waitForPeerWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedPeer, Peer.class, () -> new Peer(new FcpMessage("Peer"))); + } + + @Test + public void waitForEndListPeersWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedEndListPeers, EndListPeers.class, + () -> new EndListPeers(new FcpMessage("EndListPeers"))); + } + + @Test + public void waitForPeerNoteWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedPeerNote, PeerNote.class, + () -> new PeerNote(new FcpMessage("PeerNote"))); + } + + @Test + public void waitForEndListPeerNotesWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedEndListPeerNotes, EndListPeerNotes.class, + () -> new EndListPeerNotes(new FcpMessage("EndListPeerNotes"))); + } + + @Test + public void waitForPeerRemovedWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedPeerRemoved, PeerRemoved.class, + () -> new PeerRemoved(new FcpMessage("PeerRemoved"))); + } + + @Test + public void waitForNodeDataWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedNodeData, NodeData.class, + () -> new NodeData(new FcpMessage("NodeData").put("ark.pubURI", "") + .put( + "ark.number", "0") + .put("auth.negTypes", "") + .put("version", "0,0,0,0") + .put("lastGoodVersion", "0,0,0,0"))); + } + + @Test + public void waitForTestDDAReplyWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedTestDDAReply, TestDDAReply.class, + () -> new TestDDAReply(new FcpMessage("TestDDAReply"))); + } + + @Test + public void waitForTestDDACompleteWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedTestDDAComplete, TestDDAComplete.class, + () -> new TestDDAComplete(new FcpMessage("TestDDAComplete"))); + } + + @Test + public void waitForPersistentGetWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedPersistentGet, PersistentGet.class, + () -> new PersistentGet(new FcpMessage("PersistentGet"))); + } + + @Test + public void waitForPersistentPutWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedPersistentPut, PersistentPut.class, + () -> new PersistentPut(new FcpMessage("PersistentPut"))); + } + + @Test + public void waitForEndListPersistentRequestsWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedEndListPersistentRequests, EndListPersistentRequests.class, + () -> new EndListPersistentRequests(new FcpMessage("EndListPersistentRequests"))); + } + + @Test + public void waitForURIGeneratedWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedURIGenerated, URIGenerated.class, + () -> new URIGenerated(new FcpMessage("URIGenerated"))); + } + + @Test + public void waitForDataFoundWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedDataFound, DataFound.class, + () -> new DataFound(new FcpMessage("DataFound"))); + } + + @Test + public void waitForAllDataWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedAllData, AllData.class, + () -> new AllData(new FcpMessage("AllData"), null)); + } + + @Test + public void waitForSimpleProgressWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedSimpleProgress, SimpleProgress.class, + () -> new SimpleProgress(new FcpMessage("SimpleProgress"))); + } + + @Test + public void waitForStartedCompressionWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedStartedCompression, StartedCompression.class, + () -> new StartedCompression(new FcpMessage("StartedCompression"))); + } + + @Test + public void waitForFinishedCompressionWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedFinishedCompression, FinishedCompression.class, + () -> new FinishedCompression(new FcpMessage("FinishedCompression"))); + } + + @Test + public void waitForUnknownPeerNoteTypeWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedUnknownPeerNoteType, UnknownPeerNoteType.class, + () -> new UnknownPeerNoteType(new FcpMessage("UnknownPeerNoteType"))); + } + + @Test + public void waitForUnknownNodeIdentifierWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedUnknownNodeIdentifier, UnknownNodeIdentifier.class, + () -> new UnknownNodeIdentifier(new FcpMessage("UnknownNodeIdentifier"))); + } + + @Test + public void waitForConfigDataWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedConfigData, ConfigData.class, + () -> new ConfigData(new FcpMessage("ConfigData"))); + } + + @Test + public void waitForGetFailedWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedGetFailed, GetFailed.class, + () -> new GetFailed(new FcpMessage("GetFailed"))); + } + + @Test + public void waitForPutFailedWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedPutFailed, PutFailed.class, + () -> new PutFailed(new FcpMessage("PutFailed"))); + } + + @Test + public void waitForIdentifierCollisionWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedIdentifierCollision, IdentifierCollision.class, + () -> new IdentifierCollision(new FcpMessage("IdentifierCollision"))); + } + + @Test + public void waitForPersistentPutDirWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedPersistentPutDir, PersistentPutDir.class, + () -> new PersistentPutDir(new FcpMessage("PersistentPutDir"))); + } + + @Test + public void waitForPersistentRequestRemovedWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedPersistentRequestRemoved, PersistentRequestRemoved.class, + () -> new PersistentRequestRemoved(new FcpMessage("PersistentRequestRemoved"))); + } + + @Test + public void waitForSubscribedUSKUpdateWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedSubscribedUSKUpdate, SubscribedUSKUpdate.class, + () -> new SubscribedUSKUpdate(new FcpMessage("SubscribedUSKUpdate"))); + } + + @Test + public void waitForPluginInfoWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedPluginInfo, PluginInfo.class, + () -> new PluginInfo(new FcpMessage("PluginInfo"))); + } + + @Test + public void waitForFCPPluginReply() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedFCPPluginReply, FCPPluginReply.class, + () -> new FCPPluginReply(new FcpMessage("FCPPluginReply"), null)); + } + + @Test + public void waitForPersistentRequestModifiedWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedPersistentRequestModified, PersistentRequestModified.class, + () -> new PersistentRequestModified(new FcpMessage("PersistentRequestModified"))); + } + + @Test + public void waitForPutSuccessfulWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedPutSuccessful, PutSuccessful.class, + () -> new PutSuccessful(new FcpMessage("PutSuccessful"))); + } + + @Test + public void waitForPutFetchableWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedPutFetchable, PutFetchable.class, + () -> new PutFetchable(new FcpMessage("PutFetchable"))); + } + + @Test + public void waitForSentFeedWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedSentFeed, SentFeed.class, + () -> new SentFeed(new FcpMessage("SentFeed"))); + } + + @Test + public void waitForReceivedBookmarkFeedWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedBookmarkFeed, ReceivedBookmarkFeed.class, + () -> new ReceivedBookmarkFeed(new FcpMessage("ReceivedBookmarkFeed"))); + } + + @Test + public void waitForProtocolErrorWorks() throws InterruptedException, ExecutionException, IOException { + waitForASpecificMessage(replyWaiter::receivedProtocolError, ProtocolError.class, + () -> new ProtocolError(new FcpMessage("ProtocolError"))); + } + + @Test + public void waitForUnknownMessageWorks() throws IOException, ExecutionException, InterruptedException { + AtomicReference receivedMessage = new AtomicReference<>(); + replyWaiter.handleUnknown().with((message) -> receivedMessage.set(message)); + replyWaiter.waitFor(() -> receivedMessage.get() != null); + Future result = replyWaiter.send(fcpMessage); + replyWaiter.receivedMessage(fcpConnection, fcpMessage); + result.get(); + assertThat(receivedMessage.get(), is(fcpMessage)); + } + + @Test + public void waitingForMultipleMessagesWorks() throws IOException, ExecutionException, InterruptedException { + AtomicBoolean gotPutFailed = new AtomicBoolean(); + replyWaiter.handle(PutFailed.class).with((getFailed) -> gotPutFailed.set(true)); + AtomicBoolean gotGetFailed = new AtomicBoolean(); + replyWaiter.handle(GetFailed.class).with((getFailed) -> gotGetFailed.set(true)); + replyWaiter.waitFor(() -> gotGetFailed.get() && gotPutFailed.get()); + Future result = replyWaiter.send(fcpMessage); + assertThat(result.isDone(), is(false)); + replyWaiter.receivedGetFailed(fcpConnection, new GetFailed(new FcpMessage("GetFailed"))); + assertThat(result.isDone(), is(false)); + replyWaiter.receivedPutFailed(fcpConnection, new PutFailed(new FcpMessage("PutFailed"))); + result.get(); + } + + @Test + public void waitingForConnectionClosureWorks() throws IOException, ExecutionException, InterruptedException { + AtomicReference receivedThrowable = new AtomicReference<>(); + replyWaiter.handleClose().with((e) -> receivedThrowable.set(e)); + replyWaiter.waitFor(() -> receivedThrowable.get() != null); + Future result = replyWaiter.send(fcpMessage); + Throwable throwable = new Throwable(); + replyWaiter.connectionClosed(fcpConnection, throwable); + result.get(); + assertThat(receivedThrowable.get(), is(throwable)); + } + +} -- 2.7.4