current state
authorDavid ‘Bombe’ Roden <bombe@pterodactylus.net>
Wed, 9 Apr 2008 06:44:56 +0000 (06:44 +0000)
committerDavid ‘Bombe’ Roden <bombe@pterodactylus.net>
Wed, 9 Apr 2008 06:44:56 +0000 (06:44 +0000)
git-svn-id: http://trooper/svn/projects/jSite/trunk@653 c3eda9e8-030b-0410-8277-bc7414b0a119

src/net/pterodactylus/util/fcp/FcpConnection.java
src/net/pterodactylus/util/fcp/FcpConnectionHandler.java
src/net/pterodactylus/util/fcp/client/FcpHighLevelClient.java
src/net/pterodactylus/util/fcp/client/NodeHelloCallback.java

index 1b4d249..484b46c 100644 (file)
@@ -56,6 +56,7 @@ public class FcpConnection {
        private Socket remoteSocket;
        private InputStream remoteInputStream;
        private OutputStream remoteOutputStream;
+       private FcpConnectionHandler connectionHandler;
        private boolean connected;
 
        public FcpConnection(String host, String clientName) throws UnknownHostException {
@@ -80,17 +81,36 @@ public class FcpConnection {
        // LISTENER MANAGEMENT
        //
 
+       /**
+        * Adds the given listener to the list of listeners.
+        * 
+        * @param fcpListener
+        *            The listener to add
+        */
        public void addFcpListener(FcpListener fcpListener) {
                fcpListeners.add(fcpListener);
        }
 
+       /**
+        * Removes the given listener from the list of listeners.
+        * 
+        * @param fcpListener
+        *            The listener to remove
+        */
        public void removeFcpListener(FcpListener fcpListener) {
                fcpListeners.remove(fcpListener);
        }
 
-       private void fireNodeHello(Map<String, String> nodeProperties) {
+       /**
+        * Notifies all registered listeners that a message has been received.
+        * 
+        * @see FcpListener#receivedMessage(FcpConnection, FcpMessage)
+        * @param fcpMessage
+        *            The message that was received
+        */
+       private void fireMessageReceived(FcpMessage fcpMessage) {
                for (FcpListener fcpListener: fcpListeners) {
-                       fcpListener.fcpNodeHello(this, nodeProperties);
+                       fcpListener.receivedMessage(this, fcpMessage);
                }
        }
 
@@ -98,20 +118,42 @@ public class FcpConnection {
        // ACTIONS
        //
 
-       public synchronized void connect() throws FcpException, IOException {
-               System.out.println("connecting...");
+       /**
+        * Connects to the node.
+        * 
+        * @throws IOException
+        *             if an I/O error occurs
+        * @throws IllegalStateException
+        *             if there is already a connection to the node
+        */
+       public synchronized void connect() throws IOException, IllegalStateException {
+               if (connectionHandler != null) {
+                       throw new IllegalStateException("already connected, disconnect first");
+               }
                remoteSocket = new Socket(address, port);
                remoteInputStream = remoteSocket.getInputStream();
                remoteOutputStream = remoteSocket.getOutputStream();
                connected = true;
-               System.out.println("connected.");
-               new Thread(new FcpConnectionHandler(this, remoteInputStream)).start();
-               sendMessage(clientHelloMessage);
+               new Thread(connectionHandler = new FcpConnectionHandler(this, remoteInputStream)).start();
        }
 
+       /**
+        * Disconnects from the node. If there is no connection to the node, this
+        * method does nothing.
+        */
        public synchronized void disconnect() {
+               if (connectionHandler == null) {
+                       return;
+               }
                connected = false;
                Closer.close(remoteSocket);
+               connectionHandler.stop();
+               connectionHandler = null;
+       }
+
+       public synchronized void sendMessage(FcpMessage fcpMessage) throws IOException {
+               System.out.println("sending message: " + fcpMessage.getName());
+               fcpMessage.write(remoteOutputStream);
        }
 
        /**
@@ -124,7 +166,7 @@ public class FcpConnection {
         * @return The properties of the peer, or <code>null</code> if the peer is
         *         unknown
         * @throws IOException
-        * @throws FcpException 
+        * @throws FcpException
         */
        public Map<String, String> sendListPeer(String nodeIdentifier) throws IOException, FcpException {
                FcpMessage listPeerMessage = new FcpMessage("ListPeer");
@@ -137,6 +179,10 @@ public class FcpConnection {
                return null;
        }
        
+       void handleMessage(FcpMessage fcpMessage) {
+               fireMessageReceived(fcpMessage);
+       }
+
        public List<Map<String, String>> sendListPeers(boolean withMetadata, boolean withVolatile) throws IOException, FcpException {
                FcpMessage listPeersMessage = new FcpMessage("ListPeers");
                listPeersMessage.setField("WithMetadata", String.valueOf(withMetadata));
@@ -167,7 +213,7 @@ public class FcpConnection {
                }
                return peerNotes;
        }
-       
+
        public void sendTestDDARequest(String directory, boolean wantReadDirectory, boolean wantWriteDirectory) throws IOException, FcpException {
                FcpMessage testDDARequestMessage = new FcpMessage("TestDDARequest");
                testDDARequestMessage.setField("Directory", directory);
@@ -175,7 +221,7 @@ public class FcpConnection {
                testDDARequestMessage.setField("WantWriteDirectory", String.valueOf(wantWriteDirectory));
                sendMessage(testDDARequestMessage);
        }
-       
+
        public FcpKeyPair generateSSK() throws IOException, FcpException {
                FcpMessage generateSSKMessage = new FcpMessage("GenerateSSK");
                String identifier = hashCode() + String.valueOf(System.currentTimeMillis());
@@ -186,40 +232,11 @@ public class FcpConnection {
                String privateKey = returnMessage.getField("InsertURI");
                return new FcpKeyPair(publicKey, privateKey);
        }
-       
-       //
-       // PACKAGE-PRIVATE METHODS
-       //
-
-       void handleMessage(FcpMessage fcpMessage) {
-               synchronized (messageWaitSync) {
-                       while (receivedMessage != null) {
-                               /* previous message has not yet been consumed */
-                               System.out.println("waiting for message to be consumed...");
-                               try {
-                                       messageWaitSync.wait();
-                               } catch (InterruptedException ie1) {
-                               }
-                       }
-                       /* TODO - check whether to send events here or later. */
-                       if ("NodeHello".equals(fcpMessage.getName())) {
-                               fireNodeHello(fcpMessage.getFields());
-                       }
-                       System.out.println("setting receivedMessage");
-                       receivedMessage = fcpMessage;
-                       messageWaitSync.notifyAll();
-               }
-       }
 
        //
        // PRIVATE METHODS
        //
 
-       public synchronized void sendMessage(FcpMessage fcpMessage) throws IOException {
-               System.out.println("sending message: " + fcpMessage.getName());
-               fcpMessage.write(remoteOutputStream);
-       }
-
        public FcpMessage waitForMessage(String... messageNames) throws FcpException {
                FcpMessage oldMessage = null;
                synchronized (messageWaitSync) {
index 2693c1f..cbea0e2 100644 (file)
@@ -25,26 +25,50 @@ import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 
 /**
- * TODO
+ * Handles an FCP connection to a node.
  * 
  * @author David ‘Bombe’ Roden &lt;bombe@freenetproject.org&gt;
  * @version $Id$
  */
 public class FcpConnectionHandler implements Runnable {
 
+       /** The underlying connection. */
        private final FcpConnection fcpConnection;
+
+       /** The input stream from the node. */
        private final InputStream remoteInputStream;
-       
+
+       /** Whether to stop the connection handler. */
+       private boolean shouldStop;
+
+       /** Whether the next read line feed should be ignored. */
        private boolean ignoreNextLinefeed;
 
+       /**
+        * Creates a new connection handler that operates on the given connection
+        * and input stream.
+        * 
+        * @param fcpConnection
+        *            The underlying FCP connection
+        * @param remoteInputStream
+        *            The input stream from the node
+        */
        public FcpConnectionHandler(FcpConnection fcpConnection, InputStream remoteInputStream) {
                this.fcpConnection = fcpConnection;
                this.remoteInputStream = remoteInputStream;
        }
 
+       /**
+        * {@inheritDoc}
+        */
        public void run() {
                FcpMessage fcpMessage = null;
                while (true) {
+                       synchronized (this) {
+                               if (shouldStop) {
+                                       break;
+                               }
+                       }
                        try {
                                String line = readLine();
                                System.out.println("read line: " + line);
@@ -72,11 +96,25 @@ public class FcpConnectionHandler implements Runnable {
                                String value = line.substring(equalSign + 1);
                                fcpMessage.setField(field, value);
                        } catch (IOException e) {
+                               break;
                        }
                }
        }
 
        /**
+        * Stops the connection handler.
+        */
+       public void stop() {
+               synchronized (this) {
+                       shouldStop = true;
+               }
+       }
+
+       //
+       // PRIVATE METHODS
+       //
+
+       /**
         * Reads bytes from {@link #remoteInputStream} until ‘\r’ or ‘\n’ are
         * encountered and decodes the read bytes using UTF-8.
         * 
index 8fd85b5..5e02dc6 100644 (file)
 package net.pterodactylus.util.fcp.client;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
 import net.pterodactylus.util.fcp.FcpConnection;
 import net.pterodactylus.util.fcp.FcpException;
+import net.pterodactylus.util.fcp.FcpListener;
 import net.pterodactylus.util.fcp.FcpMessage;
 
 /**
@@ -31,11 +36,17 @@ import net.pterodactylus.util.fcp.FcpMessage;
  * @author David Roden &lt;droden@gmail.com&gt;
  * @version $Id$
  */
-public class FcpHighLevelClient {
+public class FcpHighLevelClient implements FcpListener {
 
        /** The FCP connection of this client. */
        private final FcpConnection fcpConnection;
 
+       /** Storage for incoming messages. */
+       private final List<FcpMessage> incomingMessages = Collections.synchronizedList(new ArrayList<FcpMessage>());
+
+       /** Registered handlers for incoming messages. */
+       private final List<Callback<?>> incomingMessageCallbacks = Collections.synchronizedList(new ArrayList<Callback<?>>());
+       
        /**
         * Creates a new high-level client that operates on the given FCP
         * connection.
@@ -45,17 +56,147 @@ public class FcpHighLevelClient {
         */
        public FcpHighLevelClient(FcpConnection fcpConnection) {
                this.fcpConnection = fcpConnection;
+               fcpConnection.addFcpListener(this);
        }
-       
-       public FcpNodeInformation login(String clientName, NodeHelloCallback nodeHelloCallback) throws FcpException, IOException {
+
+       //
+       // ACTIONS
+       //
+
+       /**
+        * Stops the high-level client.
+        */
+       public void stop() {
+               fcpConnection.removeFcpListener(this);
+       }
+
+       public Map<String, String> login(String clientName, final NodeHelloCallback nodeHelloCallback) throws FcpException, IOException {
                FcpMessage clientHelloMessage = new FcpMessage("ClientHello");
                clientHelloMessage.setField("Name", clientName);
                clientHelloMessage.setField("ExpectedVersion", "2.0");
-               fcpConnection.sendMessage(clientHelloMessage);
-               FcpMessage nodeHelloMessage = fcpConnection.waitForMessage("NodeHello", "CloseConnectionDuplicateClientName");
-               if (nodeHelloMessage.getName().equals("CloseConnectionDuplicateClientName")) {
-                       throw new FcpException("duplicate client name");
+               Callback<Map<String, String>> callback = new Callback<Map<String, String>>() {
+                       /**
+                        * {@inheritDoc}
+                        */
+                       @SuppressWarnings("synthetic-access")
+                       public CallbackContainer<Map<String, String>> consumeMessage(FcpMessage fcpMessage) throws FcpException {
+                               String messageName = fcpMessage.getName();
+                               if ("CloseConnectionDuplicateClientName".equals(messageName)) {
+                                       incomingMessages.remove(0);
+                                       if (nodeHelloCallback == null) {
+                                               throw new FcpException("duplicate client name");
+                                       }
+                                       nodeHelloCallback.closeConnectionDuplicateClientName();
+                               }
+                               if ("NodeHello".equals(messageName)) {
+                                       incomingMessages.remove(0);
+                                       return new CallbackContainer<Map<String, String>>(true, fcpMessage.getFields());
+                               }
+                               return new CallbackContainer<Map<String, String>>(false, null);
+                       }
+               };
+               synchronized (incomingMessages) {
+                       synchronized (incomingMessageCallbacks) {
+                               fcpConnection.sendMessage(clientHelloMessage);
+                               if (nodeHelloCallback != null) {
+                                       incomingMessageCallbacks.add(callback);
+                                       return null;
+                               }
+                       }
+                       return processMessages(callback);
+               }
+       }
+
+       //
+       // PRIVATE METHODS
+       //
+
+       private <T> T processMessages(Callback<T> callback) throws FcpException {
+               while (true) {
+                       FcpMessage fcpMessage;
+                       synchronized (incomingMessages) {
+                               while (incomingMessages.isEmpty()) {
+                                       try {
+                                               incomingMessages.wait();
+                                       } catch (InterruptedException ie1) {
+                                       }
+                               }
+                               fcpMessage = incomingMessages.get(0);
+                       }
+                       CallbackContainer<T> callbackContainer = callback.consumeMessage(fcpMessage);
+                       if (callbackContainer.isConsumed()) {
+                               incomingMessages.remove(0);
+                               return callbackContainer.getContent();
+                       }
+               }
+       }
+
+       //
+       // INTERFACE FcpListener
+       //
+
+       /**
+        * {@inheritDoc}
+        */
+       public void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) {
+               if (this.fcpConnection != fcpConnection) {
+                       return;
+               }
+               synchronized (incomingMessages) {
+                       incomingMessages.add(fcpMessage);
+                       incomingMessages.notifyAll();
+               }
+               synchronized (incomingMessageCallbacks) {
+                       for (Callback<?> callback: incomingMessageCallbacks) {
+                               try {
+                                       CallbackContainer<?> callbackContainer = callback.consumeMessage(fcpMessage);
+                               } catch (FcpException e) {
+                               }
+                               
+                       }
                }
        }
 
+       public static interface Callback<T> {
+               
+               /**
+                * TODO
+                * 
+                * @param fcpMessage
+                * @return
+                */
+               public CallbackContainer<T> consumeMessage(FcpMessage fcpMessage) throws FcpException;
+               
+       }
+       
+       public static class CallbackContainer<T> {
+               
+               private final boolean consumed;
+               private final T content;
+               
+               public CallbackContainer(boolean consumed, T content) {
+                       this.consumed = consumed;
+                       this.content = content;
+               }
+               
+               /**
+                * TODO
+                * 
+                * @return
+                */
+               public boolean isConsumed() {
+                       return consumed;
+               }
+               
+               /**
+                * TODO
+                * 
+                * @return
+                */
+               public T getContent() {
+                       return content;
+               }
+               
+       }
+       
 }
index 6586b03..ee0b893 100644 (file)
@@ -28,5 +28,6 @@ package net.pterodactylus.util.fcp.client;
 public interface NodeHelloCallback {
 
        public void nodeHello(FcpNodeInformation fcpNodeInformation);
+       public void closeConnectionDuplicateClientName();
        
 }