X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fnet%2Fpterodactylus%2Futil%2Ffcp%2FFcpConnection.java;h=748bb8f18bff478f2f58b1a6dbd977ba4045b587;hb=ebd531bc775036dd66a7e20abebbcb480af35491;hp=81b5c479bc43d039750dd3186eec15599f517f61;hpb=5a4bff024f037d288d4f34ed28c1dd769e06ffc5;p=jSite2.git diff --git a/src/net/pterodactylus/util/fcp/FcpConnection.java b/src/net/pterodactylus/util/fcp/FcpConnection.java index 81b5c47..748bb8f 100644 --- a/src/net/pterodactylus/util/fcp/FcpConnection.java +++ b/src/net/pterodactylus/util/fcp/FcpConnection.java @@ -305,6 +305,7 @@ public class FcpConnection { /** * Notifies all listeners that a “PersistentGet” message was received. * + * @see FcpListener#receivedPersistentGet(FcpConnection, PersistentGet) * @param persistentGet * The “PersistentGet” message */ @@ -331,6 +332,8 @@ public class FcpConnection { * Notifies all listeners that a “EndListPersistentRequests” message was * received. * + * @see FcpListener#receivedEndListPersistentRequests(FcpConnection, + * EndListPersistentRequests) * @param endListPersistentRequests * The “EndListPersistentRequests” message */ @@ -343,6 +346,7 @@ public class FcpConnection { /** * Notifies all listeners that a “URIGenerated” message was received. * + * @see FcpListener#receivedURIGenerated(FcpConnection, URIGenerated) * @param uriGenerated * The “URIGenerated” message */ @@ -355,6 +359,7 @@ public class FcpConnection { /** * Notifies all listeners that a “DataFound” message was received. * + * @see FcpListener#receivedDataFound(FcpConnection, DataFound) * @param dataFound * The “DataFound” message */ @@ -367,6 +372,7 @@ public class FcpConnection { /** * Notifies all listeners that an “AllData” message was received. * + * @see FcpListener#receivedAllData(FcpConnection, AllData) * @param allData * The “AllData” message */ @@ -379,6 +385,7 @@ public class FcpConnection { /** * Notifies all listeners that a “SimpleProgress” message was received. * + * @see FcpListener#receivedSimpleProgress(FcpConnection, SimpleProgress) * @param simpleProgress * The “SimpleProgress” message */ @@ -391,6 +398,8 @@ public class FcpConnection { /** * Notifies all listeners that a “StartedCompression” message was received. * + * @see FcpListener#receivedStartedCompression(FcpConnection, + * StartedCompression) * @param startedCompression * The “StartedCompression” message */ @@ -403,6 +412,8 @@ public class FcpConnection { /** * Notifies all listeners that a “FinishedCompression” message was received. * + * @see FcpListener#receviedFinishedCompression(FcpConnection, + * FinishedCompression) * @param finishedCompression * The “FinishedCompression” message */ @@ -416,6 +427,8 @@ public class FcpConnection { * Notifies all listeners that an “UnknownPeerNoteType” message was * received. * + * @see FcpListener#receivedUnknownPeerNoteType(FcpConnection, + * UnknownPeerNoteType) * @param unknownPeerNoteType * The “UnknownPeerNoteType” message */ @@ -429,6 +442,8 @@ public class FcpConnection { * Notifies all listeners that an “UnknownNodeIdentifier” message was * received. * + * @see FcpListener#receivedUnknownNodeIdentifier(FcpConnection, + * UnknownNodeIdentifier) * @param unknownNodeIdentifier * The “UnknownNodeIdentifier” message */ @@ -441,6 +456,7 @@ public class FcpConnection { /** * Notifies all listeners that a “ConfigData” message was received. * + * @see FcpListener#receivedConfigData(FcpConnection, ConfigData) * @param configData * The “ConfigData” message */ @@ -453,6 +469,7 @@ public class FcpConnection { /** * Notifies all listeners that a “GetFailed” message was received. * + * @see FcpListener#receivedGetFailed(FcpConnection, GetFailed) * @param getFailed * The “GetFailed” message */ @@ -465,6 +482,7 @@ public class FcpConnection { /** * Notifies all listeners that a “PutFailed” message was received. * + * @see FcpListener#receivedPutFailed(FcpConnection, PutFailed) * @param putFailed * The “PutFailed” message */ @@ -478,6 +496,8 @@ public class FcpConnection { * Notifies all listeners that an “IdentifierCollision” message was * received. * + * @see FcpListener#receivedIdentifierCollision(FcpConnection, + * IdentifierCollision) * @param identifierCollision * The “IdentifierCollision” message */ @@ -490,6 +510,8 @@ public class FcpConnection { /** * Notifies all listeners that an “PersistentPutDir” message was received. * + * @see FcpListener#receivedPersistentPutDir(FcpConnection, + * PersistentPutDir) * @param persistentPutDir * The “PersistentPutDir” message */ @@ -503,6 +525,8 @@ public class FcpConnection { * Notifies all listeners that a “PersistentRequestRemoved” message was * received. * + * @see FcpListener#receivedPersistentRequestRemoved(FcpConnection, + * PersistentRequestRemoved) * @param persistentRequestRemoved * The “PersistentRequestRemoved” message */ @@ -515,6 +539,8 @@ public class FcpConnection { /** * Notifies all listeners that a “SubscribedUSKUpdate” message was received. * + * @see FcpListener#receivedSubscribedUSKUpdate(FcpConnection, + * SubscribedUSKUpdate) * @param subscribedUSKUpdate * The “SubscribedUSKUpdate” message */ @@ -525,8 +551,76 @@ public class FcpConnection { } /** + * Notifies all listeners that a “PluginInfo” message was received. + * + * @see FcpListener#receivedPluginInfo(FcpConnection, PluginInfo) + * @param pluginInfo + * The “PluginInfo” message + */ + private void fireReceivedPluginInfo(PluginInfo pluginInfo) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedPluginInfo(this, pluginInfo); + } + } + + /** + * Notifies all listeners that an “FCPPluginReply” message was received. + * + * @see FcpListener#receivedFCPPluginReply(FcpConnection, FCPPluginReply) + * @param fcpPluginReply + * The “FCPPluginReply” message + */ + private void fireReceivedFCPPluginReply(FCPPluginReply fcpPluginReply) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedFCPPluginReply(this, fcpPluginReply); + } + } + + /** + * Notifies all listeners that a “PersistentRequestModified” message was + * received. + * + * @see FcpListener#receivedPersistentRequestModified(FcpConnection, + * PersistentRequestModified) + * @param persistentRequestModified + * The “PersistentRequestModified” message + */ + private void fireReceivedPersistentRequestModified(PersistentRequestModified persistentRequestModified) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedPersistentRequestModified(this, persistentRequestModified); + } + } + + /** + * Notifies all listeners that a “PutSuccessful” message was received. + * + * @see FcpListener#receivedPutSuccessful(FcpConnection, PutSuccessful) + * @param putSuccessful + * The “PutSuccessful” message + */ + private void fireReceivedPutSuccessful(PutSuccessful putSuccessful) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedPutSuccessful(this, putSuccessful); + } + } + + /** + * Notifies all listeners that a “PutFetchable” message was received. + * + * @see FcpListener#receivedPutFetchable(FcpConnection, PutFetchable) + * @param putFetchable + * The “PutFetchable” message + */ + private void fireReceivedPutFetchable(PutFetchable putFetchable) { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.receivedPutFetchable(this, putFetchable); + } + } + + /** * Notifies all listeners that a “ProtocolError” message was received. * + * @see FcpListener#receivedProtocolError(FcpConnection, ProtocolError) * @param protocolError * The “ProtocolError” message */ @@ -549,6 +643,17 @@ public class FcpConnection { } } + /** + * Notifies all listeners that the connection to the node was closed. + * + * @see FcpListener#connectionClosed(FcpConnection) + */ + private void fireConnectionClosed() { + for (FcpListener fcpListener: fcpListeners) { + fcpListener.connectionClosed(this); + } + } + // // ACTIONS // @@ -635,6 +740,10 @@ public class FcpConnection { fireReceivedFinishedCompression(new FinishedCompression(fcpMessage)); } else if ("GetFailed".equals(messageName)) { fireReceivedGetFailed(new GetFailed(fcpMessage)); + } else if ("PutFetchable".equals(messageName)) { + fireReceivedPutFetchable(new PutFetchable(fcpMessage)); + } else if ("PutSuccessful".equals(messageName)) { + fireReceivedPutSuccessful(new PutSuccessful(fcpMessage)); } else if ("PutFailed".equals(messageName)) { fireReceivedPutFailed(new PutFailed(fcpMessage)); } else if ("DataFound".equals(messageName)) { @@ -644,18 +753,11 @@ public class FcpConnection { } else if ("IdentifierCollision".equals(messageName)) { fireReceivedIdentifierCollision(new IdentifierCollision(fcpMessage)); } else if ("AllData".equals(messageName)) { - long dataLength; - try { - dataLength = Long.valueOf(fcpMessage.getField("DataLength")); - } catch (NumberFormatException nfe1) { - dataLength = -1; - } - LimitedInputStream payloadInputStream = new LimitedInputStream(remoteInputStream, dataLength); + LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength"))); fireReceivedAllData(new AllData(fcpMessage, payloadInputStream)); try { payloadInputStream.consume(); } catch (IOException ioe1) { - /* FIXME - what now? */ /* well, ignore. when the connection handler fails, all fails. */ } } else if ("EndListPeerNotes".equals(messageName)) { @@ -666,12 +768,24 @@ public class FcpConnection { fireReceivedSSKKeypair(new SSKKeypair(fcpMessage)); } else if ("PeerRemoved".equals(messageName)) { fireReceivedPeerRemoved(new PeerRemoved(fcpMessage)); + } else if ("PersistentRequestModified".equals(messageName)) { + fireReceivedPersistentRequestModified(new PersistentRequestModified(fcpMessage)); } else if ("PersistentRequestRemoved".equals(messageName)) { fireReceivedPersistentRequestRemoved(new PersistentRequestRemoved(fcpMessage)); } else if ("UnknownPeerNoteType".equals(messageName)) { fireReceivedUnknownPeerNoteType(new UnknownPeerNoteType(fcpMessage)); } else if ("UnknownNodeIdentifier".equals(messageName)) { fireReceivedUnknownNodeIdentifier(new UnknownNodeIdentifier(fcpMessage)); + } else if ("FCPPluginReply".equals(messageName)) { + LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength"))); + fireReceivedFCPPluginReply(new FCPPluginReply(fcpMessage, payloadInputStream)); + try { + payloadInputStream.consume(); + } catch (IOException ioe1) { + /* ignore. */ + } + } else if ("PluginInfo".equals(messageName)) { + fireReceivedPluginInfo(new PluginInfo(fcpMessage)); } else if ("NodeData".equals(messageName)) { fireReceivedNodeData(new NodeData(fcpMessage)); } else if ("TestDDAReply".equals(messageName)) { @@ -697,6 +811,7 @@ public class FcpConnection { Closer.close(remoteOutputStream); Closer.close(remoteSocket); connectionHandler = null; + fireConnectionClosed(); } // @@ -718,4 +833,18 @@ public class FcpConnection { incomingMessageStatistics.put(name, oldValue + 1); } + /** + * Returns a limited input stream from the node’s input stream. + * + * @param dataLength + * The length of the stream + * @return The limited input stream + */ + private LimitedInputStream getInputStream(long dataLength) { + if (dataLength <= 0) { + return new LimitedInputStream(null, 0); + } + return new LimitedInputStream(remoteInputStream, dataLength); + } + }