2 * jSite2 - FcpConnection.java -
3 * Copyright © 2008 David Roden
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
20 package net.pterodactylus.fcp;
import java.io.Closeable;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Logger;
38 * An FCP connection to a Freenet node.
40 * @author David ‘Bombe’ Roden <bombe@freenetproject.org>
42 public class FcpConnection implements Closeable {
45 private static final Logger logger = Logger.getLogger(FcpConnection.class.getName());
47 /** The default port for FCP v2. */
48 public static final int DEFAULT_PORT = 9481;
50 /** The list of FCP listeners. */
51 private final List<FcpListener> fcpListeners = new ArrayList<FcpListener>();
53 /** The address of the node. */
54 private final InetAddress address;
56 /** The port number of the node’s FCP port. */
57 private final int port;
59 /** The remote socket. */
60 private Socket remoteSocket;
62 /** The input stream from the node. */
63 private InputStream remoteInputStream;
65 /** The output stream to the node. */
66 private OutputStream remoteOutputStream;
68 /** The connection handler. */
69 private FcpConnectionHandler connectionHandler;
71 /** Incoming message statistics. */
72 private Map<String, Integer> incomingMessageStatistics = Collections.synchronizedMap(new HashMap<String, Integer>());
/**
 * Creates a new FCP connection to the Freenet node on the local host (as
 * returned by {@link InetAddress#getLocalHost()}), listening on the
 * default port.
 *
 * @throws UnknownHostException
 *             if the local hostname can not be resolved
 */
public FcpConnection() throws UnknownHostException {
    this(InetAddress.getLocalHost(), DEFAULT_PORT);
}
/**
 * Creates a new FCP connection to the Freenet node on the given host,
 * using the default port.
 *
 * @param host
 *            The hostname of the Freenet node
 * @throws UnknownHostException
 *             if <code>host</code> can not be resolved
 */
public FcpConnection(String host) throws UnknownHostException {
    this(InetAddress.getByName(host), DEFAULT_PORT);
}
/**
 * Creates a new FCP connection to the Freenet node on the given host and
 * port. The hostname is resolved immediately.
 *
 * @param host
 *            The hostname of the Freenet node
 * @param port
 *            The port number of the node’s FCP port
 * @throws UnknownHostException
 *             if <code>host</code> can not be resolved
 */
public FcpConnection(String host, int port) throws UnknownHostException {
    this(InetAddress.getByName(host), port);
}
/**
 * Creates a new FCP connection to the Freenet node at the given address,
 * using the default port.
 *
 * @param address
 *            The address of the Freenet node
 */
public FcpConnection(InetAddress address) {
    this(address, DEFAULT_PORT);
}
/**
 * Creates a new FCP connection to the Freenet node running at the given
 * address, listening on the given port. No connection is opened here; use
 * {@link #connect()} for that.
 *
 * @param address
 *            The address of the Freenet node
 * @param port
 *            The port number of the node’s FCP port
 */
public FcpConnection(InetAddress address, int port) {
    this.address = address;
    /* both fields are final, so both have to be assigned here. */
    this.port = port;
}
139 // LISTENER MANAGEMENT
143 * Adds the given listener to the list of listeners.
146 * The listener to add
148 public void addFcpListener(FcpListener fcpListener) {
149 fcpListeners.add(fcpListener);
153 * Removes the given listener from the list of listeners.
156 * The listener to remove
158 public void removeFcpListener(FcpListener fcpListener) {
159 fcpListeners.remove(fcpListener);
163 * Notifies listeners that a “NodeHello” message was received.
165 * @see FcpListener#receivedNodeHello(FcpConnection, NodeHello)
167 * The “NodeHello” message
169 private void fireReceivedNodeHello(NodeHello nodeHello) {
170 for (FcpListener fcpListener : fcpListeners) {
171 fcpListener.receivedNodeHello(this, nodeHello);
176 * Notifies listeners that a “CloseConnectionDuplicateClientName” message
179 * @see FcpListener#receivedCloseConnectionDuplicateClientName(FcpConnection,
180 * CloseConnectionDuplicateClientName)
181 * @param closeConnectionDuplicateClientName
182 * The “CloseConnectionDuplicateClientName” message
184 private void fireReceivedCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
185 for (FcpListener fcpListener : fcpListeners) {
186 fcpListener.receivedCloseConnectionDuplicateClientName(this, closeConnectionDuplicateClientName);
191 * Notifies listeners that a “SSKKeypair” message was received.
193 * @see FcpListener#receivedSSKKeypair(FcpConnection, SSKKeypair)
195 * The “SSKKeypair” message
197 private void fireReceivedSSKKeypair(SSKKeypair sskKeypair) {
198 for (FcpListener fcpListener : fcpListeners) {
199 fcpListener.receivedSSKKeypair(this, sskKeypair);
204 * Notifies listeners that a “Peer” message was received.
206 * @see FcpListener#receivedPeer(FcpConnection, Peer)
210 private void fireReceivedPeer(Peer peer) {
211 for (FcpListener fcpListener : fcpListeners) {
212 fcpListener.receivedPeer(this, peer);
217 * Notifies all listeners that an “EndListPeers” message was received.
219 * @see FcpListener#receivedEndListPeers(FcpConnection, EndListPeers)
220 * @param endListPeers
221 * The “EndListPeers” message
223 private void fireReceivedEndListPeers(EndListPeers endListPeers) {
224 for (FcpListener fcpListener : fcpListeners) {
225 fcpListener.receivedEndListPeers(this, endListPeers);
230 * Notifies all listeners that a “PeerNote” message was received.
232 * @see FcpListener#receivedPeerNote(FcpConnection, PeerNote)
235 private void fireReceivedPeerNote(PeerNote peerNote) {
236 for (FcpListener fcpListener : fcpListeners) {
237 fcpListener.receivedPeerNote(this, peerNote);
242 * Notifies all listeners that an “EndListPeerNotes” message was received.
244 * @see FcpListener#receivedEndListPeerNotes(FcpConnection,
246 * @param endListPeerNotes
247 * The “EndListPeerNotes” message
249 private void fireReceivedEndListPeerNotes(EndListPeerNotes endListPeerNotes) {
250 for (FcpListener fcpListener : fcpListeners) {
251 fcpListener.receivedEndListPeerNotes(this, endListPeerNotes);
256 * Notifies all listeners that a “PeerRemoved” message was received.
258 * @see FcpListener#receivedPeerRemoved(FcpConnection, PeerRemoved)
260 * The “PeerRemoved” message
262 private void fireReceivedPeerRemoved(PeerRemoved peerRemoved) {
263 for (FcpListener fcpListener : fcpListeners) {
264 fcpListener.receivedPeerRemoved(this, peerRemoved);
269 * Notifies all listeners that a “NodeData” message was received.
271 * @see FcpListener#receivedNodeData(FcpConnection, NodeData)
273 * The “NodeData” message
275 private void fireReceivedNodeData(NodeData nodeData) {
276 for (FcpListener fcpListener : fcpListeners) {
277 fcpListener.receivedNodeData(this, nodeData);
282 * Notifies all listeners that a “TestDDAReply” message was received.
284 * @see FcpListener#receivedTestDDAReply(FcpConnection, TestDDAReply)
285 * @param testDDAReply
286 * The “TestDDAReply” message
288 private void fireReceivedTestDDAReply(TestDDAReply testDDAReply) {
289 for (FcpListener fcpListener : fcpListeners) {
290 fcpListener.receivedTestDDAReply(this, testDDAReply);
295 * Notifies all listeners that a “TestDDAComplete” message was received.
297 * @see FcpListener#receivedTestDDAComplete(FcpConnection, TestDDAComplete)
298 * @param testDDAComplete
299 * The “TestDDAComplete” message
301 private void fireReceivedTestDDAComplete(TestDDAComplete testDDAComplete) {
302 for (FcpListener fcpListener : fcpListeners) {
303 fcpListener.receivedTestDDAComplete(this, testDDAComplete);
308 * Notifies all listeners that a “PersistentGet” message was received.
310 * @see FcpListener#receivedPersistentGet(FcpConnection, PersistentGet)
311 * @param persistentGet
312 * The “PersistentGet” message
314 private void fireReceivedPersistentGet(PersistentGet persistentGet) {
315 for (FcpListener fcpListener : fcpListeners) {
316 fcpListener.receivedPersistentGet(this, persistentGet);
321 * Notifies all listeners that a “PersistentPut” message was received.
323 * @see FcpListener#receivedPersistentPut(FcpConnection, PersistentPut)
324 * @param persistentPut
325 * The “PersistentPut” message
327 private void fireReceivedPersistentPut(PersistentPut persistentPut) {
328 for (FcpListener fcpListener : fcpListeners) {
329 fcpListener.receivedPersistentPut(this, persistentPut);
334 * Notifies all listeners that a “EndListPersistentRequests” message was
337 * @see FcpListener#receivedEndListPersistentRequests(FcpConnection,
338 * EndListPersistentRequests)
339 * @param endListPersistentRequests
340 * The “EndListPersistentRequests” message
342 private void fireReceivedEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) {
343 for (FcpListener fcpListener : fcpListeners) {
344 fcpListener.receivedEndListPersistentRequests(this, endListPersistentRequests);
349 * Notifies all listeners that a “URIGenerated” message was received.
351 * @see FcpListener#receivedURIGenerated(FcpConnection, URIGenerated)
352 * @param uriGenerated
353 * The “URIGenerated” message
355 private void fireReceivedURIGenerated(URIGenerated uriGenerated) {
356 for (FcpListener fcpListener : fcpListeners) {
357 fcpListener.receivedURIGenerated(this, uriGenerated);
362 * Notifies all listeners that a “DataFound” message was received.
364 * @see FcpListener#receivedDataFound(FcpConnection, DataFound)
366 * The “DataFound” message
368 private void fireReceivedDataFound(DataFound dataFound) {
369 for (FcpListener fcpListener : fcpListeners) {
370 fcpListener.receivedDataFound(this, dataFound);
375 * Notifies all listeners that an “AllData” message was received.
377 * @see FcpListener#receivedAllData(FcpConnection, AllData)
379 * The “AllData” message
381 private void fireReceivedAllData(AllData allData) {
382 for (FcpListener fcpListener : fcpListeners) {
383 fcpListener.receivedAllData(this, allData);
388 * Notifies all listeners that a “SimpleProgress” message was received.
390 * @see FcpListener#receivedSimpleProgress(FcpConnection, SimpleProgress)
391 * @param simpleProgress
392 * The “SimpleProgress” message
394 private void fireReceivedSimpleProgress(SimpleProgress simpleProgress) {
395 for (FcpListener fcpListener : fcpListeners) {
396 fcpListener.receivedSimpleProgress(this, simpleProgress);
401 * Notifies all listeners that a “StartedCompression” message was received.
403 * @see FcpListener#receivedStartedCompression(FcpConnection,
404 * StartedCompression)
405 * @param startedCompression
406 * The “StartedCompression” message
408 private void fireReceivedStartedCompression(StartedCompression startedCompression) {
409 for (FcpListener fcpListener : fcpListeners) {
410 fcpListener.receivedStartedCompression(this, startedCompression);
415 * Notifies all listeners that a “FinishedCompression” message was received.
417 * @see FcpListener#receviedFinishedCompression(FcpConnection,
418 * FinishedCompression)
419 * @param finishedCompression
420 * The “FinishedCompression” message
422 private void fireReceivedFinishedCompression(FinishedCompression finishedCompression) {
423 for (FcpListener fcpListener : fcpListeners) {
424 fcpListener.receviedFinishedCompression(this, finishedCompression);
429 * Notifies all listeners that an “UnknownPeerNoteType” message was
432 * @see FcpListener#receivedUnknownPeerNoteType(FcpConnection,
433 * UnknownPeerNoteType)
434 * @param unknownPeerNoteType
435 * The “UnknownPeerNoteType” message
437 private void fireReceivedUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) {
438 for (FcpListener fcpListener : fcpListeners) {
439 fcpListener.receivedUnknownPeerNoteType(this, unknownPeerNoteType);
444 * Notifies all listeners that an “UnknownNodeIdentifier” message was
447 * @see FcpListener#receivedUnknownNodeIdentifier(FcpConnection,
448 * UnknownNodeIdentifier)
449 * @param unknownNodeIdentifier
450 * The “UnknownNodeIdentifier” message
452 private void fireReceivedUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) {
453 for (FcpListener fcpListener : fcpListeners) {
454 fcpListener.receivedUnknownNodeIdentifier(this, unknownNodeIdentifier);
459 * Notifies all listeners that a “ConfigData” message was received.
461 * @see FcpListener#receivedConfigData(FcpConnection, ConfigData)
463 * The “ConfigData” message
465 private void fireReceivedConfigData(ConfigData configData) {
466 for (FcpListener fcpListener : fcpListeners) {
467 fcpListener.receivedConfigData(this, configData);
472 * Notifies all listeners that a “GetFailed” message was received.
474 * @see FcpListener#receivedGetFailed(FcpConnection, GetFailed)
476 * The “GetFailed” message
478 private void fireReceivedGetFailed(GetFailed getFailed) {
479 for (FcpListener fcpListener : fcpListeners) {
480 fcpListener.receivedGetFailed(this, getFailed);
485 * Notifies all listeners that a “PutFailed” message was received.
487 * @see FcpListener#receivedPutFailed(FcpConnection, PutFailed)
489 * The “PutFailed” message
491 private void fireReceivedPutFailed(PutFailed putFailed) {
492 for (FcpListener fcpListener : fcpListeners) {
493 fcpListener.receivedPutFailed(this, putFailed);
498 * Notifies all listeners that an “IdentifierCollision” message was
501 * @see FcpListener#receivedIdentifierCollision(FcpConnection,
502 * IdentifierCollision)
503 * @param identifierCollision
504 * The “IdentifierCollision” message
506 private void fireReceivedIdentifierCollision(IdentifierCollision identifierCollision) {
507 for (FcpListener fcpListener : fcpListeners) {
508 fcpListener.receivedIdentifierCollision(this, identifierCollision);
513 * Notifies all listeners that an “PersistentPutDir” message was received.
515 * @see FcpListener#receivedPersistentPutDir(FcpConnection,
517 * @param persistentPutDir
518 * The “PersistentPutDir” message
520 private void fireReceivedPersistentPutDir(PersistentPutDir persistentPutDir) {
521 for (FcpListener fcpListener : fcpListeners) {
522 fcpListener.receivedPersistentPutDir(this, persistentPutDir);
527 * Notifies all listeners that a “PersistentRequestRemoved” message was
530 * @see FcpListener#receivedPersistentRequestRemoved(FcpConnection,
531 * PersistentRequestRemoved)
532 * @param persistentRequestRemoved
533 * The “PersistentRequestRemoved” message
535 private void fireReceivedPersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) {
536 for (FcpListener fcpListener : fcpListeners) {
537 fcpListener.receivedPersistentRequestRemoved(this, persistentRequestRemoved);
542 * Notifies all listeners that a “SubscribedUSKUpdate” message was received.
544 * @see FcpListener#receivedSubscribedUSKUpdate(FcpConnection,
545 * SubscribedUSKUpdate)
546 * @param subscribedUSKUpdate
547 * The “SubscribedUSKUpdate” message
549 private void fireReceivedSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) {
550 for (FcpListener fcpListener : fcpListeners) {
551 fcpListener.receivedSubscribedUSKUpdate(this, subscribedUSKUpdate);
556 * Notifies all listeners that a “PluginInfo” message was received.
558 * @see FcpListener#receivedPluginInfo(FcpConnection, PluginInfo)
560 * The “PluginInfo” message
562 private void fireReceivedPluginInfo(PluginInfo pluginInfo) {
563 for (FcpListener fcpListener : fcpListeners) {
564 fcpListener.receivedPluginInfo(this, pluginInfo);
569 * Notifies all listeners that an “FCPPluginReply” message was received.
571 * @see FcpListener#receivedFCPPluginReply(FcpConnection, FCPPluginReply)
572 * @param fcpPluginReply
573 * The “FCPPluginReply” message
575 private void fireReceivedFCPPluginReply(FCPPluginReply fcpPluginReply) {
576 for (FcpListener fcpListener : fcpListeners) {
577 fcpListener.receivedFCPPluginReply(this, fcpPluginReply);
582 * Notifies all listeners that a “PersistentRequestModified” message was
585 * @see FcpListener#receivedPersistentRequestModified(FcpConnection,
586 * PersistentRequestModified)
587 * @param persistentRequestModified
588 * The “PersistentRequestModified” message
590 private void fireReceivedPersistentRequestModified(PersistentRequestModified persistentRequestModified) {
591 for (FcpListener fcpListener : fcpListeners) {
592 fcpListener.receivedPersistentRequestModified(this, persistentRequestModified);
597 * Notifies all listeners that a “PutSuccessful” message was received.
599 * @see FcpListener#receivedPutSuccessful(FcpConnection, PutSuccessful)
600 * @param putSuccessful
601 * The “PutSuccessful” message
603 private void fireReceivedPutSuccessful(PutSuccessful putSuccessful) {
604 for (FcpListener fcpListener : fcpListeners) {
605 fcpListener.receivedPutSuccessful(this, putSuccessful);
610 * Notifies all listeners that a “PutFetchable” message was received.
612 * @see FcpListener#receivedPutFetchable(FcpConnection, PutFetchable)
613 * @param putFetchable
614 * The “PutFetchable” message
616 private void fireReceivedPutFetchable(PutFetchable putFetchable) {
617 for (FcpListener fcpListener : fcpListeners) {
618 fcpListener.receivedPutFetchable(this, putFetchable);
623 * Notifies all listeners that a “ProtocolError” message was received.
625 * @see FcpListener#receivedProtocolError(FcpConnection, ProtocolError)
626 * @param protocolError
627 * The “ProtocolError” message
629 private void fireReceivedProtocolError(ProtocolError protocolError) {
630 for (FcpListener fcpListener : fcpListeners) {
631 fcpListener.receivedProtocolError(this, protocolError);
636 * Notifies all registered listeners that a message has been received.
638 * @see FcpListener#receivedMessage(FcpConnection, FcpMessage)
640 * The message that was received
642 private void fireMessageReceived(FcpMessage fcpMessage) {
643 for (FcpListener fcpListener : fcpListeners) {
644 fcpListener.receivedMessage(this, fcpMessage);
649 * Notifies all listeners that the connection to the node was closed.
652 * The exception that caused the disconnect, or <code>null</code>
653 * if there was no exception
654 * @see FcpListener#connectionClosed(FcpConnection, Throwable)
656 private void fireConnectionClosed(Throwable throwable) {
657 for (FcpListener fcpListener : fcpListeners) {
658 fcpListener.connectionClosed(this, throwable);
667 * Connects to the node.
669 * @throws IOException
670 * if an I/O error occurs
671 * @throws IllegalStateException
672 * if there is already a connection to the node
674 public synchronized void connect() throws IOException, IllegalStateException {
675 if (connectionHandler != null) {
676 throw new IllegalStateException("already connected, disconnect first");
678 logger.info("connecting to " + address + ":" + port + "…");
679 remoteSocket = new Socket(address, port);
680 remoteInputStream = remoteSocket.getInputStream();
681 remoteOutputStream = remoteSocket.getOutputStream();
682 new Thread(connectionHandler = new FcpConnectionHandler(this, remoteInputStream)).start();
686 * Disconnects from the node. If there is no connection to the node, this
687 * method does nothing.
689 * @deprecated Use {@link #close()} instead
692 public synchronized void disconnect() {
697 * Closes the connection. If there is no connection to the node, this method
700 public void close() {
701 handleDisconnect(null);
705 * Sends the given FCP message.
708 * The FCP message to send
709 * @throws IOException
710 * if an I/O error occurs
712 public synchronized void sendMessage(FcpMessage fcpMessage) throws IOException {
713 logger.fine("sending message: " + fcpMessage.getName());
714 fcpMessage.write(remoteOutputStream);
718 // PACKAGE-PRIVATE METHODS
722 * Handles the given message, notifying listeners. This message should only
723 * be called by {@link FcpConnectionHandler}.
726 * The received message
728 void handleMessage(FcpMessage fcpMessage) {
729 logger.fine("received message: " + fcpMessage.getName());
730 String messageName = fcpMessage.getName();
731 countMessage(messageName);
732 if ("SimpleProgress".equals(messageName)) {
733 fireReceivedSimpleProgress(new SimpleProgress(fcpMessage));
734 } else if ("ProtocolError".equals(messageName)) {
735 fireReceivedProtocolError(new ProtocolError(fcpMessage));
736 } else if ("PersistentGet".equals(messageName)) {
737 fireReceivedPersistentGet(new PersistentGet(fcpMessage));
738 } else if ("PersistentPut".equals(messageName)) {
739 fireReceivedPersistentPut(new PersistentPut(fcpMessage));
740 } else if ("PersistentPutDir".equals(messageName)) {
741 fireReceivedPersistentPutDir(new PersistentPutDir(fcpMessage));
742 } else if ("URIGenerated".equals(messageName)) {
743 fireReceivedURIGenerated(new URIGenerated(fcpMessage));
744 } else if ("EndListPersistentRequests".equals(messageName)) {
745 fireReceivedEndListPersistentRequests(new EndListPersistentRequests(fcpMessage));
746 } else if ("Peer".equals(messageName)) {
747 fireReceivedPeer(new Peer(fcpMessage));
748 } else if ("PeerNote".equals(messageName)) {
749 fireReceivedPeerNote(new PeerNote(fcpMessage));
750 } else if ("StartedCompression".equals(messageName)) {
751 fireReceivedStartedCompression(new StartedCompression(fcpMessage));
752 } else if ("FinishedCompression".equals(messageName)) {
753 fireReceivedFinishedCompression(new FinishedCompression(fcpMessage));
754 } else if ("GetFailed".equals(messageName)) {
755 fireReceivedGetFailed(new GetFailed(fcpMessage));
756 } else if ("PutFetchable".equals(messageName)) {
757 fireReceivedPutFetchable(new PutFetchable(fcpMessage));
758 } else if ("PutSuccessful".equals(messageName)) {
759 fireReceivedPutSuccessful(new PutSuccessful(fcpMessage));
760 } else if ("PutFailed".equals(messageName)) {
761 fireReceivedPutFailed(new PutFailed(fcpMessage));
762 } else if ("DataFound".equals(messageName)) {
763 fireReceivedDataFound(new DataFound(fcpMessage));
764 } else if ("SubscribedUSKUpdate".equals(messageName)) {
765 fireReceivedSubscribedUSKUpdate(new SubscribedUSKUpdate(fcpMessage));
766 } else if ("IdentifierCollision".equals(messageName)) {
767 fireReceivedIdentifierCollision(new IdentifierCollision(fcpMessage));
768 } else if ("AllData".equals(messageName)) {
769 LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
770 fireReceivedAllData(new AllData(fcpMessage, payloadInputStream));
772 payloadInputStream.consume();
773 } catch (IOException ioe1) {
774 /* well, ignore. when the connection handler fails, all fails. */
776 } else if ("EndListPeerNotes".equals(messageName)) {
777 fireReceivedEndListPeerNotes(new EndListPeerNotes(fcpMessage));
778 } else if ("EndListPeers".equals(messageName)) {
779 fireReceivedEndListPeers(new EndListPeers(fcpMessage));
780 } else if ("SSKKeypair".equals(messageName)) {
781 fireReceivedSSKKeypair(new SSKKeypair(fcpMessage));
782 } else if ("PeerRemoved".equals(messageName)) {
783 fireReceivedPeerRemoved(new PeerRemoved(fcpMessage));
784 } else if ("PersistentRequestModified".equals(messageName)) {
785 fireReceivedPersistentRequestModified(new PersistentRequestModified(fcpMessage));
786 } else if ("PersistentRequestRemoved".equals(messageName)) {
787 fireReceivedPersistentRequestRemoved(new PersistentRequestRemoved(fcpMessage));
788 } else if ("UnknownPeerNoteType".equals(messageName)) {
789 fireReceivedUnknownPeerNoteType(new UnknownPeerNoteType(fcpMessage));
790 } else if ("UnknownNodeIdentifier".equals(messageName)) {
791 fireReceivedUnknownNodeIdentifier(new UnknownNodeIdentifier(fcpMessage));
792 } else if ("FCPPluginReply".equals(messageName)) {
793 LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
794 fireReceivedFCPPluginReply(new FCPPluginReply(fcpMessage, payloadInputStream));
796 payloadInputStream.consume();
797 } catch (IOException ioe1) {
800 } else if ("PluginInfo".equals(messageName)) {
801 fireReceivedPluginInfo(new PluginInfo(fcpMessage));
802 } else if ("NodeData".equals(messageName)) {
803 fireReceivedNodeData(new NodeData(fcpMessage));
804 } else if ("TestDDAReply".equals(messageName)) {
805 fireReceivedTestDDAReply(new TestDDAReply(fcpMessage));
806 } else if ("TestDDAComplete".equals(messageName)) {
807 fireReceivedTestDDAComplete(new TestDDAComplete(fcpMessage));
808 } else if ("ConfigData".equals(messageName)) {
809 fireReceivedConfigData(new ConfigData(fcpMessage));
810 } else if ("NodeHello".equals(messageName)) {
811 fireReceivedNodeHello(new NodeHello(fcpMessage));
812 } else if ("CloseConnectionDuplicateClientName".equals(messageName)) {
813 fireReceivedCloseConnectionDuplicateClientName(new CloseConnectionDuplicateClientName(fcpMessage));
815 fireMessageReceived(fcpMessage);
820 * Handles a disconnect from the node.
823 * The exception that caused the disconnect, or <code>null</code>
824 * if there was no exception
826 synchronized void handleDisconnect(Throwable throwable) {
827 FcpUtils.close(remoteInputStream);
828 FcpUtils.close(remoteOutputStream);
829 FcpUtils.close(remoteSocket);
830 if (connectionHandler != null) {
831 connectionHandler.stop();
832 connectionHandler = null;
833 fireConnectionClosed(throwable);
842 * Incremets the counter in {@link #incomingMessageStatistics} by <cod>1</code>
843 * for the given message name.
846 * The name of the message to count
848 private void countMessage(String name) {
850 if (incomingMessageStatistics.containsKey(name)) {
851 oldValue = incomingMessageStatistics.get(name);
853 incomingMessageStatistics.put(name, oldValue + 1);
854 logger.finest("count for " + name + ": " + (oldValue + 1));
858 * Returns a limited input stream from the node’s input stream.
861 * The length of the stream
862 * @return The limited input stream
864 private LimitedInputStream getInputStream(long dataLength) {
865 if (dataLength <= 0) {
866 return new LimitedInputStream(null, 0);
868 return new LimitedInputStream(remoteInputStream, dataLength);
872 * A wrapper around an {@link InputStream} that only supplies a limited number
873 * of bytes from the underlying input stream.
875 * @author David ‘Bombe’ Roden <bombe@freenetproject.org>
877 private static class LimitedInputStream extends FilterInputStream {
879 /** The remaining number of bytes that can be read. */
880 private long remaining;
883 * Creates a new LimitedInputStream that supplies at most
884 * <code>length</code> bytes from the given input stream.
889 * The number of bytes to read
891 public LimitedInputStream(InputStream inputStream, long length) {
897 * @see java.io.FilterInputStream#available()
900 public synchronized int available() throws IOException {
901 if (remaining == 0) {
904 return (int) Math.min(super.available(), Math.min(Integer.MAX_VALUE, remaining));
908 * @see java.io.FilterInputStream#read()
911 public synchronized int read() throws IOException {
921 * @see java.io.FilterInputStream#read(byte[], int, int)
924 public synchronized int read(byte[] b, int off, int len) throws IOException {
925 if (remaining == 0) {
928 int toCopy = (int) Math.min(len, Math.min(remaining, Integer.MAX_VALUE));
929 int read = super.read(b, off, toCopy);
935 * @see java.io.FilterInputStream#skip(long)
938 public synchronized long skip(long n) throws IOException {
939 if ((n < 0) || (remaining == 0)) {
942 long skipped = super.skip(Math.min(n, remaining));
943 remaining -= skipped;
948 * {@inheritDoc} This method does nothing, as {@link #mark(int)} and
949 * {@link #reset()} are not supported.
951 * @see java.io.FilterInputStream#mark(int)
954 public void mark(int readlimit) {
961 * @see java.io.FilterInputStream#markSupported()
962 * @return <code>false</code>
965 public boolean markSupported() {
970 * {@inheritDoc} This method does nothing, as {@link #mark(int)} and
971 * {@link #reset()} are not supported.
973 * @see java.io.FilterInputStream#reset()
976 public void reset() throws IOException {
981 * Consumes the input stream, i.e. read all bytes until the limit is
984 * @throws IOException
985 * if an I/O error occurs
987 public void consume() throws IOException {
988 while (remaining > 0) {