X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fnet%2Fpterodactylus%2Futil%2Ffcp%2FFcpConnection.java;h=12d52af112c0cfe360984762cf256a2bc98e1655;hb=561d15670607ad2fae532e71dfc25b35848f0775;hp=484b46c8060e4326177e432f1b52188201d3d8bd;hpb=72a6784258f756eb23b39a9f8021b400829cbcec;p=jSite2.git diff --git a/src/net/pterodactylus/util/fcp/FcpConnection.java b/src/net/pterodactylus/util/fcp/FcpConnection.java index 484b46c..12d52af 100644 --- a/src/net/pterodactylus/util/fcp/FcpConnection.java +++ b/src/net/pterodactylus/util/fcp/FcpConnection.java @@ -26,55 +26,111 @@ import java.net.InetAddress; import java.net.Socket; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.StringTokenizer; -import java.util.Map.Entry; import net.pterodactylus.util.io.Closer; +import net.pterodactylus.util.io.LimitedInputStream; /** - * TODO + * An FCP connection to a Freenet node. * * @author David ‘Bombe’ Roden <bombe@freenetproject.org> * @version $Id$ */ public class FcpConnection { + /** The default port for FCP v2. */ public static final int DEFAULT_PORT = 9481; - private final Object messageWaitSync = new Object(); - private FcpMessage receivedMessage = null; - + /** The list of FCP listeners. */ private final List fcpListeners = new ArrayList(); + /** The address of the node. */ private final InetAddress address; + + /** The port number of the node’s FCP port. */ private final int port; - private final String clientName; + /** The remote socket. */ private Socket remoteSocket; + + /** The input stream from the node. */ private InputStream remoteInputStream; + + /** The output stream to the node. */ private OutputStream remoteOutputStream; + + /** The connection handler. */ private FcpConnectionHandler connectionHandler; - private boolean connected; - public FcpConnection(String host, String clientName) throws UnknownHostException { - this(host, DEFAULT_PORT, clientName); + /** Incoming message statistics. */ + private Map incomingMessageStatistics = Collections.synchronizedMap(new HashMap()); + + /** + * Creates a new FCP connection to the freenet node running on localhost, + * using the default port. + * + * @throws UnknownHostException + * if the hostname can not be resolved + */ + public FcpConnection() throws UnknownHostException { + this(InetAddress.getLocalHost()); } - public FcpConnection(String host, int port, String clientName) throws UnknownHostException { - this(InetAddress.getByName(host), port, clientName); + /** + * Creates a new FCP connection to the Freenet node running on the given + * host, listening on the default port. + * + * @param host + * The hostname of the Freenet node + * @throws UnknownHostException + * if host can not be resolved + */ + public FcpConnection(String host) throws UnknownHostException { + this(host, DEFAULT_PORT); } - public FcpConnection(InetAddress address, String clientName) { - this(address, DEFAULT_PORT, clientName); + /** + * Creates a new FCP connection to the Freenet node running on the given + * host, listening on the given port. + * + * @param host + * The hostname of the Freenet node + * @param port + * The port number of the node’s FCP port + * @throws UnknownHostException + * if host can not be resolved + */ + public FcpConnection(String host, int port) throws UnknownHostException { + this(InetAddress.getByName(host), port); } - public FcpConnection(InetAddress address, int port, String clientName) { + /** + * Creates a new FCP connection to the Freenet node running at the given + * address, listening on the default port. + * + * @param address + * The address of the Freenet node + */ + public FcpConnection(InetAddress address) { + this(address, DEFAULT_PORT); + } + + /** + * Creates a new FCP connection to the Freenet node running at the given + * address, listening on the given port. + * + * @param address + * The address of the Freenet node + * @param port + * The port number of the node’s FCP port + */ + public FcpConnection(InetAddress address, int port) { this.address = address; this.port = port; - this.clientName = clientName; } // @@ -102,6 +158,397 @@ public class FcpConnection { } /** + * Notifies listeners that a “NodeHello” message was received. + * + * @see FcpListener#receivedNodeHello(FcpConnection, NodeHello) + * @param nodeHello + * The “NodeHello” message + */ + private void fireReceivedNodeHello(NodeHello nodeHello) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedNodeHello(this, nodeHello); + } + } + + /** + * Notifies listeners that a “CloseConnectionDuplicateClientName” message + * was received. + * + * @see FcpListener#receivedCloseConnectionDuplicateClientName(FcpConnection, + * CloseConnectionDuplicateClientName) + * @param closeConnectionDuplicateClientName + * The “CloseConnectionDuplicateClientName” message + */ + private void fireReceivedCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedCloseConnectionDuplicateClientName(this, closeConnectionDuplicateClientName); + } + } + + /** + * Notifies listeners that a “SSKKeypair” message was received. + * + * @see FcpListener#receivedSSKKeypair(FcpConnection, SSKKeypair) + * @param sskKeypair + * The “SSKKeypair” message + */ + private void fireReceivedSSKKeypair(SSKKeypair sskKeypair) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedSSKKeypair(this, sskKeypair); + } + } + + /** + * Notifies listeners that a “Peer” message was received. + * + * @see FcpListener#receivedPeer(FcpConnection, Peer) + * @param peer + * The “Peer” message + */ + private void fireReceivedPeer(Peer peer) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedPeer(this, peer); + } + } + + /** + * Notifies all listeners that an “EndListPeers” message was received. + * + * @see FcpListener#receivedEndListPeers(FcpConnection, EndListPeers) + * @param endListPeers + * The “EndListPeers” message + */ + private void fireReceivedEndListPeers(EndListPeers endListPeers) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedEndListPeers(this, endListPeers); + } + } + + /** + * Notifies all listeners that a “PeerNote” message was received. + * + * @see FcpListener#receivedPeerNote(FcpConnection, PeerNote) + * @param peerNote + */ + private void fireReceivedPeerNote(PeerNote peerNote) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedPeerNote(this, peerNote); + } + } + + /** + * Notifies all listeners that an “EndListPeerNotes” message was received. + * + * @see FcpListener#receivedEndListPeerNotes(FcpConnection, + * EndListPeerNotes) + * @param endListPeerNotes + * The “EndListPeerNotes” message + */ + private void fireReceivedEndListPeerNotes(EndListPeerNotes endListPeerNotes) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedEndListPeerNotes(this, endListPeerNotes); + } + } + + /** + * Notifies all listeners that a “PeerRemoved” message was received. + * + * @see FcpListener#receivedPeerRemoved(FcpConnection, PeerRemoved) + * @param peerRemoved + * The “PeerRemoved” message + */ + private void fireReceivedPeerRemoved(PeerRemoved peerRemoved) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedPeerRemoved(this, peerRemoved); + } + } + + /** + * Notifies all listeners that a “NodeData” message was received. + * + * @see FcpListener#receivedNodeData(FcpConnection, NodeData) + * @param nodeData + * The “NodeData” message + */ + private void fireReceivedNodeData(NodeData nodeData) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedNodeData(this, nodeData); + } + } + + /** + * Notifies all listeners that a “TestDDAReply” message was received. + * + * @see FcpListener#receivedTestDDAReply(FcpConnection, TestDDAReply) + * @param testDDAReply + * The “TestDDAReply” message + */ + private void fireReceivedTestDDAReply(TestDDAReply testDDAReply) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedTestDDAReply(this, testDDAReply); + } + } + + /** + * Notifies all listeners that a “TestDDAComplete” message was received. + * + * @see FcpListener#receivedTestDDAComplete(FcpConnection, TestDDAComplete) + * @param testDDAComplete + * The “TestDDAComplete” message + */ + private void fireReceivedTestDDAComplete(TestDDAComplete testDDAComplete) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedTestDDAComplete(this, testDDAComplete); + } + } + + /** + * Notifies all listeners that a “PersistentGet” message was received. + * + * @param persistentGet + * The “PersistentGet” message + */ + private void fireReceivedPersistentGet(PersistentGet persistentGet) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedPersistentGet(this, persistentGet); + } + } + + /** + * Notifies all listeners that a “PersistentPut” message was received. + * + * @see FcpListener#receivedPersistentPut(FcpConnection, PersistentPut) + * @param persistentPut + * The “PersistentPut” message + */ + private void fireReceivedPersistentPut(PersistentPut persistentPut) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedPersistentPut(this, persistentPut); + } + } + + /** + * Notifies all listeners that a “EndListPersistentRequests” message was + * received. + * + * @param endListPersistentRequests + * The “EndListPersistentRequests” message + */ + private void fireReceivedEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedEndListPersistentRequests(this, endListPersistentRequests); + } + } + + /** + * Notifies all listeners that a “URIGenerated” message was received. + * + * @param uriGenerated + * The “URIGenerated” message + */ + private void fireReceivedURIGenerated(URIGenerated uriGenerated) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedURIGenerated(this, uriGenerated); + } + } + + /** + * Notifies all listeners that a “DataFound” message was received. + * + * @param dataFound + * The “DataFound” message + */ + private void fireReceivedDataFound(DataFound dataFound) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedDataFound(this, dataFound); + } + } + + /** + * Notifies all listeners that an “AllData” message was received. + * + * @param allData + * The “AllData” message + */ + private void fireReceivedAllData(AllData allData) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedAllData(this, allData); + } + } + + /** + * Notifies all listeners that a “SimpleProgress” message was received. + * + * @param simpleProgress + * The “SimpleProgress” message + */ + private void fireReceivedSimpleProgress(SimpleProgress simpleProgress) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedSimpleProgress(this, simpleProgress); + } + } + + /** + * Notifies all listeners that a “StartedCompression” message was received. + * + * @param startedCompression + * The “StartedCompression” message + */ + private void fireReceivedStartedCompression(StartedCompression startedCompression) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedStartedCompression(this, startedCompression); + } + } + + /** + * Notifies all listeners that a “FinishedCompression” message was received. + * + * @param finishedCompression + * The “FinishedCompression” message + */ + private void fireReceivedFinishedCompression(FinishedCompression finishedCompression) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receviedFinishedCompression(this, finishedCompression); + } + } + + /** + * Notifies all listeners that an “UnknownPeerNoteType” message was + * received. + * + * @param unknownPeerNoteType + * The “UnknownPeerNoteType” message + */ + private void fireReceivedUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedUnknownPeerNoteType(this, unknownPeerNoteType); + } + } + + /** + * Notifies all listeners that an “UnknownNodeIdentifier” message was + * received. + * + * @param unknownNodeIdentifier + * The “UnknownNodeIdentifier” message + */ + private void fireReceivedUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedUnknownNodeIdentifier(this, unknownNodeIdentifier); + } + } + + /** + * Notifies all listeners that a “ConfigData” message was received. + * + * @param configData + * The “ConfigData” message + */ + private void fireReceivedConfigData(ConfigData configData) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedConfigData(this, configData); + } + } + + /** + * Notifies all listeners that a “GetFailed” message was received. + * + * @param getFailed + * The “GetFailed” message + */ + private void fireReceivedGetFailed(GetFailed getFailed) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedGetFailed(this, getFailed); + } + } + + /** + * Notifies all listeners that a “PutFailed” message was received. + * + * @param putFailed + * The “PutFailed” message + */ + private void fireReceivedPutFailed(PutFailed putFailed) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedPutFailed(this, putFailed); + } + } + + /** + * Notifies all listeners that an “IdentifierCollision” message was + * received. + * + * @param identifierCollision + * The “IdentifierCollision” message + */ + private void fireReceivedIdentifierCollision(IdentifierCollision identifierCollision) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedIdentifierCollision(this, identifierCollision); + } + } + + /** + * Notifies all listeners that an “PersistentPutDir” message was received. + * + * @param persistentPutDir + * The “PersistentPutDir” message + */ + private void fireReceivedPersistentPutDir(PersistentPutDir persistentPutDir) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedPersistentPutDir(this, persistentPutDir); + } + } + + /** + * Notifies all listeners that a “PersistentRequestRemoved” message was + * received. + * + * @param persistentRequestRemoved + * The “PersistentRequestRemoved” message + */ + private void fireReceivedPersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedPersistentRequestRemoved(this, persistentRequestRemoved); + } + } + + /** + * Notifies all listeners that a “SubscribedUSKUpdate” message was received. + * + * @param subscribedUSKUpdate + * The “SubscribedUSKUpdate” message + */ + private void fireReceivedSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedSubscribedUSKUpdate(this, subscribedUSKUpdate); + } + } + + /** + * Notifies all listeners that a “PluginInfo” message was received. + * + * @param pluginInfo + * The “PluginInfo” message + */ + private void fireReceivedPluginInfo(PluginInfo pluginInfo) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedPluginInfo(this, pluginInfo); + } + } + + /** + * Notifies all listeners that a “ProtocolError” message was received. + * + * @param protocolError + * The “ProtocolError” message + */ + private void fireReceivedProtocolError(ProtocolError protocolError) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedProtocolError(this, protocolError); + } + } + + /** * Notifies all registered listeners that a message has been received. * * @see FcpListener#receivedMessage(FcpConnection, FcpMessage) @@ -133,7 +580,6 @@ public class FcpConnection { remoteSocket = new Socket(address, port); remoteInputStream = remoteSocket.getInputStream(); remoteOutputStream = remoteSocket.getOutputStream(); - connected = true; new Thread(connectionHandler = new FcpConnectionHandler(this, remoteInputStream)).start(); } @@ -145,159 +591,152 @@ public class FcpConnection { if (connectionHandler == null) { return; } - connected = false; Closer.close(remoteSocket); connectionHandler.stop(); connectionHandler = null; } + /** + * Sends the given FCP message. + * + * @param fcpMessage + * The FCP message to send + * @throws IOException + * if an I/O error occurs + */ public synchronized void sendMessage(FcpMessage fcpMessage) throws IOException { System.out.println("sending message: " + fcpMessage.getName()); fcpMessage.write(remoteOutputStream); } + // + // PACKAGE-PRIVATE METHODS + // + /** - * Sends a “ListPeer” command to the node and returns the properties of the - * peer. + * Handles the given message, notifying listeners. This message should only + * be called by {@link FcpConnectionHandler}. * - * @param nodeIdentifier - * The name (except for OpenNet nodes), the identity or the - * node’s “address:port” pair - * @return The properties of the peer, or null if the peer is - * unknown - * @throws IOException - * @throws FcpException + * @param fcpMessage + * The received message */ - public Map sendListPeer(String nodeIdentifier) throws IOException, FcpException { - FcpMessage listPeerMessage = new FcpMessage("ListPeer"); - listPeerMessage.setField("NodeIdentifier", nodeIdentifier); - sendMessage(listPeerMessage); - FcpMessage returnMessage = waitForMessage("Peer", "UnknownNodeIdentifier"); - if (returnMessage.getName().equals("Peer")) { - return returnMessage.getFields(); - } - return null; - } - void handleMessage(FcpMessage fcpMessage) { - fireMessageReceived(fcpMessage); - } - - public List> sendListPeers(boolean withMetadata, boolean withVolatile) throws IOException, FcpException { - FcpMessage listPeersMessage = new FcpMessage("ListPeers"); - listPeersMessage.setField("WithMetadata", String.valueOf(withMetadata)); - listPeersMessage.setField("WithVolatile", String.valueOf(withVolatile)); - sendMessage(listPeersMessage); - List> peers = new ArrayList>(); - while (true) { - FcpMessage returnMessage = waitForMessage("Peer", "EndListPeers"); - if (returnMessage.getName().equals("EndListPeers")) { - break; - } - peers.add(returnMessage.getFields()); - } - return peers; - } - - public List> sendListPeerNotes(String nodeIdentifier) throws IOException, FcpException { - FcpMessage listPeerNotesMessage = new FcpMessage("ListPeerNotes"); - listPeerNotesMessage.setField("NodeIdentifier", nodeIdentifier); - sendMessage(listPeerNotesMessage); - List> peerNotes = new ArrayList>(); - while (true) { - FcpMessage returnMessage = waitForMessage("PeerNote", "EndListPeerNotes"); - if (returnMessage.getName().equals("EndListPeerNotes")) { - break; + String messageName = fcpMessage.getName(); + countMessage(messageName); + if ("SimpleProgress".equals(messageName)) { + fireReceivedSimpleProgress(new SimpleProgress(fcpMessage)); + } else if ("ProtocolError".equals(messageName)) { + fireReceivedProtocolError(new ProtocolError(fcpMessage)); + } else if ("PersistentGet".equals(messageName)) { + fireReceivedPersistentGet(new PersistentGet(fcpMessage)); + } else if ("PersistentPut".equals(messageName)) { + fireReceivedPersistentPut(new PersistentPut(fcpMessage)); + } else if ("PersistentPutDir".equals(messageName)) { + fireReceivedPersistentPutDir(new PersistentPutDir(fcpMessage)); + } else if ("URIGenerated".equals(messageName)) { + fireReceivedURIGenerated(new URIGenerated(fcpMessage)); + } else if ("EndListPersistentRequests".equals(messageName)) { + fireReceivedEndListPersistentRequests(new EndListPersistentRequests(fcpMessage)); + } else if ("Peer".equals(messageName)) { + fireReceivedPeer(new Peer(fcpMessage)); + } else if ("PeerNote".equals(messageName)) { + fireReceivedPeerNote(new PeerNote(fcpMessage)); + } else if ("StartedCompression".equals(messageName)) { + fireReceivedStartedCompression(new StartedCompression(fcpMessage)); + } else if ("FinishedCompression".equals(messageName)) { + fireReceivedFinishedCompression(new FinishedCompression(fcpMessage)); + } else if ("GetFailed".equals(messageName)) { + fireReceivedGetFailed(new GetFailed(fcpMessage)); + } else if ("PutFailed".equals(messageName)) { + fireReceivedPutFailed(new PutFailed(fcpMessage)); + } else if ("DataFound".equals(messageName)) { + fireReceivedDataFound(new DataFound(fcpMessage)); + } else if ("SubscribedUSKUpdate".equals(messageName)) { + fireReceivedSubscribedUSKUpdate(new SubscribedUSKUpdate(fcpMessage)); + } else if ("IdentifierCollision".equals(messageName)) { + fireReceivedIdentifierCollision(new IdentifierCollision(fcpMessage)); + } else if ("AllData".equals(messageName)) { + LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength"))); + fireReceivedAllData(new AllData(fcpMessage, payloadInputStream)); + try { + payloadInputStream.consume(); + } catch (IOException ioe1) { + /* well, ignore. when the connection handler fails, all fails. */ } - peerNotes.add(returnMessage.getFields()); + } else if ("EndListPeerNotes".equals(messageName)) { + fireReceivedEndListPeerNotes(new EndListPeerNotes(fcpMessage)); + } else if ("EndListPeers".equals(messageName)) { + fireReceivedEndListPeers(new EndListPeers(fcpMessage)); + } else if ("SSKKeypair".equals(messageName)) { + fireReceivedSSKKeypair(new SSKKeypair(fcpMessage)); + } else if ("PeerRemoved".equals(messageName)) { + fireReceivedPeerRemoved(new PeerRemoved(fcpMessage)); + } else if ("PersistentRequestRemoved".equals(messageName)) { + fireReceivedPersistentRequestRemoved(new PersistentRequestRemoved(fcpMessage)); + } else if ("UnknownPeerNoteType".equals(messageName)) { + fireReceivedUnknownPeerNoteType(new UnknownPeerNoteType(fcpMessage)); + } else if ("UnknownNodeIdentifier".equals(messageName)) { + fireReceivedUnknownNodeIdentifier(new UnknownNodeIdentifier(fcpMessage)); + } else if ("PluginInfo".equals(messageName)) { + fireReceivedPluginInfo(new PluginInfo(fcpMessage)); + } else if ("NodeData".equals(messageName)) { + fireReceivedNodeData(new NodeData(fcpMessage)); + } else if ("TestDDAReply".equals(messageName)) { + fireReceivedTestDDAReply(new TestDDAReply(fcpMessage)); + } else if ("TestDDAComplete".equals(messageName)) { + fireReceivedTestDDAComplete(new TestDDAComplete(fcpMessage)); + } else if ("ConfigData".equals(messageName)) { + fireReceivedConfigData(new ConfigData(fcpMessage)); + } else if ("NodeHello".equals(messageName)) { + fireReceivedNodeHello(new NodeHello(fcpMessage)); + } else if ("CloseConnectionDuplicateClientName".equals(messageName)) { + fireReceivedCloseConnectionDuplicateClientName(new CloseConnectionDuplicateClientName(fcpMessage)); + } else { + fireMessageReceived(fcpMessage); } - return peerNotes; - } - - public void sendTestDDARequest(String directory, boolean wantReadDirectory, boolean wantWriteDirectory) throws IOException, FcpException { - FcpMessage testDDARequestMessage = new FcpMessage("TestDDARequest"); - testDDARequestMessage.setField("Directory", directory); - testDDARequestMessage.setField("WantReadDirectory", String.valueOf(wantReadDirectory)); - testDDARequestMessage.setField("WantWriteDirectory", String.valueOf(wantWriteDirectory)); - sendMessage(testDDARequestMessage); } - public FcpKeyPair generateSSK() throws IOException, FcpException { - FcpMessage generateSSKMessage = new FcpMessage("GenerateSSK"); - String identifier = hashCode() + String.valueOf(System.currentTimeMillis()); - generateSSKMessage.setField("Identifier", identifier); - sendMessage(generateSSKMessage); - FcpMessage returnMessage = waitForMessage("SSKKeypair(Identifier=" + identifier + ")"); - String publicKey = returnMessage.getField("RequestURI"); - String privateKey = returnMessage.getField("InsertURI"); - return new FcpKeyPair(publicKey, privateKey); + /** + * Handles a disconnect from the node. + */ + synchronized void handleDisconnect() { + Closer.close(remoteInputStream); + Closer.close(remoteOutputStream); + Closer.close(remoteSocket); + connectionHandler = null; } // // PRIVATE METHODS // - public FcpMessage waitForMessage(String... messageNames) throws FcpException { - FcpMessage oldMessage = null; - synchronized (messageWaitSync) { - while (true) { - while (receivedMessage == oldMessage) { - System.out.println("waiting for receivedMessage"); - try { - messageWaitSync.wait(); - } catch (InterruptedException ie1) { - } - } - System.out.println("got message: " + receivedMessage.getName()); - String receivedMessageName = receivedMessage.getName(); - if ("ProtocolError".equals(receivedMessageName)) { - int code = Integer.valueOf(receivedMessage.getField("Code")); - boolean fatal = Boolean.valueOf(receivedMessage.getField("Fatal")); - boolean global = Boolean.valueOf(receivedMessage.getField("Global")); - String codeDescription = receivedMessage.getField("CodeDescription"); - String extraDescription = receivedMessage.getField("ExtraDescription"); - String identifier = receivedMessage.getField("Identifier"); - FcpProtocolException fcpProtocolException = new FcpProtocolException(code, fatal, global); - fcpProtocolException.setCodeDescription(codeDescription); - fcpProtocolException.setExtraDescription(extraDescription); - fcpProtocolException.setIdentifier(identifier); - throw fcpProtocolException; - } - for (String messageName: messageNames) { - int firstBracket = messageName.indexOf('('); - Map wantedIdentifiers = new HashMap(); - if (firstBracket > -1) { - StringTokenizer identifierTokens = new StringTokenizer(messageName.substring(firstBracket), "()"); - while (identifierTokens.hasMoreTokens()) { - String identifierToken = identifierTokens.nextToken(); - int equalSign = identifierToken.indexOf('='); - if (equalSign > -1) { - wantedIdentifiers.put(identifierToken.substring(0, equalSign), identifierToken.substring(equalSign + 1)); - } - } - messageName = messageName.substring(0, firstBracket); - } - if (receivedMessageName.equals(messageName)) { - boolean found = true; - for (Entry wantedIdentifier: wantedIdentifiers.entrySet()) { - System.out.println("key: " + wantedIdentifier.getKey() + ", value: " + wantedIdentifier.getValue() + ", msg: " + receivedMessage.getField(wantedIdentifier.getKey())); - if (!wantedIdentifier.getValue().equals(receivedMessage.getField(wantedIdentifier.getKey()))) { - found = false; - break; - } - } - if (found) { - System.out.println("message found"); - FcpMessage foundMessage = receivedMessage; - receivedMessage = null; - messageWaitSync.notifyAll(); - return foundMessage; - } - } - } - oldMessage = receivedMessage; - } + /** + * Incremets the counter in {@link #incomingMessageStatistics} by 1 + * for the given message name. + * + * @param name + * The name of the message to count + */ + private void countMessage(String name) { + int oldValue = 0; + if (incomingMessageStatistics.containsKey(name)) { + oldValue = incomingMessageStatistics.get(name); + } + incomingMessageStatistics.put(name, oldValue + 1); + } + + /** + * Returns a limited input stream from the node’s input stream. + * + * @param dataLength + * The length of the stream + * @return The limited input stream + */ + private LimitedInputStream getInputStream(long dataLength) { + if (dataLength <= 0) { + return new LimitedInputStream(null, 0); } + return new LimitedInputStream(remoteInputStream, dataLength); } }