add FCPPluginReply
[jSite2.git] / src / net / pterodactylus / util / fcp / FcpConnection.java
index 484b46c..9a13264 100644 (file)
@@ -26,55 +26,111 @@ import java.net.InetAddress;
 import java.net.Socket;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.StringTokenizer;
-import java.util.Map.Entry;
 
 import net.pterodactylus.util.io.Closer;
+import net.pterodactylus.util.io.LimitedInputStream;
 
 /**
- * TODO
+ * An FCP connection to a Freenet node.
  * 
  * @author David ‘Bombe’ Roden <bombe@freenetproject.org>
  * @version $Id$
  */
 public class FcpConnection {
 
+       /** The default port for FCP v2. */
        public static final int DEFAULT_PORT = 9481;
 
-       private final Object messageWaitSync = new Object();
-       private FcpMessage receivedMessage = null;
-
+       /** The list of FCP listeners. */
        private final List<FcpListener> fcpListeners = new ArrayList<FcpListener>();
 
+       /** The address of the node. */
        private final InetAddress address;
+
+       /** The port number of the node’s FCP port. */
        private final int port;
-       private final String clientName;
 
+       /** The remote socket. */
        private Socket remoteSocket;
+
+       /** The input stream from the node. */
        private InputStream remoteInputStream;
+
+       /** The output stream to the node. */
        private OutputStream remoteOutputStream;
+
+       /** The connection handler. */
        private FcpConnectionHandler connectionHandler;
-       private boolean connected;
 
-       public FcpConnection(String host, String clientName) throws UnknownHostException {
-               this(host, DEFAULT_PORT, clientName);
+       /** Incoming message statistics. */
+       private Map<String, Integer> incomingMessageStatistics = Collections.synchronizedMap(new HashMap<String, Integer>());
+
+       /**
+        * Creates a new FCP connection to the freenet node running on localhost,
+        * using the default port.
+        * 
+        * @throws UnknownHostException
+        *             if the hostname can not be resolved
+        */
+       public FcpConnection() throws UnknownHostException {
+               this(InetAddress.getLocalHost());
        }
 
-       public FcpConnection(String host, int port, String clientName) throws UnknownHostException {
-               this(InetAddress.getByName(host), port, clientName);
+       /**
+        * Creates a new FCP connection to the Freenet node running on the given
+        * host, listening on the default port.
+        * 
+        * @param host
+        *            The hostname of the Freenet node
+        * @throws UnknownHostException
+        *             if <code>host</code> can not be resolved
+        */
+       public FcpConnection(String host) throws UnknownHostException {
+               this(host, DEFAULT_PORT);
        }
 
-       public FcpConnection(InetAddress address, String clientName) {
-               this(address, DEFAULT_PORT, clientName);
+       /**
+        * Creates a new FCP connection to the Freenet node running on the given
+        * host, listening on the given port.
+        * 
+        * @param host
+        *            The hostname of the Freenet node
+        * @param port
+        *            The port number of the node’s FCP port
+        * @throws UnknownHostException
+        *             if <code>host</code> can not be resolved
+        */
+       public FcpConnection(String host, int port) throws UnknownHostException {
+               this(InetAddress.getByName(host), port);
        }
 
-       public FcpConnection(InetAddress address, int port, String clientName) {
+       /**
+        * Creates a new FCP connection to the Freenet node running at the given
+        * address, listening on the default port.
+        * 
+        * @param address
+        *            The address of the Freenet node
+        */
+       public FcpConnection(InetAddress address) {
+               this(address, DEFAULT_PORT);
+       }
+
+       /**
+        * Creates a new FCP connection to the Freenet node running at the given
+        * address, listening on the given port.
+        * 
+        * @param address
+        *            The address of the Freenet node
+        * @param port
+        *            The port number of the node’s FCP port
+        */
+       public FcpConnection(InetAddress address, int port) {
                this.address = address;
                this.port = port;
-               this.clientName = clientName;
        }
 
        //
@@ -102,6 +158,409 @@ public class FcpConnection {
        }
 
        /**
+        * Notifies listeners that a “NodeHello” message was received.
+        * 
+        * @see FcpListener#receivedNodeHello(FcpConnection, NodeHello)
+        * @param nodeHello
+        *            The “NodeHello” message
+        */
+       private void fireReceivedNodeHello(NodeHello nodeHello) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedNodeHello(this, nodeHello);
+               }
+       }
+
+       /**
+        * Notifies listeners that a “CloseConnectionDuplicateClientName” message
+        * was received.
+        * 
+        * @see FcpListener#receivedCloseConnectionDuplicateClientName(FcpConnection,
+        *      CloseConnectionDuplicateClientName)
+        * @param closeConnectionDuplicateClientName
+        *            The “CloseConnectionDuplicateClientName” message
+        */
+       private void fireReceivedCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedCloseConnectionDuplicateClientName(this, closeConnectionDuplicateClientName);
+               }
+       }
+
+       /**
+        * Notifies listeners that a “SSKKeypair” message was received.
+        * 
+        * @see FcpListener#receivedSSKKeypair(FcpConnection, SSKKeypair)
+        * @param sskKeypair
+        *            The “SSKKeypair” message
+        */
+       private void fireReceivedSSKKeypair(SSKKeypair sskKeypair) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedSSKKeypair(this, sskKeypair);
+               }
+       }
+
+       /**
+        * Notifies listeners that a “Peer” message was received.
+        * 
+        * @see FcpListener#receivedPeer(FcpConnection, Peer)
+        * @param peer
+        *            The “Peer” message
+        */
+       private void fireReceivedPeer(Peer peer) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedPeer(this, peer);
+               }
+       }
+
+       /**
+        * Notifies all listeners that an “EndListPeers” message was received.
+        * 
+        * @see FcpListener#receivedEndListPeers(FcpConnection, EndListPeers)
+        * @param endListPeers
+        *            The “EndListPeers” message
+        */
+       private void fireReceivedEndListPeers(EndListPeers endListPeers) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedEndListPeers(this, endListPeers);
+               }
+       }
+
+       /**
+        * Notifies all listeners that a “PeerNote” message was received.
+        * 
+        * @see FcpListener#receivedPeerNote(FcpConnection, PeerNote)
+        * @param peerNote
+        */
+       private void fireReceivedPeerNote(PeerNote peerNote) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedPeerNote(this, peerNote);
+               }
+       }
+
+       /**
+        * Notifies all listeners that an “EndListPeerNotes” message was received.
+        * 
+        * @see FcpListener#receivedEndListPeerNotes(FcpConnection,
+        *      EndListPeerNotes)
+        * @param endListPeerNotes
+        *            The “EndListPeerNotes” message
+        */
+       private void fireReceivedEndListPeerNotes(EndListPeerNotes endListPeerNotes) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedEndListPeerNotes(this, endListPeerNotes);
+               }
+       }
+
+       /**
+        * Notifies all listeners that a “PeerRemoved” message was received.
+        * 
+        * @see FcpListener#receivedPeerRemoved(FcpConnection, PeerRemoved)
+        * @param peerRemoved
+        *            The “PeerRemoved” message
+        */
+       private void fireReceivedPeerRemoved(PeerRemoved peerRemoved) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedPeerRemoved(this, peerRemoved);
+               }
+       }
+
+       /**
+        * Notifies all listeners that a “NodeData” message was received.
+        * 
+        * @see FcpListener#receivedNodeData(FcpConnection, NodeData)
+        * @param nodeData
+        *            The “NodeData” message
+        */
+       private void fireReceivedNodeData(NodeData nodeData) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedNodeData(this, nodeData);
+               }
+       }
+
+       /**
+        * Notifies all listeners that a “TestDDAReply” message was received.
+        * 
+        * @see FcpListener#receivedTestDDAReply(FcpConnection, TestDDAReply)
+        * @param testDDAReply
+        *            The “TestDDAReply” message
+        */
+       private void fireReceivedTestDDAReply(TestDDAReply testDDAReply) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedTestDDAReply(this, testDDAReply);
+               }
+       }
+
+       /**
+        * Notifies all listeners that a “TestDDAComplete” message was received.
+        * 
+        * @see FcpListener#receivedTestDDAComplete(FcpConnection, TestDDAComplete)
+        * @param testDDAComplete
+        *            The “TestDDAComplete” message
+        */
+       private void fireReceivedTestDDAComplete(TestDDAComplete testDDAComplete) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedTestDDAComplete(this, testDDAComplete);
+               }
+       }
+
+       /**
+        * Notifies all listeners that a “PersistentGet” message was received.
+        * 
+        * @param persistentGet
+        *            The “PersistentGet” message
+        */
+       private void fireReceivedPersistentGet(PersistentGet persistentGet) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedPersistentGet(this, persistentGet);
+               }
+       }
+
+       /**
+        * Notifies all listeners that a “PersistentPut” message was received.
+        * 
+        * @see FcpListener#receivedPersistentPut(FcpConnection, PersistentPut)
+        * @param persistentPut
+        *            The “PersistentPut” message
+        */
+       private void fireReceivedPersistentPut(PersistentPut persistentPut) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedPersistentPut(this, persistentPut);
+               }
+       }
+
+       /**
+        * Notifies all listeners that a “EndListPersistentRequests” message was
+        * received.
+        * 
+        * @param endListPersistentRequests
+        *            The “EndListPersistentRequests” message
+        */
+       private void fireReceivedEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedEndListPersistentRequests(this, endListPersistentRequests);
+               }
+       }
+
+       /**
+        * Notifies all listeners that a “URIGenerated” message was received.
+        * 
+        * @param uriGenerated
+        *            The “URIGenerated” message
+        */
+       private void fireReceivedURIGenerated(URIGenerated uriGenerated) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedURIGenerated(this, uriGenerated);
+               }
+       }
+
+       /**
+        * Notifies all listeners that a “DataFound” message was received.
+        * 
+        * @param dataFound
+        *            The “DataFound” message
+        */
+       private void fireReceivedDataFound(DataFound dataFound) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedDataFound(this, dataFound);
+               }
+       }
+
+       /**
+        * Notifies all listeners that an “AllData” message was received.
+        * 
+        * @param allData
+        *            The “AllData” message
+        */
+       private void fireReceivedAllData(AllData allData) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedAllData(this, allData);
+               }
+       }
+
+       /**
+        * Notifies all listeners that a “SimpleProgress” message was received.
+        * 
+        * @param simpleProgress
+        *            The “SimpleProgress” message
+        */
+       private void fireReceivedSimpleProgress(SimpleProgress simpleProgress) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedSimpleProgress(this, simpleProgress);
+               }
+       }
+
+       /**
+        * Notifies all listeners that a “StartedCompression” message was received.
+        * 
+        * @param startedCompression
+        *            The “StartedCompression” message
+        */
+       private void fireReceivedStartedCompression(StartedCompression startedCompression) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedStartedCompression(this, startedCompression);
+               }
+       }
+
+       /**
+        * Notifies all listeners that a “FinishedCompression” message was received.
+        * 
+        * @param finishedCompression
+        *            The “FinishedCompression” message
+        */
+       private void fireReceivedFinishedCompression(FinishedCompression finishedCompression) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receviedFinishedCompression(this, finishedCompression);
+               }
+       }
+
+       /**
+        * Notifies all listeners that an “UnknownPeerNoteType” message was
+        * received.
+        * 
+        * @param unknownPeerNoteType
+        *            The “UnknownPeerNoteType” message
+        */
+       private void fireReceivedUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedUnknownPeerNoteType(this, unknownPeerNoteType);
+               }
+       }
+
+       /**
+        * Notifies all listeners that an “UnknownNodeIdentifier” message was
+        * received.
+        * 
+        * @param unknownNodeIdentifier
+        *            The “UnknownNodeIdentifier” message
+        */
+       private void fireReceivedUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedUnknownNodeIdentifier(this, unknownNodeIdentifier);
+               }
+       }
+
+       /**
+        * Notifies all listeners that a “ConfigData” message was received.
+        * 
+        * @param configData
+        *            The “ConfigData” message
+        */
+       private void fireReceivedConfigData(ConfigData configData) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedConfigData(this, configData);
+               }
+       }
+
+       /**
+        * Notifies all listeners that a “GetFailed” message was received.
+        * 
+        * @param getFailed
+        *            The “GetFailed” message
+        */
+       private void fireReceivedGetFailed(GetFailed getFailed) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedGetFailed(this, getFailed);
+               }
+       }
+
+       /**
+        * Notifies all listeners that a “PutFailed” message was received.
+        * 
+        * @param putFailed
+        *            The “PutFailed” message
+        */
+       private void fireReceivedPutFailed(PutFailed putFailed) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedPutFailed(this, putFailed);
+               }
+       }
+
+       /**
+        * Notifies all listeners that an “IdentifierCollision” message was
+        * received.
+        * 
+        * @param identifierCollision
+        *            The “IdentifierCollision” message
+        */
+       private void fireReceivedIdentifierCollision(IdentifierCollision identifierCollision) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedIdentifierCollision(this, identifierCollision);
+               }
+       }
+
+       /**
+        * Notifies all listeners that an “PersistentPutDir” message was received.
+        * 
+        * @param persistentPutDir
+        *            The “PersistentPutDir” message
+        */
+       private void fireReceivedPersistentPutDir(PersistentPutDir persistentPutDir) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedPersistentPutDir(this, persistentPutDir);
+               }
+       }
+
+       /**
+        * Notifies all listeners that a “PersistentRequestRemoved” message was
+        * received.
+        * 
+        * @param persistentRequestRemoved
+        *            The “PersistentRequestRemoved” message
+        */
+       private void fireReceivedPersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedPersistentRequestRemoved(this, persistentRequestRemoved);
+               }
+       }
+
+       /**
+        * Notifies all listeners that a “SubscribedUSKUpdate” message was received.
+        * 
+        * @param subscribedUSKUpdate
+        *            The “SubscribedUSKUpdate” message
+        */
+       private void fireReceivedSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedSubscribedUSKUpdate(this, subscribedUSKUpdate);
+               }
+       }
+
+       /**
+        * Notifies all listeners that a “PluginInfo” message was received.
+        * 
+        * @param pluginInfo
+        *            The “PluginInfo” message
+        */
+       private void fireReceivedPluginInfo(PluginInfo pluginInfo) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedPluginInfo(this, pluginInfo);
+               }
+       }
+
+       /**
+        * Notifies all listeners that an “FCPPluginReply” message was received.
+        * 
+        * @param fcpPluginReply
+        *            The “FCPPluginReply” message
+        */
+       private void fireReceivedFCPPluginReply(FCPPluginReply fcpPluginReply) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedFCPPluginReply(this, fcpPluginReply);
+               }
+       }
+
+       /**
+        * Notifies all listeners that a “ProtocolError” message was received.
+        * 
+        * @param protocolError
+        *            The “ProtocolError” message
+        */
+       private void fireReceivedProtocolError(ProtocolError protocolError) {
+               for (FcpListener fcpListener: fcpListeners) {
+                       fcpListener.receivedProtocolError(this, protocolError);
+               }
+       }
+
+       /**
         * Notifies all registered listeners that a message has been received.
         * 
         * @see FcpListener#receivedMessage(FcpConnection, FcpMessage)
@@ -133,7 +592,6 @@ public class FcpConnection {
                remoteSocket = new Socket(address, port);
                remoteInputStream = remoteSocket.getInputStream();
                remoteOutputStream = remoteSocket.getOutputStream();
-               connected = true;
                new Thread(connectionHandler = new FcpConnectionHandler(this, remoteInputStream)).start();
        }
 
@@ -145,159 +603,160 @@ public class FcpConnection {
                if (connectionHandler == null) {
                        return;
                }
-               connected = false;
                Closer.close(remoteSocket);
                connectionHandler.stop();
                connectionHandler = null;
        }
 
+       /**
+        * Sends the given FCP message.
+        * 
+        * @param fcpMessage
+        *            The FCP message to send
+        * @throws IOException
+        *             if an I/O error occurs
+        */
        public synchronized void sendMessage(FcpMessage fcpMessage) throws IOException {
                System.out.println("sending message: " + fcpMessage.getName());
                fcpMessage.write(remoteOutputStream);
        }
 
