From 72a6784258f756eb23b39a9f8021b400829cbcec Mon Sep 17 00:00:00 2001 From: =?utf8?q?David=20=E2=80=98Bombe=E2=80=99=20Roden?= Date: Wed, 9 Apr 2008 06:44:56 +0000 Subject: [PATCH] current state git-svn-id: http://trooper/svn/projects/jSite/trunk@653 c3eda9e8-030b-0410-8277-bc7414b0a119 --- src/net/pterodactylus/util/fcp/FcpConnection.java | 95 +++++++------ .../util/fcp/FcpConnectionHandler.java | 42 +++++- .../util/fcp/client/FcpHighLevelClient.java | 155 ++++++++++++++++++++- .../util/fcp/client/NodeHelloCallback.java | 1 + 4 files changed, 245 insertions(+), 48 deletions(-) diff --git a/src/net/pterodactylus/util/fcp/FcpConnection.java b/src/net/pterodactylus/util/fcp/FcpConnection.java index 1b4d249..484b46c 100644 --- a/src/net/pterodactylus/util/fcp/FcpConnection.java +++ b/src/net/pterodactylus/util/fcp/FcpConnection.java @@ -56,6 +56,7 @@ public class FcpConnection { private Socket remoteSocket; private InputStream remoteInputStream; private OutputStream remoteOutputStream; + private FcpConnectionHandler connectionHandler; private boolean connected; public FcpConnection(String host, String clientName) throws UnknownHostException { @@ -80,17 +81,36 @@ public class FcpConnection { // LISTENER MANAGEMENT // + /** + * Adds the given listener to the list of listeners. + * + * @param fcpListener + * The listener to add + */ public void addFcpListener(FcpListener fcpListener) { fcpListeners.add(fcpListener); } + /** + * Removes the given listener from the list of listeners. + * + * @param fcpListener + * The listener to remove + */ public void removeFcpListener(FcpListener fcpListener) { fcpListeners.remove(fcpListener); } - private void fireNodeHello(Map nodeProperties) { + /** + * Notifies all registered listeners that a message has been received. + * + * @see FcpListener#receivedMessage(FcpConnection, FcpMessage) + * @param fcpMessage + * The message that was received + */ + private void fireMessageReceived(FcpMessage fcpMessage) { for (FcpListener fcpListener: fcpListeners) { - fcpListener.fcpNodeHello(this, nodeProperties); + fcpListener.receivedMessage(this, fcpMessage); } } @@ -98,20 +118,42 @@ public class FcpConnection { // ACTIONS // - public synchronized void connect() throws FcpException, IOException { - System.out.println("connecting..."); + /** + * Connects to the node. + * + * @throws IOException + * if an I/O error occurs + * @throws IllegalStateException + * if there is already a connection to the node + */ + public synchronized void connect() throws IOException, IllegalStateException { + if (connectionHandler != null) { + throw new IllegalStateException("already connected, disconnect first"); + } remoteSocket = new Socket(address, port); remoteInputStream = remoteSocket.getInputStream(); remoteOutputStream = remoteSocket.getOutputStream(); connected = true; - System.out.println("connected."); - new Thread(new FcpConnectionHandler(this, remoteInputStream)).start(); - sendMessage(clientHelloMessage); + new Thread(connectionHandler = new FcpConnectionHandler(this, remoteInputStream)).start(); } + /** + * Disconnects from the node. If there is no connection to the node, this + * method does nothing. + */ public synchronized void disconnect() { + if (connectionHandler == null) { + return; + } connected = false; Closer.close(remoteSocket); + connectionHandler.stop(); + connectionHandler = null; + } + + public synchronized void sendMessage(FcpMessage fcpMessage) throws IOException { + System.out.println("sending message: " + fcpMessage.getName()); + fcpMessage.write(remoteOutputStream); } /** @@ -124,7 +166,7 @@ public class FcpConnection { * @return The properties of the peer, or null if the peer is * unknown * @throws IOException - * @throws FcpException + * @throws FcpException */ public Map sendListPeer(String nodeIdentifier) throws IOException, FcpException { FcpMessage listPeerMessage = new FcpMessage("ListPeer"); @@ -137,6 +179,10 @@ public class FcpConnection { return null; } + void handleMessage(FcpMessage fcpMessage) { + fireMessageReceived(fcpMessage); + } + public List> sendListPeers(boolean withMetadata, boolean withVolatile) throws IOException, FcpException { FcpMessage listPeersMessage = new FcpMessage("ListPeers"); listPeersMessage.setField("WithMetadata", String.valueOf(withMetadata)); @@ -167,7 +213,7 @@ public class FcpConnection { } return peerNotes; } - + public void sendTestDDARequest(String directory, boolean wantReadDirectory, boolean wantWriteDirectory) throws IOException, FcpException { FcpMessage testDDARequestMessage = new FcpMessage("TestDDARequest"); testDDARequestMessage.setField("Directory", directory); @@ -175,7 +221,7 @@ public class FcpConnection { testDDARequestMessage.setField("WantWriteDirectory", String.valueOf(wantWriteDirectory)); sendMessage(testDDARequestMessage); } - + public FcpKeyPair generateSSK() throws IOException, FcpException { FcpMessage generateSSKMessage = new FcpMessage("GenerateSSK"); String identifier = hashCode() + String.valueOf(System.currentTimeMillis()); @@ -186,40 +232,11 @@ public class FcpConnection { String privateKey = returnMessage.getField("InsertURI"); return new FcpKeyPair(publicKey, privateKey); } - - // - // PACKAGE-PRIVATE METHODS - // - - void handleMessage(FcpMessage fcpMessage) { - synchronized (messageWaitSync) { - while (receivedMessage != null) { - /* previous message has not yet been consumed */ - System.out.println("waiting for message to be consumed..."); - try { - messageWaitSync.wait(); - } catch (InterruptedException ie1) { - } - } - /* TODO - check whether to send events here or later. */ - if ("NodeHello".equals(fcpMessage.getName())) { - fireNodeHello(fcpMessage.getFields()); - } - System.out.println("setting receivedMessage"); - receivedMessage = fcpMessage; - messageWaitSync.notifyAll(); - } - } // // PRIVATE METHODS // - public synchronized void sendMessage(FcpMessage fcpMessage) throws IOException { - System.out.println("sending message: " + fcpMessage.getName()); - fcpMessage.write(remoteOutputStream); - } - public FcpMessage waitForMessage(String... messageNames) throws FcpException { FcpMessage oldMessage = null; synchronized (messageWaitSync) { diff --git a/src/net/pterodactylus/util/fcp/FcpConnectionHandler.java b/src/net/pterodactylus/util/fcp/FcpConnectionHandler.java index 2693c1f..cbea0e2 100644 --- a/src/net/pterodactylus/util/fcp/FcpConnectionHandler.java +++ b/src/net/pterodactylus/util/fcp/FcpConnectionHandler.java @@ -25,26 +25,50 @@ import java.nio.ByteBuffer; import java.nio.charset.Charset; /** - * TODO + * Handles an FCP connection to a node. * * @author David ‘Bombe’ Roden <bombe@freenetproject.org> * @version $Id$ */ public class FcpConnectionHandler implements Runnable { + /** The underlying connection. */ private final FcpConnection fcpConnection; + + /** The input stream from the node. */ private final InputStream remoteInputStream; - + + /** Whether to stop the connection handler. */ + private boolean shouldStop; + + /** Whether the next read line feed should be ignored. */ private boolean ignoreNextLinefeed; + /** + * Creates a new connection handler that operates on the given connection + * and input stream. + * + * @param fcpConnection + * The underlying FCP connection + * @param remoteInputStream + * The input stream from the node + */ public FcpConnectionHandler(FcpConnection fcpConnection, InputStream remoteInputStream) { this.fcpConnection = fcpConnection; this.remoteInputStream = remoteInputStream; } + /** + * {@inheritDoc} + */ public void run() { FcpMessage fcpMessage = null; while (true) { + synchronized (this) { + if (shouldStop) { + break; + } + } try { String line = readLine(); System.out.println("read line: " + line); @@ -72,11 +96,25 @@ public class FcpConnectionHandler implements Runnable { String value = line.substring(equalSign + 1); fcpMessage.setField(field, value); } catch (IOException e) { + break; } } } /** + * Stops the connection handler. + */ + public void stop() { + synchronized (this) { + shouldStop = true; + } + } + + // + // PRIVATE METHODS + // + + /** * Reads bytes from {@link #remoteInputStream} until ‘\r’ or ‘\n’ are * encountered and decodes the read bytes using UTF-8. * diff --git a/src/net/pterodactylus/util/fcp/client/FcpHighLevelClient.java b/src/net/pterodactylus/util/fcp/client/FcpHighLevelClient.java index 8fd85b5..5e02dc6 100644 --- a/src/net/pterodactylus/util/fcp/client/FcpHighLevelClient.java +++ b/src/net/pterodactylus/util/fcp/client/FcpHighLevelClient.java @@ -20,9 +20,14 @@ package net.pterodactylus.util.fcp.client; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; import net.pterodactylus.util.fcp.FcpConnection; import net.pterodactylus.util.fcp.FcpException; +import net.pterodactylus.util.fcp.FcpListener; import net.pterodactylus.util.fcp.FcpMessage; /** @@ -31,11 +36,17 @@ import net.pterodactylus.util.fcp.FcpMessage; * @author David Roden <droden@gmail.com> * @version $Id$ */ -public class FcpHighLevelClient { +public class FcpHighLevelClient implements FcpListener { /** The FCP connection of this client. */ private final FcpConnection fcpConnection; + /** Storage for incoming messages. */ + private final List incomingMessages = Collections.synchronizedList(new ArrayList()); + + /** Registered handlers for incoming messages. */ + private final List> incomingMessageCallbacks = Collections.synchronizedList(new ArrayList>()); + /** * Creates a new high-level client that operates on the given FCP * connection. @@ -45,17 +56,147 @@ public class FcpHighLevelClient { */ public FcpHighLevelClient(FcpConnection fcpConnection) { this.fcpConnection = fcpConnection; + fcpConnection.addFcpListener(this); } - - public FcpNodeInformation login(String clientName, NodeHelloCallback nodeHelloCallback) throws FcpException, IOException { + + // + // ACTIONS + // + + /** + * Stops the high-level client. + */ + public void stop() { + fcpConnection.removeFcpListener(this); + } + + public Map login(String clientName, final NodeHelloCallback nodeHelloCallback) throws FcpException, IOException { FcpMessage clientHelloMessage = new FcpMessage("ClientHello"); clientHelloMessage.setField("Name", clientName); clientHelloMessage.setField("ExpectedVersion", "2.0"); - fcpConnection.sendMessage(clientHelloMessage); - FcpMessage nodeHelloMessage = fcpConnection.waitForMessage("NodeHello", "CloseConnectionDuplicateClientName"); - if (nodeHelloMessage.getName().equals("CloseConnectionDuplicateClientName")) { - throw new FcpException("duplicate client name"); + Callback> callback = new Callback>() { + /** + * {@inheritDoc} + */ + @SuppressWarnings("synthetic-access") + public CallbackContainer> consumeMessage(FcpMessage fcpMessage) throws FcpException { + String messageName = fcpMessage.getName(); + if ("CloseConnectionDuplicateClientName".equals(messageName)) { + incomingMessages.remove(0); + if (nodeHelloCallback == null) { + throw new FcpException("duplicate client name"); + } + nodeHelloCallback.closeConnectionDuplicateClientName(); + } + if ("NodeHello".equals(messageName)) { + incomingMessages.remove(0); + return new CallbackContainer>(true, fcpMessage.getFields()); + } + return new CallbackContainer>(false, null); + } + }; + synchronized (incomingMessages) { + synchronized (incomingMessageCallbacks) { + fcpConnection.sendMessage(clientHelloMessage); + if (nodeHelloCallback != null) { + incomingMessageCallbacks.add(callback); + return null; + } + } + return processMessages(callback); + } + } + + // + // PRIVATE METHODS + // + + private T processMessages(Callback callback) throws FcpException { + while (true) { + FcpMessage fcpMessage; + synchronized (incomingMessages) { + while (incomingMessages.isEmpty()) { + try { + incomingMessages.wait(); + } catch (InterruptedException ie1) { + } + } + fcpMessage = incomingMessages.get(0); + } + CallbackContainer callbackContainer = callback.consumeMessage(fcpMessage); + if (callbackContainer.isConsumed()) { + incomingMessages.remove(0); + return callbackContainer.getContent(); + } + } + } + + // + // INTERFACE FcpListener + // + + /** + * {@inheritDoc} + */ + public void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) { + if (this.fcpConnection != fcpConnection) { + return; + } + synchronized (incomingMessages) { + incomingMessages.add(fcpMessage); + incomingMessages.notifyAll(); + } + synchronized (incomingMessageCallbacks) { + for (Callback callback: incomingMessageCallbacks) { + try { + CallbackContainer callbackContainer = callback.consumeMessage(fcpMessage); + } catch (FcpException e) { + } + + } } } + public static interface Callback { + + /** + * TODO + * + * @param fcpMessage + * @return + */ + public CallbackContainer consumeMessage(FcpMessage fcpMessage) throws FcpException; + + } + + public static class CallbackContainer { + + private final boolean consumed; + private final T content; + + public CallbackContainer(boolean consumed, T content) { + this.consumed = consumed; + this.content = content; + } + + /** + * TODO + * + * @return + */ + public boolean isConsumed() { + return consumed; + } + + /** + * TODO + * + * @return + */ + public T getContent() { + return content; + } + + } + } diff --git a/src/net/pterodactylus/util/fcp/client/NodeHelloCallback.java b/src/net/pterodactylus/util/fcp/client/NodeHelloCallback.java index 6586b03..ee0b893 100644 --- a/src/net/pterodactylus/util/fcp/client/NodeHelloCallback.java +++ b/src/net/pterodactylus/util/fcp/client/NodeHelloCallback.java @@ -28,5 +28,6 @@ package net.pterodactylus.util.fcp.client; public interface NodeHelloCallback { public void nodeHello(FcpNodeInformation fcpNodeInformation); + public void closeConnectionDuplicateClientName(); } -- 2.7.4