2 * jSite2 - FpcConnection.java -
3 * Copyright © 2008 David Roden
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
20 package net.pterodactylus.fcp;
import java.io.Closeable;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Logger;
38 * An FCP connection to a Freenet node.
40 * @author David ‘Bombe’ Roden <bombe@freenetproject.org>
43 public class FcpConnection implements Closeable {
46 private static final Logger logger = Logger.getLogger(FcpConnection.class.getName());
48 /** The default port for FCP v2. */
49 public static final int DEFAULT_PORT = 9481;
51 /** The list of FCP listeners. */
52 private final List<FcpListener> fcpListeners = new ArrayList<FcpListener>();
54 /** The address of the node. */
55 private final InetAddress address;
57 /** The port number of the node’s FCP port. */
58 private final int port;
60 /** The remote socket. */
61 private Socket remoteSocket;
63 /** The input stream from the node. */
64 private InputStream remoteInputStream;
66 /** The output stream to the node. */
67 private OutputStream remoteOutputStream;
69 /** The connection handler. */
70 private FcpConnectionHandler connectionHandler;
72 /** Incoming message statistics. */
73 private Map<String, Integer> incomingMessageStatistics = Collections.synchronizedMap(new HashMap<String, Integer>());
/**
 * Creates a new FCP connection to the Freenet node running on localhost,
 * using the default port.
 * <p>
 * The name <code>localhost</code> is resolved explicitly instead of using
 * {@link InetAddress#getLocalHost()}, because the latter resolves the
 * machine's host name, which is not necessarily a loopback address.
 *
 * @throws UnknownHostException
 *             if the hostname can not be resolved
 */
public FcpConnection() throws UnknownHostException {
    this(InetAddress.getByName("localhost"));
}
/**
 * Creates a new FCP connection to the Freenet node on the given host,
 * using {@link #DEFAULT_PORT} as the port.
 *
 * @param host
 *            The hostname of the Freenet node
 * @throws UnknownHostException
 *             if <code>host</code> can not be resolved
 */
public FcpConnection(String host) throws UnknownHostException {
    this(host, DEFAULT_PORT);
}
/**
 * Creates a new FCP connection to the Freenet node on the given host and
 * port. The host name is resolved with
 * {@link InetAddress#getByName(String)}.
 *
 * @param host
 *            The hostname of the Freenet node
 * @param port
 *            The port number of the node's FCP port
 * @throws UnknownHostException
 *             if <code>host</code> can not be resolved
 */
public FcpConnection(String host, int port) throws UnknownHostException {
    this(InetAddress.getByName(host), port);
}
/**
 * Creates a new FCP connection to the Freenet node at the given address,
 * using {@link #DEFAULT_PORT} as the port.
 *
 * @param address
 *            The address of the Freenet node
 */
public FcpConnection(InetAddress address) {
    this(address, DEFAULT_PORT);
}
/**
 * Creates a new FCP connection to the Freenet node running at the given
 * address, listening on the given port. No network activity happens here;
 * the address and port are only stored.
 *
 * @param address
 *            The address of the Freenet node
 * @param port
 *            The port number of the node's FCP port
 */
public FcpConnection(InetAddress address, int port) {
    this.address = address;
    this.port = port;
}
140 // LISTENER MANAGEMENT
144 * Adds the given listener to the list of listeners.
147 * The listener to add
149 public void addFcpListener(FcpListener fcpListener) {
150 fcpListeners.add(fcpListener);
154 * Removes the given listener from the list of listeners.
157 * The listener to remove
159 public void removeFcpListener(FcpListener fcpListener) {
160 fcpListeners.remove(fcpListener);
164 * Notifies listeners that a “NodeHello” message was received.
166 * @see FcpListener#receivedNodeHello(FcpConnection, NodeHello)
168 * The “NodeHello” message
170 private void fireReceivedNodeHello(NodeHello nodeHello) {
171 for (FcpListener fcpListener: fcpListeners) {
172 fcpListener.receivedNodeHello(this, nodeHello);
177 * Notifies listeners that a “CloseConnectionDuplicateClientName” message
180 * @see FcpListener#receivedCloseConnectionDuplicateClientName(FcpConnection,
181 * CloseConnectionDuplicateClientName)
182 * @param closeConnectionDuplicateClientName
183 * The “CloseConnectionDuplicateClientName” message
185 private void fireReceivedCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
186 for (FcpListener fcpListener: fcpListeners) {
187 fcpListener.receivedCloseConnectionDuplicateClientName(this, closeConnectionDuplicateClientName);
192 * Notifies listeners that a “SSKKeypair” message was received.
194 * @see FcpListener#receivedSSKKeypair(FcpConnection, SSKKeypair)
196 * The “SSKKeypair” message
198 private void fireReceivedSSKKeypair(SSKKeypair sskKeypair) {
199 for (FcpListener fcpListener: fcpListeners) {
200 fcpListener.receivedSSKKeypair(this, sskKeypair);
205 * Notifies listeners that a “Peer” message was received.
207 * @see FcpListener#receivedPeer(FcpConnection, Peer)
211 private void fireReceivedPeer(Peer peer) {
212 for (FcpListener fcpListener: fcpListeners) {
213 fcpListener.receivedPeer(this, peer);
218 * Notifies all listeners that an “EndListPeers” message was received.
220 * @see FcpListener#receivedEndListPeers(FcpConnection, EndListPeers)
221 * @param endListPeers
222 * The “EndListPeers” message
224 private void fireReceivedEndListPeers(EndListPeers endListPeers) {
225 for (FcpListener fcpListener: fcpListeners) {
226 fcpListener.receivedEndListPeers(this, endListPeers);
231 * Notifies all listeners that a “PeerNote” message was received.
233 * @see FcpListener#receivedPeerNote(FcpConnection, PeerNote)
236 private void fireReceivedPeerNote(PeerNote peerNote) {
237 for (FcpListener fcpListener: fcpListeners) {
238 fcpListener.receivedPeerNote(this, peerNote);
243 * Notifies all listeners that an “EndListPeerNotes” message was received.
245 * @see FcpListener#receivedEndListPeerNotes(FcpConnection,
247 * @param endListPeerNotes
248 * The “EndListPeerNotes” message
250 private void fireReceivedEndListPeerNotes(EndListPeerNotes endListPeerNotes) {
251 for (FcpListener fcpListener: fcpListeners) {
252 fcpListener.receivedEndListPeerNotes(this, endListPeerNotes);
257 * Notifies all listeners that a “PeerRemoved” message was received.
259 * @see FcpListener#receivedPeerRemoved(FcpConnection, PeerRemoved)
261 * The “PeerRemoved” message
263 private void fireReceivedPeerRemoved(PeerRemoved peerRemoved) {
264 for (FcpListener fcpListener: fcpListeners) {
265 fcpListener.receivedPeerRemoved(this, peerRemoved);
270 * Notifies all listeners that a “NodeData” message was received.
272 * @see FcpListener#receivedNodeData(FcpConnection, NodeData)
274 * The “NodeData” message
276 private void fireReceivedNodeData(NodeData nodeData) {
277 for (FcpListener fcpListener: fcpListeners) {
278 fcpListener.receivedNodeData(this, nodeData);
283 * Notifies all listeners that a “TestDDAReply” message was received.
285 * @see FcpListener#receivedTestDDAReply(FcpConnection, TestDDAReply)
286 * @param testDDAReply
287 * The “TestDDAReply” message
289 private void fireReceivedTestDDAReply(TestDDAReply testDDAReply) {
290 for (FcpListener fcpListener: fcpListeners) {
291 fcpListener.receivedTestDDAReply(this, testDDAReply);
296 * Notifies all listeners that a “TestDDAComplete” message was received.
298 * @see FcpListener#receivedTestDDAComplete(FcpConnection, TestDDAComplete)
299 * @param testDDAComplete
300 * The “TestDDAComplete” message
302 private void fireReceivedTestDDAComplete(TestDDAComplete testDDAComplete) {
303 for (FcpListener fcpListener: fcpListeners) {
304 fcpListener.receivedTestDDAComplete(this, testDDAComplete);
309 * Notifies all listeners that a “PersistentGet” message was received.
311 * @see FcpListener#receivedPersistentGet(FcpConnection, PersistentGet)
312 * @param persistentGet
313 * The “PersistentGet” message
315 private void fireReceivedPersistentGet(PersistentGet persistentGet) {
316 for (FcpListener fcpListener: fcpListeners) {
317 fcpListener.receivedPersistentGet(this, persistentGet);
322 * Notifies all listeners that a “PersistentPut” message was received.
324 * @see FcpListener#receivedPersistentPut(FcpConnection, PersistentPut)
325 * @param persistentPut
326 * The “PersistentPut” message
328 private void fireReceivedPersistentPut(PersistentPut persistentPut) {
329 for (FcpListener fcpListener: fcpListeners) {
330 fcpListener.receivedPersistentPut(this, persistentPut);
335 * Notifies all listeners that a “EndListPersistentRequests” message was
338 * @see FcpListener#receivedEndListPersistentRequests(FcpConnection,
339 * EndListPersistentRequests)
340 * @param endListPersistentRequests
341 * The “EndListPersistentRequests” message
343 private void fireReceivedEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) {
344 for (FcpListener fcpListener: fcpListeners) {
345 fcpListener.receivedEndListPersistentRequests(this, endListPersistentRequests);
350 * Notifies all listeners that a “URIGenerated” message was received.
352 * @see FcpListener#receivedURIGenerated(FcpConnection, URIGenerated)
353 * @param uriGenerated
354 * The “URIGenerated” message
356 private void fireReceivedURIGenerated(URIGenerated uriGenerated) {
357 for (FcpListener fcpListener: fcpListeners) {
358 fcpListener.receivedURIGenerated(this, uriGenerated);
363 * Notifies all listeners that a “DataFound” message was received.
365 * @see FcpListener#receivedDataFound(FcpConnection, DataFound)
367 * The “DataFound” message
369 private void fireReceivedDataFound(DataFound dataFound) {
370 for (FcpListener fcpListener: fcpListeners) {
371 fcpListener.receivedDataFound(this, dataFound);
376 * Notifies all listeners that an “AllData” message was received.
378 * @see FcpListener#receivedAllData(FcpConnection, AllData)
380 * The “AllData” message
382 private void fireReceivedAllData(AllData allData) {
383 for (FcpListener fcpListener: fcpListeners) {
384 fcpListener.receivedAllData(this, allData);
389 * Notifies all listeners that a “SimpleProgress” message was received.
391 * @see FcpListener#receivedSimpleProgress(FcpConnection, SimpleProgress)
392 * @param simpleProgress
393 * The “SimpleProgress” message
395 private void fireReceivedSimpleProgress(SimpleProgress simpleProgress) {
396 for (FcpListener fcpListener: fcpListeners) {
397 fcpListener.receivedSimpleProgress(this, simpleProgress);
402 * Notifies all listeners that a “StartedCompression” message was received.
404 * @see FcpListener#receivedStartedCompression(FcpConnection,
405 * StartedCompression)
406 * @param startedCompression
407 * The “StartedCompression” message
409 private void fireReceivedStartedCompression(StartedCompression startedCompression) {
410 for (FcpListener fcpListener: fcpListeners) {
411 fcpListener.receivedStartedCompression(this, startedCompression);
416 * Notifies all listeners that a “FinishedCompression” message was received.
418 * @see FcpListener#receviedFinishedCompression(FcpConnection,
419 * FinishedCompression)
420 * @param finishedCompression
421 * The “FinishedCompression” message
423 private void fireReceivedFinishedCompression(FinishedCompression finishedCompression) {
424 for (FcpListener fcpListener: fcpListeners) {
425 fcpListener.receviedFinishedCompression(this, finishedCompression);
430 * Notifies all listeners that an “UnknownPeerNoteType” message was
433 * @see FcpListener#receivedUnknownPeerNoteType(FcpConnection,
434 * UnknownPeerNoteType)
435 * @param unknownPeerNoteType
436 * The “UnknownPeerNoteType” message
438 private void fireReceivedUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) {
439 for (FcpListener fcpListener: fcpListeners) {
440 fcpListener.receivedUnknownPeerNoteType(this, unknownPeerNoteType);
445 * Notifies all listeners that an “UnknownNodeIdentifier” message was
448 * @see FcpListener#receivedUnknownNodeIdentifier(FcpConnection,
449 * UnknownNodeIdentifier)
450 * @param unknownNodeIdentifier
451 * The “UnknownNodeIdentifier” message
453 private void fireReceivedUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) {
454 for (FcpListener fcpListener: fcpListeners) {
455 fcpListener.receivedUnknownNodeIdentifier(this, unknownNodeIdentifier);
460 * Notifies all listeners that a “ConfigData” message was received.
462 * @see FcpListener#receivedConfigData(FcpConnection, ConfigData)
464 * The “ConfigData” message
466 private void fireReceivedConfigData(ConfigData configData) {
467 for (FcpListener fcpListener: fcpListeners) {
468 fcpListener.receivedConfigData(this, configData);
473 * Notifies all listeners that a “GetFailed” message was received.
475 * @see FcpListener#receivedGetFailed(FcpConnection, GetFailed)
477 * The “GetFailed” message
479 private void fireReceivedGetFailed(GetFailed getFailed) {
480 for (FcpListener fcpListener: fcpListeners) {
481 fcpListener.receivedGetFailed(this, getFailed);
486 * Notifies all listeners that a “PutFailed” message was received.
488 * @see FcpListener#receivedPutFailed(FcpConnection, PutFailed)
490 * The “PutFailed” message
492 private void fireReceivedPutFailed(PutFailed putFailed) {
493 for (FcpListener fcpListener: fcpListeners) {
494 fcpListener.receivedPutFailed(this, putFailed);
499 * Notifies all listeners that an “IdentifierCollision” message was
502 * @see FcpListener#receivedIdentifierCollision(FcpConnection,
503 * IdentifierCollision)
504 * @param identifierCollision
505 * The “IdentifierCollision” message
507 private void fireReceivedIdentifierCollision(IdentifierCollision identifierCollision) {
508 for (FcpListener fcpListener: fcpListeners) {
509 fcpListener.receivedIdentifierCollision(this, identifierCollision);
514 * Notifies all listeners that an “PersistentPutDir” message was received.
516 * @see FcpListener#receivedPersistentPutDir(FcpConnection,
518 * @param persistentPutDir
519 * The “PersistentPutDir” message
521 private void fireReceivedPersistentPutDir(PersistentPutDir persistentPutDir) {
522 for (FcpListener fcpListener: fcpListeners) {
523 fcpListener.receivedPersistentPutDir(this, persistentPutDir);
528 * Notifies all listeners that a “PersistentRequestRemoved” message was
531 * @see FcpListener#receivedPersistentRequestRemoved(FcpConnection,
532 * PersistentRequestRemoved)
533 * @param persistentRequestRemoved
534 * The “PersistentRequestRemoved” message
536 private void fireReceivedPersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) {
537 for (FcpListener fcpListener: fcpListeners) {
538 fcpListener.receivedPersistentRequestRemoved(this, persistentRequestRemoved);
543 * Notifies all listeners that a “SubscribedUSKUpdate” message was received.
545 * @see FcpListener#receivedSubscribedUSKUpdate(FcpConnection,
546 * SubscribedUSKUpdate)
547 * @param subscribedUSKUpdate
548 * The “SubscribedUSKUpdate” message
550 private void fireReceivedSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) {
551 for (FcpListener fcpListener: fcpListeners) {
552 fcpListener.receivedSubscribedUSKUpdate(this, subscribedUSKUpdate);
557 * Notifies all listeners that a “PluginInfo” message was received.
559 * @see FcpListener#receivedPluginInfo(FcpConnection, PluginInfo)
561 * The “PluginInfo” message
563 private void fireReceivedPluginInfo(PluginInfo pluginInfo) {
564 for (FcpListener fcpListener: fcpListeners) {
565 fcpListener.receivedPluginInfo(this, pluginInfo);
570 * Notifies all listeners that an “FCPPluginReply” message was received.
572 * @see FcpListener#receivedFCPPluginReply(FcpConnection, FCPPluginReply)
573 * @param fcpPluginReply
574 * The “FCPPluginReply” message
576 private void fireReceivedFCPPluginReply(FCPPluginReply fcpPluginReply) {
577 for (FcpListener fcpListener: fcpListeners) {
578 fcpListener.receivedFCPPluginReply(this, fcpPluginReply);
583 * Notifies all listeners that a “PersistentRequestModified” message was
586 * @see FcpListener#receivedPersistentRequestModified(FcpConnection,
587 * PersistentRequestModified)
588 * @param persistentRequestModified
589 * The “PersistentRequestModified” message
591 private void fireReceivedPersistentRequestModified(PersistentRequestModified persistentRequestModified) {
592 for (FcpListener fcpListener: fcpListeners) {
593 fcpListener.receivedPersistentRequestModified(this, persistentRequestModified);
598 * Notifies all listeners that a “PutSuccessful” message was received.
600 * @see FcpListener#receivedPutSuccessful(FcpConnection, PutSuccessful)
601 * @param putSuccessful
602 * The “PutSuccessful” message
604 private void fireReceivedPutSuccessful(PutSuccessful putSuccessful) {
605 for (FcpListener fcpListener: fcpListeners) {
606 fcpListener.receivedPutSuccessful(this, putSuccessful);
611 * Notifies all listeners that a “PutFetchable” message was received.
613 * @see FcpListener#receivedPutFetchable(FcpConnection, PutFetchable)
614 * @param putFetchable
615 * The “PutFetchable” message
617 private void fireReceivedPutFetchable(PutFetchable putFetchable) {
618 for (FcpListener fcpListener: fcpListeners) {
619 fcpListener.receivedPutFetchable(this, putFetchable);
624 * Notifies all listeners that a “ProtocolError” message was received.
626 * @see FcpListener#receivedProtocolError(FcpConnection, ProtocolError)
627 * @param protocolError
628 * The “ProtocolError” message
630 private void fireReceivedProtocolError(ProtocolError protocolError) {
631 for (FcpListener fcpListener: fcpListeners) {
632 fcpListener.receivedProtocolError(this, protocolError);
637 * Notifies all registered listeners that a message has been received.
639 * @see FcpListener#receivedMessage(FcpConnection, FcpMessage)
641 * The message that was received
643 private void fireMessageReceived(FcpMessage fcpMessage) {
644 for (FcpListener fcpListener: fcpListeners) {
645 fcpListener.receivedMessage(this, fcpMessage);
650 * Notifies all listeners that the connection to the node was closed.
653 * The exception that caused the disconnect, or <code>null</code>
654 * if there was no exception
655 * @see FcpListener#connectionClosed(FcpConnection, Throwable)
657 private void fireConnectionClosed(Throwable throwable) {
658 for (FcpListener fcpListener: fcpListeners) {
659 fcpListener.connectionClosed(this, throwable);
668 * Connects to the node.
670 * @throws IOException
671 * if an I/O error occurs
672 * @throws IllegalStateException
673 * if there is already a connection to the node
675 public synchronized void connect() throws IOException, IllegalStateException {
676 if (connectionHandler != null) {
677 throw new IllegalStateException("already connected, disconnect first");
679 logger.info("connecting to " + address + ":" + port + "…");
680 remoteSocket = new Socket(address, port);
681 remoteInputStream = remoteSocket.getInputStream();
682 remoteOutputStream = remoteSocket.getOutputStream();
683 new Thread(connectionHandler = new FcpConnectionHandler(this, remoteInputStream)).start();
687 * Disconnects from the node. If there is no connection to the node, this
688 * method does nothing.
690 * @deprecated Use {@link #close()} instead
693 public synchronized void disconnect() {
698 * Closes the connection. If there is no connection to the node, this method
701 public void close() {
702 handleDisconnect(null);
706 * Sends the given FCP message.
709 * The FCP message to send
710 * @throws IOException
711 * if an I/O error occurs
713 public synchronized void sendMessage(FcpMessage fcpMessage) throws IOException {
714 logger.fine("sending message: " + fcpMessage.getName());
715 fcpMessage.write(remoteOutputStream);
719 // PACKAGE-PRIVATE METHODS
723 * Handles the given message, notifying listeners. This message should only
724 * be called by {@link FcpConnectionHandler}.
727 * The received message
729 void handleMessage(FcpMessage fcpMessage) {
730 logger.fine("received message: " + fcpMessage.getName());
731 String messageName = fcpMessage.getName();
732 countMessage(messageName);
733 if ("SimpleProgress".equals(messageName)) {
734 fireReceivedSimpleProgress(new SimpleProgress(fcpMessage));
735 } else if ("ProtocolError".equals(messageName)) {
736 fireReceivedProtocolError(new ProtocolError(fcpMessage));
737 } else if ("PersistentGet".equals(messageName)) {
738 fireReceivedPersistentGet(new PersistentGet(fcpMessage));
739 } else if ("PersistentPut".equals(messageName)) {
740 fireReceivedPersistentPut(new PersistentPut(fcpMessage));
741 } else if ("PersistentPutDir".equals(messageName)) {
742 fireReceivedPersistentPutDir(new PersistentPutDir(fcpMessage));
743 } else if ("URIGenerated".equals(messageName)) {
744 fireReceivedURIGenerated(new URIGenerated(fcpMessage));
745 } else if ("EndListPersistentRequests".equals(messageName)) {
746 fireReceivedEndListPersistentRequests(new EndListPersistentRequests(fcpMessage));
747 } else if ("Peer".equals(messageName)) {
748 fireReceivedPeer(new Peer(fcpMessage));
749 } else if ("PeerNote".equals(messageName)) {
750 fireReceivedPeerNote(new PeerNote(fcpMessage));
751 } else if ("StartedCompression".equals(messageName)) {
752 fireReceivedStartedCompression(new StartedCompression(fcpMessage));
753 } else if ("FinishedCompression".equals(messageName)) {
754 fireReceivedFinishedCompression(new FinishedCompression(fcpMessage));
755 } else if ("GetFailed".equals(messageName)) {
756 fireReceivedGetFailed(new GetFailed(fcpMessage));
757 } else if ("PutFetchable".equals(messageName)) {
758 fireReceivedPutFetchable(new PutFetchable(fcpMessage));
759 } else if ("PutSuccessful".equals(messageName)) {
760 fireReceivedPutSuccessful(new PutSuccessful(fcpMessage));
761 } else if ("PutFailed".equals(messageName)) {
762 fireReceivedPutFailed(new PutFailed(fcpMessage));
763 } else if ("DataFound".equals(messageName)) {
764 fireReceivedDataFound(new DataFound(fcpMessage));
765 } else if ("SubscribedUSKUpdate".equals(messageName)) {
766 fireReceivedSubscribedUSKUpdate(new SubscribedUSKUpdate(fcpMessage));
767 } else if ("IdentifierCollision".equals(messageName)) {
768 fireReceivedIdentifierCollision(new IdentifierCollision(fcpMessage));
769 } else if ("AllData".equals(messageName)) {
770 LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
771 fireReceivedAllData(new AllData(fcpMessage, payloadInputStream));
773 payloadInputStream.consume();
774 } catch (IOException ioe1) {
775 /* well, ignore. when the connection handler fails, all fails. */
777 } else if ("EndListPeerNotes".equals(messageName)) {
778 fireReceivedEndListPeerNotes(new EndListPeerNotes(fcpMessage));
779 } else if ("EndListPeers".equals(messageName)) {
780 fireReceivedEndListPeers(new EndListPeers(fcpMessage));
781 } else if ("SSKKeypair".equals(messageName)) {
782 fireReceivedSSKKeypair(new SSKKeypair(fcpMessage));
783 } else if ("PeerRemoved".equals(messageName)) {
784 fireReceivedPeerRemoved(new PeerRemoved(fcpMessage));
785 } else if ("PersistentRequestModified".equals(messageName)) {
786 fireReceivedPersistentRequestModified(new PersistentRequestModified(fcpMessage));
787 } else if ("PersistentRequestRemoved".equals(messageName)) {
788 fireReceivedPersistentRequestRemoved(new PersistentRequestRemoved(fcpMessage));
789 } else if ("UnknownPeerNoteType".equals(messageName)) {
790 fireReceivedUnknownPeerNoteType(new UnknownPeerNoteType(fcpMessage));
791 } else if ("UnknownNodeIdentifier".equals(messageName)) {
792 fireReceivedUnknownNodeIdentifier(new UnknownNodeIdentifier(fcpMessage));
793 } else if ("FCPPluginReply".equals(messageName)) {
794 LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
795 fireReceivedFCPPluginReply(new FCPPluginReply(fcpMessage, payloadInputStream));
797 payloadInputStream.consume();
798 } catch (IOException ioe1) {
801 } else if ("PluginInfo".equals(messageName)) {
802 fireReceivedPluginInfo(new PluginInfo(fcpMessage));
803 } else if ("NodeData".equals(messageName)) {
804 fireReceivedNodeData(new NodeData(fcpMessage));
805 } else if ("TestDDAReply".equals(messageName)) {
806 fireReceivedTestDDAReply(new TestDDAReply(fcpMessage));
807 } else if ("TestDDAComplete".equals(messageName)) {
808 fireReceivedTestDDAComplete(new TestDDAComplete(fcpMessage));
809 } else if ("ConfigData".equals(messageName)) {
810 fireReceivedConfigData(new ConfigData(fcpMessage));
811 } else if ("NodeHello".equals(messageName)) {
812 fireReceivedNodeHello(new NodeHello(fcpMessage));
813 } else if ("CloseConnectionDuplicateClientName".equals(messageName)) {
814 fireReceivedCloseConnectionDuplicateClientName(new CloseConnectionDuplicateClientName(fcpMessage));
816 fireMessageReceived(fcpMessage);
821 * Handles a disconnect from the node.
824 * The exception that caused the disconnect, or <code>null</code>
825 * if there was no exception
827 synchronized void handleDisconnect(Throwable throwable) {
828 FcpUtils.close(remoteInputStream);
829 FcpUtils.close(remoteOutputStream);
830 FcpUtils.close(remoteSocket);
831 if (connectionHandler != null) {
832 connectionHandler.stop();
833 connectionHandler = null;
835 fireConnectionClosed(throwable);
843 * Incremets the counter in {@link #incomingMessageStatistics} by <cod>1</code>
844 * for the given message name.
847 * The name of the message to count
849 private void countMessage(String name) {
851 if (incomingMessageStatistics.containsKey(name)) {
852 oldValue = incomingMessageStatistics.get(name);
854 incomingMessageStatistics.put(name, oldValue + 1);
855 logger.finest("count for " + name + ": " + (oldValue + 1));
859 * Returns a limited input stream from the node’s input stream.
862 * The length of the stream
863 * @return The limited input stream
865 private LimitedInputStream getInputStream(long dataLength) {
866 if (dataLength <= 0) {
867 return new LimitedInputStream(null, 0);
869 return new LimitedInputStream(remoteInputStream, dataLength);
873 * A wrapper around an {@link InputStream} that only supplies a limit number
874 * of bytes from the underlying input stream.
876 * @author <a href="mailto:dr@ina-germany.de">David Roden</a>
879 private static class LimitedInputStream extends FilterInputStream {
881 /** The remaining number of bytes that can be read. */
882 private long remaining;
885 * Creates a new LimitedInputStream that supplies at most
886 * <code>length</code> bytes from the given input stream.
891 * The number of bytes to read
893 public LimitedInputStream(InputStream inputStream, long length) {
899 * @see java.io.FilterInputStream#available()
902 public synchronized int available() throws IOException {
903 if (remaining == 0) {
906 return (int) Math.min(super.available(), Math.min(Integer.MAX_VALUE, remaining));
910 * @see java.io.FilterInputStream#read()
913 public synchronized int read() throws IOException {
923 * @see java.io.FilterInputStream#read(byte[], int, int)
926 public synchronized int read(byte[] b, int off, int len) throws IOException {
927 if (remaining == 0) {
930 int toCopy = (int) Math.min(len, Math.min(remaining, Integer.MAX_VALUE));
931 int read = super.read(b, off, toCopy);
937 * @see java.io.FilterInputStream#skip(long)
940 public synchronized long skip(long n) throws IOException {
941 if ((n < 0) || (remaining == 0)) {
944 long skipped = super.skip(Math.min(n, remaining));
945 remaining -= skipped;
950 * {@inheritDoc} This method does nothing, as {@link #mark(int)} and
951 * {@link #reset()} are not supported.
953 * @see java.io.FilterInputStream#mark(int)
956 public void mark(int readlimit) {
963 * @see java.io.FilterInputStream#markSupported()
964 * @return <code>false</code>
967 public boolean markSupported() {
972 * {@inheritDoc} This method does nothing, as {@link #mark(int)} and
973 * {@link #reset()} are not supported.
975 * @see java.io.FilterInputStream#reset()
978 public void reset() throws IOException {
983 * Consumes the input stream, i.e. read all bytes until the limit is
986 * @throws IOException
987 * if an I/O error occurs
989 public void consume() throws IOException {
990 while (remaining > 0) {