+       //
+       // PACKAGE-PRIVATE METHODS
+       //
+
        /**
-        * Sends a “ListPeer” command to the node and returns the properties of the
-        * peer.
+        * Handles the given message, notifying listeners. This message should only
+        * be called by {@link FcpConnectionHandler}.
         * 
-        * @param nodeIdentifier
-        *            The name (except for OpenNet nodes), the identity or the
-        *            node’s “address:port” pair
-        * @return The properties of the peer, or <code>null</code> if the peer is
-        *         unknown
-        * @throws IOException
-        * @throws FcpException
+        * @param fcpMessage
+        *            The received message
         */
-       public Map<String, String> sendListPeer(String nodeIdentifier) throws IOException, FcpException {
-               FcpMessage listPeerMessage = new FcpMessage("ListPeer");
-               listPeerMessage.setField("NodeIdentifier", nodeIdentifier);
-               sendMessage(listPeerMessage);
-               FcpMessage returnMessage = waitForMessage("Peer", "UnknownNodeIdentifier");
-               if (returnMessage.getName().equals("Peer")) {
-                       return returnMessage.getFields();
-               }
-               return null;
-       }
-       
        void handleMessage(FcpMessage fcpMessage) {
-               fireMessageReceived(fcpMessage);
-       }
-
-       public List<Map<String, String>> sendListPeers(boolean withMetadata, boolean withVolatile) throws IOException, FcpException {
-               FcpMessage listPeersMessage = new FcpMessage("ListPeers");
-               listPeersMessage.setField("WithMetadata", String.valueOf(withMetadata));
-               listPeersMessage.setField("WithVolatile", String.valueOf(withVolatile));
-               sendMessage(listPeersMessage);
-               List<Map<String, String>> peers = new ArrayList<Map<String, String>>();
-               while (true) {
-                       FcpMessage returnMessage = waitForMessage("Peer", "EndListPeers");
-                       if (returnMessage.getName().equals("EndListPeers")) {
-                               break;
+               String messageName = fcpMessage.getName();
+               countMessage(messageName);
+               if ("SimpleProgress".equals(messageName)) {
+                       fireReceivedSimpleProgress(new SimpleProgress(fcpMessage));
+               } else if ("ProtocolError".equals(messageName)) {
+                       fireReceivedProtocolError(new ProtocolError(fcpMessage));
+               } else if ("PersistentGet".equals(messageName)) {
+                       fireReceivedPersistentGet(new PersistentGet(fcpMessage));
+               } else if ("PersistentPut".equals(messageName)) {
+                       fireReceivedPersistentPut(new PersistentPut(fcpMessage));
+               } else if ("PersistentPutDir".equals(messageName)) {
+                       fireReceivedPersistentPutDir(new PersistentPutDir(fcpMessage));
+               } else if ("URIGenerated".equals(messageName)) {
+                       fireReceivedURIGenerated(new URIGenerated(fcpMessage));
+               } else if ("EndListPersistentRequests".equals(messageName)) {
+                       fireReceivedEndListPersistentRequests(new EndListPersistentRequests(fcpMessage));
+               } else if ("Peer".equals(messageName)) {
+                       fireReceivedPeer(new Peer(fcpMessage));
+               } else if ("PeerNote".equals(messageName)) {
+                       fireReceivedPeerNote(new PeerNote(fcpMessage));
+               } else if ("StartedCompression".equals(messageName)) {
+                       fireReceivedStartedCompression(new StartedCompression(fcpMessage));
+               } else if ("FinishedCompression".equals(messageName)) {
+                       fireReceivedFinishedCompression(new FinishedCompression(fcpMessage));
+               } else if ("GetFailed".equals(messageName)) {
+                       fireReceivedGetFailed(new GetFailed(fcpMessage));
+               } else if ("PutFailed".equals(messageName)) {
+                       fireReceivedPutFailed(new PutFailed(fcpMessage));
+               } else if ("DataFound".equals(messageName)) {
+                       fireReceivedDataFound(new DataFound(fcpMessage));
+               } else if ("SubscribedUSKUpdate".equals(messageName)) {
+                       fireReceivedSubscribedUSKUpdate(new SubscribedUSKUpdate(fcpMessage));
+               } else if ("IdentifierCollision".equals(messageName)) {
+                       fireReceivedIdentifierCollision(new IdentifierCollision(fcpMessage));
+               } else if ("AllData".equals(messageName)) {
+                       LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
+                       fireReceivedAllData(new AllData(fcpMessage, payloadInputStream));
+                       try {
+                               payloadInputStream.consume();
+                       } catch (IOException ioe1) {
+                               /* well, ignore. when the connection handler fails, all fails. */
                        }
-                       peers.add(returnMessage.getFields());
-               }
-               return peers;
-       }
-
-       public List<Map<String, String>> sendListPeerNotes(String nodeIdentifier) throws IOException, FcpException {
-               FcpMessage listPeerNotesMessage = new FcpMessage("ListPeerNotes");
-               listPeerNotesMessage.setField("NodeIdentifier", nodeIdentifier);
-               sendMessage(listPeerNotesMessage);
-               List<Map<String, String>> peerNotes = new ArrayList<Map<String, String>>();
-               while (true) {
-                       FcpMessage returnMessage = waitForMessage("PeerNote", "EndListPeerNotes");
-                       if (returnMessage.getName().equals("EndListPeerNotes")) {
-                               break;
+               } else if ("EndListPeerNotes".equals(messageName)) {
+                       fireReceivedEndListPeerNotes(new EndListPeerNotes(fcpMessage));
+               } else if ("EndListPeers".equals(messageName)) {
+                       fireReceivedEndListPeers(new EndListPeers(fcpMessage));
+               } else if ("SSKKeypair".equals(messageName)) {
+                       fireReceivedSSKKeypair(new SSKKeypair(fcpMessage));
+               } else if ("PeerRemoved".equals(messageName)) {
+                       fireReceivedPeerRemoved(new PeerRemoved(fcpMessage));
+               } else if ("PersistentRequestRemoved".equals(messageName)) {
+                       fireReceivedPersistentRequestRemoved(new PersistentRequestRemoved(fcpMessage));
+               } else if ("UnknownPeerNoteType".equals(messageName)) {
+                       fireReceivedUnknownPeerNoteType(new UnknownPeerNoteType(fcpMessage));
+               } else if ("UnknownNodeIdentifier".equals(messageName)) {
+                       fireReceivedUnknownNodeIdentifier(new UnknownNodeIdentifier(fcpMessage));
+               } else if ("FCPPluginReply".equals(messageName)) {
+                       LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
+                       fireReceivedFCPPluginReply(new FCPPluginReply(fcpMessage, payloadInputStream));
+                       try {
+                               payloadInputStream.consume();
+                       } catch (IOException ioe1) {
+                               /* ignore. */
                        }
-                       peerNotes.add(returnMessage.getFields());
+               } else if ("PluginInfo".equals(messageName)) {
+                       fireReceivedPluginInfo(new PluginInfo(fcpMessage));
+               } else if ("NodeData".equals(messageName)) {
+                       fireReceivedNodeData(new NodeData(fcpMessage));
+               } else if ("TestDDAReply".equals(messageName)) {
+                       fireReceivedTestDDAReply(new TestDDAReply(fcpMessage));
+               } else if ("TestDDAComplete".equals(messageName)) {
+                       fireReceivedTestDDAComplete(new TestDDAComplete(fcpMessage));
+               } else if ("ConfigData".equals(messageName)) {
+                       fireReceivedConfigData(new ConfigData(fcpMessage));
+               } else if ("NodeHello".equals(messageName)) {
+                       fireReceivedNodeHello(new NodeHello(fcpMessage));
+               } else if ("CloseConnectionDuplicateClientName".equals(messageName)) {
+                       fireReceivedCloseConnectionDuplicateClientName(new CloseConnectionDuplicateClientName(fcpMessage));
+               } else {
+                       fireMessageReceived(fcpMessage);
                }
-               return peerNotes;
-       }
-
-       public void sendTestDDARequest(String directory, boolean wantReadDirectory, boolean wantWriteDirectory) throws IOException, FcpException {
-               FcpMessage testDDARequestMessage = new FcpMessage("TestDDARequest");
-               testDDARequestMessage.setField("Directory", directory);
-               testDDARequestMessage.setField("WantReadDirectory", String.valueOf(wantReadDirectory));
-               testDDARequestMessage.setField("WantWriteDirectory", String.valueOf(wantWriteDirectory));
-               sendMessage(testDDARequestMessage);
        }
 
-       public FcpKeyPair generateSSK() throws IOException, FcpException {
-               FcpMessage generateSSKMessage = new FcpMessage("GenerateSSK");
-               String identifier = hashCode() + String.valueOf(System.currentTimeMillis());
-               generateSSKMessage.setField("Identifier", identifier);
-               sendMessage(generateSSKMessage);
-               FcpMessage returnMessage = waitForMessage("SSKKeypair(Identifier=" + identifier + ")");
-               String publicKey = returnMessage.getField("RequestURI");
-               String privateKey = returnMessage.getField("InsertURI");
-               return new FcpKeyPair(publicKey, privateKey);
+       /**
+        * Handles a disconnect from the node.
+        */
+       synchronized void handleDisconnect() {
+               Closer.close(remoteInputStream);
+               Closer.close(remoteOutputStream);
+               Closer.close(remoteSocket);
+               connectionHandler = null;
        }
 
        //
        // PRIVATE METHODS
        //
 
-       public FcpMessage waitForMessage(String... messageNames) throws FcpException {
-               FcpMessage oldMessage = null;
-               synchronized (messageWaitSync) {
-                       while (true) {
-                               while (receivedMessage == oldMessage) {
-                                       System.out.println("waiting for receivedMessage");
-                                       try {
-                                               messageWaitSync.wait();
-                                       } catch (InterruptedException ie1) {
-                                       }
-                               }
-                               System.out.println("got message: " + receivedMessage.getName());
-                               String receivedMessageName = receivedMessage.getName();
-                               if ("ProtocolError".equals(receivedMessageName)) {
-                                       int code = Integer.valueOf(receivedMessage.getField("Code"));
-                                       boolean fatal = Boolean.valueOf(receivedMessage.getField("Fatal"));
-                                       boolean global = Boolean.valueOf(receivedMessage.getField("Global"));
-                                       String codeDescription = receivedMessage.getField("CodeDescription");
-                                       String extraDescription = receivedMessage.getField("ExtraDescription");
-                                       String identifier = receivedMessage.getField("Identifier");
-                                       FcpProtocolException fcpProtocolException = new FcpProtocolException(code, fatal, global);
-                                       fcpProtocolException.setCodeDescription(codeDescription);
-                                       fcpProtocolException.setExtraDescription(extraDescription);
-                                       fcpProtocolException.setIdentifier(identifier);
-                                       throw fcpProtocolException;
-                               }
-                               for (String messageName: messageNames) {
-                                       int firstBracket = messageName.indexOf('(');
-                                       Map<String, String> wantedIdentifiers = new HashMap<String, String>();
-                                       if (firstBracket > -1) {
-                                               StringTokenizer identifierTokens = new StringTokenizer(messageName.substring(firstBracket), "()");
-                                               while (identifierTokens.hasMoreTokens()) {
-                                                       String identifierToken = identifierTokens.nextToken();
-                                                       int equalSign = identifierToken.indexOf('=');
-                                                       if (equalSign > -1) {
-                                                               wantedIdentifiers.put(identifierToken.substring(0, equalSign), identifierToken.substring(equalSign + 1));
-                                                       }
-                                               }
-                                               messageName = messageName.substring(0, firstBracket);
-                                       }
-                                       if (receivedMessageName.equals(messageName)) {
-                                               boolean found = true;
-                                               for (Entry<String, String> wantedIdentifier: wantedIdentifiers.entrySet()) {
-                                                       System.out.println("key: " + wantedIdentifier.getKey() + ", value: " + wantedIdentifier.getValue() + ", msg: " + receivedMessage.getField(wantedIdentifier.getKey()));
-                                                       if (!wantedIdentifier.getValue().equals(receivedMessage.getField(wantedIdentifier.getKey()))) {
-                                                               found = false;
-                                                               break;
-                                                       }
-                                               }
-                                               if (found) {
-                                                       System.out.println("message found");
-                                                       FcpMessage foundMessage = receivedMessage;
-                                                       receivedMessage = null;
-                                                       messageWaitSync.notifyAll();
-                                                       return foundMessage;
-                                               }
-                                       }
-                               }
-                               oldMessage = receivedMessage;
-                       }
+       /**
+        * Incremets the counter in {@link #incomingMessageStatistics} by <cod>1</code>
+        * for the given message name.
+        * 
+        * @param name
+        *            The name of the message to count
+        */
+       private void countMessage(String name) {
+               int oldValue = 0;
+               if (incomingMessageStatistics.containsKey(name)) {
+                       oldValue = incomingMessageStatistics.get(name);
+               }
+               incomingMessageStatistics.put(name, oldValue + 1);
+       }
+
+       /**
+        * Returns a limited input stream from the node’s input stream.
+        * 
+        * @param dataLength
+        *            The length of the stream
+        * @return The limited input stream
+        */
+       private LimitedInputStream getInputStream(long dataLength) {
+               if (dataLength <= 0) {
+                       return new LimitedInputStream(null, 0);
                }
+               return new LimitedInputStream(remoteInputStream, dataLength);
        }
 
 }