X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fnet%2Fpterodactylus%2Futil%2Ffcp%2FFcpConnection.java;h=748bb8f18bff478f2f58b1a6dbd977ba4045b587;hb=ebd531bc775036dd66a7e20abebbcb480af35491;hp=815e45ee7b019d9264ca007ab6abe0a88a016be6;hpb=321147d4c7eeeccd114a83225ef15fe1d13efa27;p=jSite2.git diff --git a/src/net/pterodactylus/util/fcp/FcpConnection.java b/src/net/pterodactylus/util/fcp/FcpConnection.java index 815e45e..748bb8f 100644 --- a/src/net/pterodactylus/util/fcp/FcpConnection.java +++ b/src/net/pterodactylus/util/fcp/FcpConnection.java @@ -26,7 +26,10 @@ import java.net.InetAddress; import java.net.Socket; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import net.pterodactylus.util.io.Closer; import net.pterodactylus.util.io.LimitedInputStream; @@ -63,6 +66,20 @@ public class FcpConnection { /** The connection handler. */ private FcpConnectionHandler connectionHandler; + /** Incoming message statistics. */ + private Map incomingMessageStatistics = Collections.synchronizedMap(new HashMap()); + + /** + * Creates a new FCP connection to the freenet node running on localhost, + * using the default port. + * + * @throws UnknownHostException + * if the hostname can not be resolved + */ + public FcpConnection() throws UnknownHostException { + this(InetAddress.getLocalHost()); + } + /** * Creates a new FCP connection to the Freenet node running on the given * host, listening on the default port. @@ -286,6 +303,19 @@ public class FcpConnection { } /** + * Notifies all listeners that a “PersistentGet” message was received. + * + * @see FcpListener#receivedPersistentGet(FcpConnection, PersistentGet) + * @param persistentGet + * The “PersistentGet” message + */ + private void fireReceivedPersistentGet(PersistentGet persistentGet) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedPersistentGet(this, persistentGet); + } + } + + /** * Notifies all listeners that a “PersistentPut” message was received. * * @see FcpListener#receivedPersistentPut(FcpConnection, PersistentPut) @@ -302,6 +332,8 @@ public class FcpConnection { * Notifies all listeners that a “EndListPersistentRequests” message was * received. * + * @see FcpListener#receivedEndListPersistentRequests(FcpConnection, + * EndListPersistentRequests) * @param endListPersistentRequests * The “EndListPersistentRequests” message */ @@ -314,6 +346,7 @@ public class FcpConnection { /** * Notifies all listeners that a “URIGenerated” message was received. * + * @see FcpListener#receivedURIGenerated(FcpConnection, URIGenerated) * @param uriGenerated * The “URIGenerated” message */ @@ -324,8 +357,22 @@ public class FcpConnection { } /** + * Notifies all listeners that a “DataFound” message was received. + * + * @see FcpListener#receivedDataFound(FcpConnection, DataFound) + * @param dataFound + * The “DataFound” message + */ + private void fireReceivedDataFound(DataFound dataFound) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedDataFound(this, dataFound); + } + } + + /** * Notifies all listeners that an “AllData” message was received. * + * @see FcpListener#receivedAllData(FcpConnection, AllData) * @param allData * The “AllData” message */ @@ -338,6 +385,7 @@ public class FcpConnection { /** * Notifies all listeners that a “SimpleProgress” message was received. * + * @see FcpListener#receivedSimpleProgress(FcpConnection, SimpleProgress) * @param simpleProgress * The “SimpleProgress” message */ @@ -348,8 +396,231 @@ public class FcpConnection { } /** + * Notifies all listeners that a “StartedCompression” message was received. + * + * @see FcpListener#receivedStartedCompression(FcpConnection, + * StartedCompression) + * @param startedCompression + * The “StartedCompression” message + */ + private void fireReceivedStartedCompression(StartedCompression startedCompression) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedStartedCompression(this, startedCompression); + } + } + + /** + * Notifies all listeners that a “FinishedCompression” message was received. + * + * @see FcpListener#receviedFinishedCompression(FcpConnection, + * FinishedCompression) + * @param finishedCompression + * The “FinishedCompression” message + */ + private void fireReceivedFinishedCompression(FinishedCompression finishedCompression) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receviedFinishedCompression(this, finishedCompression); + } + } + + /** + * Notifies all listeners that an “UnknownPeerNoteType” message was + * received. + * + * @see FcpListener#receivedUnknownPeerNoteType(FcpConnection, + * UnknownPeerNoteType) + * @param unknownPeerNoteType + * The “UnknownPeerNoteType” message + */ + private void fireReceivedUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedUnknownPeerNoteType(this, unknownPeerNoteType); + } + } + + /** + * Notifies all listeners that an “UnknownNodeIdentifier” message was + * received. + * + * @see FcpListener#receivedUnknownNodeIdentifier(FcpConnection, + * UnknownNodeIdentifier) + * @param unknownNodeIdentifier + * The “UnknownNodeIdentifier” message + */ + private void fireReceivedUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedUnknownNodeIdentifier(this, unknownNodeIdentifier); + } + } + + /** + * Notifies all listeners that a “ConfigData” message was received. + * + * @see FcpListener#receivedConfigData(FcpConnection, ConfigData) + * @param configData + * The “ConfigData” message + */ + private void fireReceivedConfigData(ConfigData configData) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedConfigData(this, configData); + } + } + + /** + * Notifies all listeners that a “GetFailed” message was received. + * + * @see FcpListener#receivedGetFailed(FcpConnection, GetFailed) + * @param getFailed + * The “GetFailed” message + */ + private void fireReceivedGetFailed(GetFailed getFailed) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedGetFailed(this, getFailed); + } + } + + /** + * Notifies all listeners that a “PutFailed” message was received. + * + * @see FcpListener#receivedPutFailed(FcpConnection, PutFailed) + * @param putFailed + * The “PutFailed” message + */ + private void fireReceivedPutFailed(PutFailed putFailed) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedPutFailed(this, putFailed); + } + } + + /** + * Notifies all listeners that an “IdentifierCollision” message was + * received. + * + * @see FcpListener#receivedIdentifierCollision(FcpConnection, + * IdentifierCollision) + * @param identifierCollision + * The “IdentifierCollision” message + */ + private void fireReceivedIdentifierCollision(IdentifierCollision identifierCollision) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedIdentifierCollision(this, identifierCollision); + } + } + + /** + * Notifies all listeners that an “PersistentPutDir” message was received. + * + * @see FcpListener#receivedPersistentPutDir(FcpConnection, + * PersistentPutDir) + * @param persistentPutDir + * The “PersistentPutDir” message + */ + private void fireReceivedPersistentPutDir(PersistentPutDir persistentPutDir) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedPersistentPutDir(this, persistentPutDir); + } + } + + /** + * Notifies all listeners that a “PersistentRequestRemoved” message was + * received. + * + * @see FcpListener#receivedPersistentRequestRemoved(FcpConnection, + * PersistentRequestRemoved) + * @param persistentRequestRemoved + * The “PersistentRequestRemoved” message + */ + private void fireReceivedPersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedPersistentRequestRemoved(this, persistentRequestRemoved); + } + } + + /** + * Notifies all listeners that a “SubscribedUSKUpdate” message was received. + * + * @see FcpListener#receivedSubscribedUSKUpdate(FcpConnection, + * SubscribedUSKUpdate) + * @param subscribedUSKUpdate + * The “SubscribedUSKUpdate” message + */ + private void fireReceivedSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedSubscribedUSKUpdate(this, subscribedUSKUpdate); + } + } + + /** + * Notifies all listeners that a “PluginInfo” message was received. + * + * @see FcpListener#receivedPluginInfo(FcpConnection, PluginInfo) + * @param pluginInfo + * The “PluginInfo” message + */ + private void fireReceivedPluginInfo(PluginInfo pluginInfo) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedPluginInfo(this, pluginInfo); + } + } + + /** + * Notifies all listeners that an “FCPPluginReply” message was received. + * + * @see FcpListener#receivedFCPPluginReply(FcpConnection, FCPPluginReply) + * @param fcpPluginReply + * The “FCPPluginReply” message + */ + private void fireReceivedFCPPluginReply(FCPPluginReply fcpPluginReply) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedFCPPluginReply(this, fcpPluginReply); + } + } + + /** + * Notifies all listeners that a “PersistentRequestModified” message was + * received. + * + * @see FcpListener#receivedPersistentRequestModified(FcpConnection, + * PersistentRequestModified) + * @param persistentRequestModified + * The “PersistentRequestModified” message + */ + private void fireReceivedPersistentRequestModified(PersistentRequestModified persistentRequestModified) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedPersistentRequestModified(this, persistentRequestModified); + } + } + + /** + * Notifies all listeners that a “PutSuccessful” message was received. + * + * @see FcpListener#receivedPutSuccessful(FcpConnection, PutSuccessful) + * @param putSuccessful + * The “PutSuccessful” message + */ + private void fireReceivedPutSuccessful(PutSuccessful putSuccessful) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedPutSuccessful(this, putSuccessful); + } + } + + /** + * Notifies all listeners that a “PutFetchable” message was received. + * + * @see FcpListener#receivedPutFetchable(FcpConnection, PutFetchable) + * @param putFetchable + * The “PutFetchable” message + */ + private void fireReceivedPutFetchable(PutFetchable putFetchable) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedPutFetchable(this, putFetchable); + } + } + + /** * Notifies all listeners that a “ProtocolError” message was received. * + * @see FcpListener#receivedProtocolError(FcpConnection, ProtocolError) * @param protocolError * The “ProtocolError” message */ @@ -372,6 +643,17 @@ public class FcpConnection { } } + /** + * Notifies all listeners that the connection to the node was closed. + * + * @see FcpListener#connectionClosed(FcpConnection) + */ + private void fireConnectionClosed() { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.connectionClosed(this); + } + } + // // ACTIONS // @@ -433,12 +715,17 @@ public class FcpConnection { */ void handleMessage(FcpMessage fcpMessage) { String messageName = fcpMessage.getName(); + countMessage(messageName); if ("SimpleProgress".equals(messageName)) { fireReceivedSimpleProgress(new SimpleProgress(fcpMessage)); } else if ("ProtocolError".equals(messageName)) { fireReceivedProtocolError(new ProtocolError(fcpMessage)); + } else if ("PersistentGet".equals(messageName)) { + fireReceivedPersistentGet(new PersistentGet(fcpMessage)); } else if ("PersistentPut".equals(messageName)) { fireReceivedPersistentPut(new PersistentPut(fcpMessage)); + } else if ("PersistentPutDir".equals(messageName)) { + fireReceivedPersistentPutDir(new PersistentPutDir(fcpMessage)); } else if ("URIGenerated".equals(messageName)) { fireReceivedURIGenerated(new URIGenerated(fcpMessage)); } else if ("EndListPersistentRequests".equals(messageName)) { @@ -447,19 +734,30 @@ public class FcpConnection { fireReceivedPeer(new Peer(fcpMessage)); } else if ("PeerNote".equals(messageName)) { fireReceivedPeerNote(new PeerNote(fcpMessage)); + } else if ("StartedCompression".equals(messageName)) { + fireReceivedStartedCompression(new StartedCompression(fcpMessage)); + } else if ("FinishedCompression".equals(messageName)) { + fireReceivedFinishedCompression(new FinishedCompression(fcpMessage)); + } else if ("GetFailed".equals(messageName)) { + fireReceivedGetFailed(new GetFailed(fcpMessage)); + } else if ("PutFetchable".equals(messageName)) { + fireReceivedPutFetchable(new PutFetchable(fcpMessage)); + } else if ("PutSuccessful".equals(messageName)) { + fireReceivedPutSuccessful(new PutSuccessful(fcpMessage)); + } else if ("PutFailed".equals(messageName)) { + fireReceivedPutFailed(new PutFailed(fcpMessage)); + } else if ("DataFound".equals(messageName)) { + fireReceivedDataFound(new DataFound(fcpMessage)); + } else if ("SubscribedUSKUpdate".equals(messageName)) { + fireReceivedSubscribedUSKUpdate(new SubscribedUSKUpdate(fcpMessage)); + } else if ("IdentifierCollision".equals(messageName)) { + fireReceivedIdentifierCollision(new IdentifierCollision(fcpMessage)); } else if ("AllData".equals(messageName)) { - long dataLength; - try { - dataLength = Long.valueOf(fcpMessage.getField("DataLength")); - } catch (NumberFormatException nfe1) { - dataLength = -1; - } - LimitedInputStream payloadInputStream = new LimitedInputStream(remoteInputStream, dataLength); + LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength"))); fireReceivedAllData(new AllData(fcpMessage, payloadInputStream)); try { payloadInputStream.consume(); } catch (IOException ioe1) { - /* FIXME - what now? */ /* well, ignore. when the connection handler fails, all fails. */ } } else if ("EndListPeerNotes".equals(messageName)) { @@ -470,12 +768,32 @@ public class FcpConnection { fireReceivedSSKKeypair(new SSKKeypair(fcpMessage)); } else if ("PeerRemoved".equals(messageName)) { fireReceivedPeerRemoved(new PeerRemoved(fcpMessage)); + } else if ("PersistentRequestModified".equals(messageName)) { + fireReceivedPersistentRequestModified(new PersistentRequestModified(fcpMessage)); + } else if ("PersistentRequestRemoved".equals(messageName)) { + fireReceivedPersistentRequestRemoved(new PersistentRequestRemoved(fcpMessage)); + } else if ("UnknownPeerNoteType".equals(messageName)) { + fireReceivedUnknownPeerNoteType(new UnknownPeerNoteType(fcpMessage)); + } else if ("UnknownNodeIdentifier".equals(messageName)) { + fireReceivedUnknownNodeIdentifier(new UnknownNodeIdentifier(fcpMessage)); + } else if ("FCPPluginReply".equals(messageName)) { + LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength"))); + fireReceivedFCPPluginReply(new FCPPluginReply(fcpMessage, payloadInputStream)); + try { + payloadInputStream.consume(); + } catch (IOException ioe1) { + /* ignore. */ + } + } else if ("PluginInfo".equals(messageName)) { + fireReceivedPluginInfo(new PluginInfo(fcpMessage)); } else if ("NodeData".equals(messageName)) { fireReceivedNodeData(new NodeData(fcpMessage)); } else if ("TestDDAReply".equals(messageName)) { fireReceivedTestDDAReply(new TestDDAReply(fcpMessage)); } else if ("TestDDAComplete".equals(messageName)) { fireReceivedTestDDAComplete(new TestDDAComplete(fcpMessage)); + } else if ("ConfigData".equals(messageName)) { + fireReceivedConfigData(new ConfigData(fcpMessage)); } else if ("NodeHello".equals(messageName)) { fireReceivedNodeHello(new NodeHello(fcpMessage)); } else if ("CloseConnectionDuplicateClientName".equals(messageName)) { @@ -485,4 +803,48 @@ public class FcpConnection { } } + /** + * Handles a disconnect from the node. + */ + synchronized void handleDisconnect() { + Closer.close(remoteInputStream); + Closer.close(remoteOutputStream); + Closer.close(remoteSocket); + connectionHandler = null; + fireConnectionClosed(); + } + + // + // PRIVATE METHODS + // + + /** + * Incremets the counter in {@link #incomingMessageStatistics} by 1 + * for the given message name. + * + * @param name + * The name of the message to count + */ + private void countMessage(String name) { + int oldValue = 0; + if (incomingMessageStatistics.containsKey(name)) { + oldValue = incomingMessageStatistics.get(name); + } + incomingMessageStatistics.put(name, oldValue + 1); + } + + /** + * Returns a limited input stream from the node’s input stream. + * + * @param dataLength + * The length of the stream + * @return The limited input stream + */ + private LimitedInputStream getInputStream(long dataLength) { + if (dataLength <= 0) { + return new LimitedInputStream(null, 0); + } + return new LimitedInputStream(remoteInputStream, dataLength); + } + }