import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.StringTokenizer;
-import java.util.Map.Entry;
import net.pterodactylus.util.io.Closer;
+import net.pterodactylus.util.io.LimitedInputStream;
/**
- * TODO
+ * An FCP connection to a Freenet node.
*
* @author David ‘Bombe’ Roden <bombe@freenetproject.org>
* @version $Id$
*/
public class FcpConnection {
+ /** The default port for FCP v2. */
public static final int DEFAULT_PORT = 9481;
- private final Object messageWaitSync = new Object();
- private FcpMessage receivedMessage = null;
-
+ /** The list of FCP listeners. */
private final List<FcpListener> fcpListeners = new ArrayList<FcpListener>();
+ /** The address of the node. */
private final InetAddress address;
+
+ /** The port number of the node’s FCP port. */
private final int port;
- private final String clientName;
+ /** The remote socket. */
private Socket remoteSocket;
+
+ /** The input stream from the node. */
private InputStream remoteInputStream;
+
+ /** The output stream to the node. */
private OutputStream remoteOutputStream;
- private boolean connected;
- public FcpConnection(String host, String clientName) throws UnknownHostException {
- this(host, DEFAULT_PORT, clientName);
+ /** The connection handler. */
+ private FcpConnectionHandler connectionHandler;
+
+ /** Incoming message statistics. */
+ private Map<String, Integer> incomingMessageStatistics = Collections.synchronizedMap(new HashMap<String, Integer>());
+
+ /**
+ * Creates a new FCP connection to the freenet node running on localhost,
+ * using the default port.
+ *
+ * @throws UnknownHostException
+ * if the hostname can not be resolved
+ */
+ public FcpConnection() throws UnknownHostException {
+ this(InetAddress.getLocalHost());
+ }
+
+ /**
+ * Creates a new FCP connection to the Freenet node running on the given
+ * host, listening on the default port.
+ *
+ * @param host
+ * The hostname of the Freenet node
+ * @throws UnknownHostException
+ * if <code>host</code> can not be resolved
+ */
+ public FcpConnection(String host) throws UnknownHostException {
+ this(host, DEFAULT_PORT);
}
- public FcpConnection(String host, int port, String clientName) throws UnknownHostException {
- this(InetAddress.getByName(host), port, clientName);
+ /**
+ * Creates a new FCP connection to the Freenet node running on the given
+ * host, listening on the given port.
+ *
+ * @param host
+ * The hostname of the Freenet node
+ * @param port
+ * The port number of the node’s FCP port
+ * @throws UnknownHostException
+ * if <code>host</code> can not be resolved
+ */
+ public FcpConnection(String host, int port) throws UnknownHostException {
+ this(InetAddress.getByName(host), port);
}
- public FcpConnection(InetAddress address, String clientName) {
- this(address, DEFAULT_PORT, clientName);
+ /**
+ * Creates a new FCP connection to the Freenet node running at the given
+ * address, listening on the default port.
+ *
+ * @param address
+ * The address of the Freenet node
+ */
+ public FcpConnection(InetAddress address) {
+ this(address, DEFAULT_PORT);
}
- public FcpConnection(InetAddress address, int port, String clientName) {
+ /**
+ * Creates a new FCP connection to the Freenet node running at the given
+ * address, listening on the given port.
+ *
+ * @param address
+ * The address of the Freenet node
+ * @param port
+ * The port number of the node’s FCP port
+ */
+ public FcpConnection(InetAddress address, int port) {
this.address = address;
this.port = port;
- this.clientName = clientName;
}
//
// LISTENER MANAGEMENT
//
+ /**
+ * Adds the given listener to the list of listeners.
+ *
+ * @param fcpListener
+ * The listener to add
+ */
public void addFcpListener(FcpListener fcpListener) {
fcpListeners.add(fcpListener);
}
+ /**
+ * Removes the given listener from the list of listeners.
+ *
+ * @param fcpListener
+ * The listener to remove
+ */
public void removeFcpListener(FcpListener fcpListener) {
fcpListeners.remove(fcpListener);
}
- private void fireNodeHello(Map<String, String> nodeProperties) {
+ /**
+ * Notifies listeners that a “NodeHello” message was received.
+ *
+ * @see FcpListener#receivedNodeHello(FcpConnection, NodeHello)
+ * @param nodeHello
+ * The “NodeHello” message
+ */
+ private void fireReceivedNodeHello(NodeHello nodeHello) {
+ for (FcpListener fcpListener: fcpListeners) {
+ fcpListener.receivedNodeHello(this, nodeHello);
+ }
+ }
+
+ /**
+ * Notifies listeners that a “CloseConnectionDuplicateClientName” message
+ * was received.
+ *
+ * @see FcpListener#receivedCloseConnectionDuplicateClientName(FcpConnection,
+ * CloseConnectionDuplicateClientName)
+ * @param closeConnectionDuplicateClientName
+ * The “CloseConnectionDuplicateClientName” message
+ */
+ private void fireReceivedCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
+ for (FcpListener fcpListener: fcpListeners) {
+ fcpListener.receivedCloseConnectionDuplicateClientName(this, closeConnectionDuplicateClientName);
+ }
+ }
+
+ /**
+ * Notifies listeners that a “SSKKeypair” message was received.
+ *
+ * @see FcpListener#receivedSSKKeypair(FcpConnection, SSKKeypair)
+ * @param sskKeypair
+ * The “SSKKeypair” message
+ */
+ private void fireReceivedSSKKeypair(SSKKeypair sskKeypair) {
+ for (FcpListener fcpListener: fcpListeners) {
+ fcpListener.receivedSSKKeypair(this, sskKeypair);
+ }
+ }
+
+ /**
+ * Notifies listeners that a “Peer” message was received.
+ *
+ * @see FcpListener#receivedPeer(FcpConnection, Peer)
+ * @param peer
+ * The “Peer” message
+ */
+ private void fireReceivedPeer(Peer peer) {
+ for (FcpListener fcpListener: fcpListeners) {
+ fcpListener.receivedPeer(this, peer);
+ }
+ }
+
+ /**
+ * Notifies all listeners that an “EndListPeers” message was received.
+ *
+ * @see FcpListener#receivedEndListPeers(FcpConnection, EndListPeers)
+ * @param endListPeers
+ * The “EndListPeers” message
+ */
+ private void fireReceivedEndListPeers(EndListPeers endListPeers) {
for (FcpListener fcpListener: fcpListeners) {
- fcpListener.fcpNodeHello(this, nodeProperties);
+ fcpListener.receivedEndListPeers(this, endListPeers);
+ }
+ }
+
+ /**
+ * Notifies all listeners that a “PeerNote” message was received.
+ *
+ * @see FcpListener#receivedPeerNote(FcpConnection, PeerNote)
+ * @param peerNote
+ */
+ private void fireReceivedPeerNote(PeerNote peerNote) {
+ for (FcpListener fcpListener: fcpListeners) {
+ fcpListener.receivedPeerNote(this, peerNote);
+ }
+ }
+
+ /**
+ * Notifies all listeners that an “EndListPeerNotes” message was received.
+ *
+ * @see FcpListener#receivedEndListPeerNotes(FcpConnection,
+ * EndListPeerNotes)
+ * @param endListPeerNotes
+ * The “EndListPeerNotes” message
+ */
+ private void fireReceivedEndListPeerNotes(EndListPeerNotes endListPeerNotes) {
+ for (FcpListener fcpListener: fcpListeners) {
+ fcpListener.receivedEndListPeerNotes(this, endListPeerNotes);
+ }
+ }
+
+ /**
+ * Notifies all listeners that a “PeerRemoved” message was received.
+ *
+ * @see FcpListener#receivedPeerRemoved(FcpConnection, PeerRemoved)
+ * @param peerRemoved
+ * The “PeerRemoved” message
+ */
+ private void fireReceivedPeerRemoved(PeerRemoved peerRemoved) {
+ for (FcpListener fcpListener: fcpListeners) {
+ fcpListener.receivedPeerRemoved(this, peerRemoved);
+ }
+ }
+
+ /**
+ * Notifies all listeners that a “NodeData” message was received.
+ *
+ * @see FcpListener#receivedNodeData(FcpConnection, NodeData)
+ * @param nodeData
+ * The “NodeData” message
+ */
+ private void fireReceivedNodeData(NodeData nodeData) {
+ for (FcpListener fcpListener: fcpListeners) {
+ fcpListener.receivedNodeData(this, nodeData);
+ }
+ }
+
+ /**
+ * Notifies all listeners that a “TestDDAReply” message was received.
+ *
+ * @see FcpListener#receivedTestDDAReply(FcpConnection, TestDDAReply)
+ * @param testDDAReply
+ * The “TestDDAReply” message
+ */
+ private void fireReceivedTestDDAReply(TestDDAReply testDDAReply) {
+ for (FcpListener fcpListener: fcpListeners) {
+ fcpListener.receivedTestDDAReply(this, testDDAReply);
+ }
+ }
+
+ /**
+ * Notifies all listeners that a “TestDDAComplete” message was received.
+ *
+ * @see FcpListener#receivedTestDDAComplete(FcpConnection, TestDDAComplete)
+ * @param testDDAComplete
+ * The “TestDDAComplete” message
+ */
+ private void fireReceivedTestDDAComplete(TestDDAComplete testDDAComplete) {
+ for (FcpListener fcpListener: fcpListeners) {
+ fcpListener.receivedTestDDAComplete(this, testDDAComplete);
+ }
+ }
+
+ /**
+ * Notifies all listeners that a “PersistentGet” message was received.
+ *
+ * @param persistentGet
+ * The “PersistentGet” message
+ */
+ private void fireReceivedPersistentGet(PersistentGet persistentGet) {
+ for (FcpListener fcpListener: fcpListeners) {
+ fcpListener.receivedPersistentGet(this, persistentGet);
+ }
+ }
+
+ /**
+ * Notifies all listeners that a “PersistentPut” message was received.
+ *
+ * @see FcpListener#receivedPersistentPut(FcpConnection, PersistentPut)
+ * @param persistentPut
+ * The “PersistentPut” message
+ */
+ private void fireReceivedPersistentPut(PersistentPut persistentPut) {
+ for (FcpListener fcpListener: fcpListeners) {
+ fcpListener.receivedPersistentPut(this, persistentPut);
+ }
+ }
+
+ /**
+ * Notifies all listeners that a “EndListPersistentRequests” message was
+ * received.
+ *
+ * @param endListPersistentRequests
+ * The “EndListPersistentRequests” message
+ */
+ private void fireReceivedEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) {
+ for (FcpListener fcpListener: fcpListeners) {
+ fcpListener.receivedEndListPersistentRequests(this, endListPersistentRequests);
+ }
+ }
+
+ /**
+ * Notifies all listeners that a “URIGenerated” message was received.
+ *
+ * @param uriGenerated
+ * The “URIGenerated” message
+ */
+ private void fireReceivedURIGenerated(URIGenerated uriGenerated) {
+ for (FcpListener fcpListener: fcpListeners) {
+ fcpListener.receivedURIGenerated(this, uriGenerated);
+ }
+ }
+
+ /**
+ * Notifies all listeners that a “DataFound” message was received.
+ *
+ * @param dataFound
+ * The “DataFound” message
+ */
+ private void fireReceivedDataFound(DataFound dataFound) {
+ for (FcpListener fcpListener: fcpListeners) {
+ fcpListener.receivedDataFound(this, dataFound);
+ }
+ }
+
+ /**
+ * Notifies all listeners that an “AllData” message was received.
+ *
+ * @param allData
+ * The “AllData” message
+ */
+ private void fireReceivedAllData(AllData allData) {
+ for (FcpListener fcpListener: fcpListeners) {
+ fcpListener.receivedAllData(this, allData);
+ }
+ }
+
+ /**
+ * Notifies all listeners that a “SimpleProgress” message was received.
+ *
+ * @param simpleProgress
+ * The “SimpleProgress” message
+ */
+ private void fireReceivedSimpleProgress(SimpleProgress simpleProgress) {
+ for (FcpListener fcpListener: fcpListeners) {
+ fcpListener.receivedSimpleProgress(this, simpleProgress);
+ }
+ }
+
+ /**
+ * Notifies all listeners that a “StartedCompression” message was received.
+ *
+ * @param startedCompression
+ * The “StartedCompression” message
+ */
+ private void fireReceivedStartedCompression(StartedCompression startedCompression) {
+ for (FcpListener fcpListener: fcpListeners) {
+ fcpListener.receivedStartedCompression(this, startedCompression);
+ }
+ }
+
+ /**
+ * Notifies all listeners that a “FinishedCompression” message was received.
+ *
+ * @param finishedCompression
+ * The “FinishedCompression” message
+ */
+ private void fireReceivedFinishedCompression(FinishedCompression finishedCompression) {
+ for (FcpListener fcpListener: fcpListeners) {
+ fcpListener.receviedFinishedCompression(this, finishedCompression);
+ }
+ }
+
+ /**
+ * Notifies all listeners that an “UnknownPeerNoteType” message was
+ * received.
+ *
+ * @param unknownPeerNoteType
+ * The “UnknownPeerNoteType” message
+ */
+ private void fireReceivedUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) {
+ for (FcpListener fcpListener: fcpListeners) {
+ fcpListener.receivedUnknownPeerNoteType(this, unknownPeerNoteType);
+ }
+ }
+
+ /**
+ * Notifies all listeners that an “UnknownNodeIdentifier” message was
+ * received.
+ *
+ * @param unknownNodeIdentifier
+ * The “UnknownNodeIdentifier” message
+ */
+ private void fireReceivedUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) {
+ for (FcpListener fcpListener: fcpListeners) {
+ fcpListener.receivedUnknownNodeIdentifier(this, unknownNodeIdentifier);
+ }
+ }
+
+ /**
+ * Notifies all listeners that a “ConfigData” message was received.
+ *
+ * @param configData
+ * The “ConfigData” message
+ */
+ private void fireReceivedConfigData(ConfigData configData) {
+ for (FcpListener fcpListener: fcpListeners) {
+ fcpListener.receivedConfigData(this, configData);
+ }
+ }
+
+ /**
+ * Notifies all listeners that a “GetFailed” message was received.
+ *
+ * @param getFailed
+ * The “GetFailed” message
+ */
+ private void fireReceivedGetFailed(GetFailed getFailed) {
+ for (FcpListener fcpListener: fcpListeners) {
+ fcpListener.receivedGetFailed(this, getFailed);
+ }
+ }
+
+ /**
+ * Notifies all listeners that a “PutFailed” message was received.
+ *
+ * @param putFailed
+ * The “PutFailed” message
+ */
+ private void fireReceivedPutFailed(PutFailed putFailed) {
+ for (FcpListener fcpListener: fcpListeners) {
+ fcpListener.receivedPutFailed(this, putFailed);
+ }
+ }
+
+ /**
+ * Notifies all listeners that an “IdentifierCollision” message was
+ * received.
+ *
+ * @param identifierCollision
+ * The “IdentifierCollision” message
+ */
+ private void fireReceivedIdentifierCollision(IdentifierCollision identifierCollision) {
+ for (FcpListener fcpListener: fcpListeners) {
+ fcpListener.receivedIdentifierCollision(this, identifierCollision);
+ }
+ }
+
+ /**
+ * Notifies all listeners that a “ProtocolError” message was received.
+ *
+ * @param protocolError
+ * The “ProtocolError” message
+ */
+ private void fireReceivedProtocolError(ProtocolError protocolError) {
+ for (FcpListener fcpListener: fcpListeners) {
+ fcpListener.receivedProtocolError(this, protocolError);
+ }
+ }
+
+ /**
+ * Notifies all registered listeners that a message has been received.
+ *
+ * @see FcpListener#receivedMessage(FcpConnection, FcpMessage)
+ * @param fcpMessage
+ * The message that was received
+ */
+ private void fireMessageReceived(FcpMessage fcpMessage) {
+ for (FcpListener fcpListener: fcpListeners) {
+ fcpListener.receivedMessage(this, fcpMessage);
}
}
// ACTIONS
//
- public synchronized void connect() throws FcpException, IOException {
- System.out.println("connecting...");
+ /**
+ * Connects to the node.
+ *
+ * @throws IOException
+ * if an I/O error occurs
+ * @throws IllegalStateException
+ * if there is already a connection to the node
+ */
+ public synchronized void connect() throws IOException, IllegalStateException {
+ if (connectionHandler != null) {
+ throw new IllegalStateException("already connected, disconnect first");
+ }
remoteSocket = new Socket(address, port);
remoteInputStream = remoteSocket.getInputStream();
remoteOutputStream = remoteSocket.getOutputStream();
- connected = true;
- System.out.println("connected.");
- new Thread(new FcpConnectionHandler(this, remoteInputStream)).start();
- sendMessage(clientHelloMessage);
+ new Thread(connectionHandler = new FcpConnectionHandler(this, remoteInputStream)).start();
}
+ /**
+ * Disconnects from the node. If there is no connection to the node, this
+ * method does nothing.
+ */
public synchronized void disconnect() {
- connected = false;
+ if (connectionHandler == null) {
+ return;
+ }
Closer.close(remoteSocket);
+ connectionHandler.stop();
+ connectionHandler = null;
}
/**
- * Sends a “ListPeer” command to the node and returns the properties of the
- * peer.
+ * Sends the given FCP message.
*
- * @param nodeIdentifier
- * The name (except for OpenNet nodes), the identity or the
- * node’s “address:port” pair
- * @return The properties of the peer, or <code>null</code> if the peer is
- * unknown
+ * @param fcpMessage
+ * The FCP message to send
* @throws IOException
- * @throws FcpException
- */
- public Map<String, String> sendListPeer(String nodeIdentifier) throws IOException, FcpException {
- FcpMessage listPeerMessage = new FcpMessage("ListPeer");
- listPeerMessage.setField("NodeIdentifier", nodeIdentifier);
- sendMessage(listPeerMessage);
- FcpMessage returnMessage = waitForMessage("Peer", "UnknownNodeIdentifier");
- if (returnMessage.getName().equals("Peer")) {
- return returnMessage.getFields();
- }
- return null;
- }
-
- public List<Map<String, String>> sendListPeers(boolean withMetadata, boolean withVolatile) throws IOException, FcpException {
- FcpMessage listPeersMessage = new FcpMessage("ListPeers");
- listPeersMessage.setField("WithMetadata", String.valueOf(withMetadata));
- listPeersMessage.setField("WithVolatile", String.valueOf(withVolatile));
- sendMessage(listPeersMessage);
- List<Map<String, String>> peers = new ArrayList<Map<String, String>>();
- while (true) {
- FcpMessage returnMessage = waitForMessage("Peer", "EndListPeers");
- if (returnMessage.getName().equals("EndListPeers")) {
- break;
- }
- peers.add(returnMessage.getFields());
- }
- return peers;
+ * if an I/O error occurs
+ */
+ public synchronized void sendMessage(FcpMessage fcpMessage) throws IOException {
+ System.out.println("sending message: " + fcpMessage.getName());
+ fcpMessage.write(remoteOutputStream);
}
- public List<Map<String, String>> sendListPeerNotes(String nodeIdentifier) throws IOException, FcpException {
- FcpMessage listPeerNotesMessage = new FcpMessage("ListPeerNotes");
- listPeerNotesMessage.setField("NodeIdentifier", nodeIdentifier);
- sendMessage(listPeerNotesMessage);
- List<Map<String, String>> peerNotes = new ArrayList<Map<String, String>>();
- while (true) {
- FcpMessage returnMessage = waitForMessage("PeerNote", "EndListPeerNotes");
- if (returnMessage.getName().equals("EndListPeerNotes")) {
- break;
- }
- peerNotes.add(returnMessage.getFields());
- }
- return peerNotes;
- }
-
- public void sendTestDDARequest(String directory, boolean wantReadDirectory, boolean wantWriteDirectory) throws IOException, FcpException {
- FcpMessage testDDARequestMessage = new FcpMessage("TestDDARequest");
- testDDARequestMessage.setField("Directory", directory);
- testDDARequestMessage.setField("WantReadDirectory", String.valueOf(wantReadDirectory));
- testDDARequestMessage.setField("WantWriteDirectory", String.valueOf(wantWriteDirectory));
- sendMessage(testDDARequestMessage);
- }
-
- public FcpKeyPair generateSSK() throws IOException, FcpException {
- FcpMessage generateSSKMessage = new FcpMessage("GenerateSSK");
- String identifier = hashCode() + String.valueOf(System.currentTimeMillis());
- generateSSKMessage.setField("Identifier", identifier);
- sendMessage(generateSSKMessage);
- FcpMessage returnMessage = waitForMessage("SSKKeypair(Identifier=" + identifier + ")");
- String publicKey = returnMessage.getField("RequestURI");
- String privateKey = returnMessage.getField("InsertURI");
- return new FcpKeyPair(publicKey, privateKey);
- }
-
//
// PACKAGE-PRIVATE METHODS
//
+ /**
+ * Handles the given message, notifying listeners. This message should only
+ * be called by {@link FcpConnectionHandler}.
+ *
+ * @param fcpMessage
+ * The received message
+ */
void handleMessage(FcpMessage fcpMessage) {
- synchronized (messageWaitSync) {
- while (receivedMessage != null) {
- /* previous message has not yet been consumed */
- System.out.println("waiting for message to be consumed...");
- try {
- messageWaitSync.wait();
- } catch (InterruptedException ie1) {
- }
+ String messageName = fcpMessage.getName();
+ countMessage(messageName);
+ if ("SimpleProgress".equals(messageName)) {
+ fireReceivedSimpleProgress(new SimpleProgress(fcpMessage));
+ } else if ("ProtocolError".equals(messageName)) {
+ fireReceivedProtocolError(new ProtocolError(fcpMessage));
+ } else if ("PersistentGet".equals(messageName)) {
+ fireReceivedPersistentGet(new PersistentGet(fcpMessage));
+ } else if ("PersistentPut".equals(messageName)) {
+ fireReceivedPersistentPut(new PersistentPut(fcpMessage));
+ } else if ("URIGenerated".equals(messageName)) {
+ fireReceivedURIGenerated(new URIGenerated(fcpMessage));
+ } else if ("EndListPersistentRequests".equals(messageName)) {
+ fireReceivedEndListPersistentRequests(new EndListPersistentRequests(fcpMessage));
+ } else if ("Peer".equals(messageName)) {
+ fireReceivedPeer(new Peer(fcpMessage));
+ } else if ("PeerNote".equals(messageName)) {
+ fireReceivedPeerNote(new PeerNote(fcpMessage));
+ } else if ("StartedCompression".equals(messageName)) {
+ fireReceivedStartedCompression(new StartedCompression(fcpMessage));
+ } else if ("FinishedCompression".equals(messageName)) {
+ fireReceivedFinishedCompression(new FinishedCompression(fcpMessage));
+ } else if ("GetFailed".equals(messageName)) {
+ fireReceivedGetFailed(new GetFailed(fcpMessage));
+ } else if ("PutFailed".equals(messageName)) {
+ fireReceivedPutFailed(new PutFailed(fcpMessage));
+ } else if ("DataFound".equals(messageName)) {
+ fireReceivedDataFound(new DataFound(fcpMessage));
+ } else if ("IdentifierCollision".equals(messageName)) {
+ fireReceivedIdentifierCollision(new IdentifierCollision(fcpMessage));
+ } else if ("AllData".equals(messageName)) {
+ long dataLength;
+ try {
+ dataLength = Long.valueOf(fcpMessage.getField("DataLength"));
+ } catch (NumberFormatException nfe1) {
+ dataLength = -1;
}
- /* TODO - check whether to send events here or later. */
- if ("NodeHello".equals(fcpMessage.getName())) {
- fireNodeHello(fcpMessage.getFields());
+ LimitedInputStream payloadInputStream = new LimitedInputStream(remoteInputStream, dataLength);
+ fireReceivedAllData(new AllData(fcpMessage, payloadInputStream));
+ try {
+ payloadInputStream.consume();
+ } catch (IOException ioe1) {
+ /* FIXME - what now? */
+ /* well, ignore. when the connection handler fails, all fails. */
}
- System.out.println("setting receivedMessage");
- receivedMessage = fcpMessage;
- messageWaitSync.notifyAll();
+ } else if ("EndListPeerNotes".equals(messageName)) {
+ fireReceivedEndListPeerNotes(new EndListPeerNotes(fcpMessage));
+ } else if ("EndListPeers".equals(messageName)) {
+ fireReceivedEndListPeers(new EndListPeers(fcpMessage));
+ } else if ("SSKKeypair".equals(messageName)) {
+ fireReceivedSSKKeypair(new SSKKeypair(fcpMessage));
+ } else if ("PeerRemoved".equals(messageName)) {
+ fireReceivedPeerRemoved(new PeerRemoved(fcpMessage));
+ } else if ("UnknownPeerNoteType".equals(messageName)) {
+ fireReceivedUnknownPeerNoteType(new UnknownPeerNoteType(fcpMessage));
+ } else if ("UnknownNodeIdentifier".equals(messageName)) {
+ fireReceivedUnknownNodeIdentifier(new UnknownNodeIdentifier(fcpMessage));
+ } else if ("NodeData".equals(messageName)) {
+ fireReceivedNodeData(new NodeData(fcpMessage));
+ } else if ("TestDDAReply".equals(messageName)) {
+ fireReceivedTestDDAReply(new TestDDAReply(fcpMessage));
+ } else if ("TestDDAComplete".equals(messageName)) {
+ fireReceivedTestDDAComplete(new TestDDAComplete(fcpMessage));
+ } else if ("ConfigData".equals(messageName)) {
+ fireReceivedConfigData(new ConfigData(fcpMessage));
+ } else if ("NodeHello".equals(messageName)) {
+ fireReceivedNodeHello(new NodeHello(fcpMessage));
+ } else if ("CloseConnectionDuplicateClientName".equals(messageName)) {
+ fireReceivedCloseConnectionDuplicateClientName(new CloseConnectionDuplicateClientName(fcpMessage));
+ } else {
+ fireMessageReceived(fcpMessage);
}
}
+ /**
+ * Handles a disconnect from the node.
+ */
+ synchronized void handleDisconnect() {
+ Closer.close(remoteInputStream);
+ Closer.close(remoteOutputStream);
+ Closer.close(remoteSocket);
+ connectionHandler = null;
+ }
+
//
// PRIVATE METHODS
//
- public synchronized void sendMessage(FcpMessage fcpMessage) throws IOException {
- System.out.println("sending message: " + fcpMessage.getName());
- fcpMessage.write(remoteOutputStream);
- }
-
- public FcpMessage waitForMessage(String... messageNames) throws FcpException {
- FcpMessage oldMessage = null;
- synchronized (messageWaitSync) {
- while (true) {
- while (receivedMessage == oldMessage) {
- System.out.println("waiting for receivedMessage");
- try {
- messageWaitSync.wait();
- } catch (InterruptedException ie1) {
- }
- }
- System.out.println("got message: " + receivedMessage.getName());
- String receivedMessageName = receivedMessage.getName();
- if ("ProtocolError".equals(receivedMessageName)) {
- int code = Integer.valueOf(receivedMessage.getField("Code"));
- boolean fatal = Boolean.valueOf(receivedMessage.getField("Fatal"));
- boolean global = Boolean.valueOf(receivedMessage.getField("Global"));
- String codeDescription = receivedMessage.getField("CodeDescription");
- String extraDescription = receivedMessage.getField("ExtraDescription");
- String identifier = receivedMessage.getField("Identifier");
- FcpProtocolException fcpProtocolException = new FcpProtocolException(code, fatal, global);
- fcpProtocolException.setCodeDescription(codeDescription);
- fcpProtocolException.setExtraDescription(extraDescription);
- fcpProtocolException.setIdentifier(identifier);
- throw fcpProtocolException;
- }
- for (String messageName: messageNames) {
- int firstBracket = messageName.indexOf('(');
- Map<String, String> wantedIdentifiers = new HashMap<String, String>();
- if (firstBracket > -1) {
- StringTokenizer identifierTokens = new StringTokenizer(messageName.substring(firstBracket), "()");
- while (identifierTokens.hasMoreTokens()) {
- String identifierToken = identifierTokens.nextToken();
- int equalSign = identifierToken.indexOf('=');
- if (equalSign > -1) {
- wantedIdentifiers.put(identifierToken.substring(0, equalSign), identifierToken.substring(equalSign + 1));
- }
- }
- messageName = messageName.substring(0, firstBracket);
- }
- if (receivedMessageName.equals(messageName)) {
- boolean found = true;
- for (Entry<String, String> wantedIdentifier: wantedIdentifiers.entrySet()) {
- System.out.println("key: " + wantedIdentifier.getKey() + ", value: " + wantedIdentifier.getValue() + ", msg: " + receivedMessage.getField(wantedIdentifier.getKey()));
- if (!wantedIdentifier.getValue().equals(receivedMessage.getField(wantedIdentifier.getKey()))) {
- found = false;
- break;
- }
- }
- if (found) {
- System.out.println("message found");
- FcpMessage foundMessage = receivedMessage;
- receivedMessage = null;
- messageWaitSync.notifyAll();
- return foundMessage;
- }
- }
- }
- oldMessage = receivedMessage;
- }
+ /**
+ * Incremets the counter in {@link #incomingMessageStatistics} by <cod>1</code>
+ * for the given message name.
+ *
+ * @param name
+ * The name of the message to count
+ */
+ private void countMessage(String name) {
+ int oldValue = 0;
+ if (incomingMessageStatistics.containsKey(name)) {
+ oldValue = incomingMessageStatistics.get(name);
}
+ incomingMessageStatistics.put(name, oldValue + 1);
}
}