d1a1be7cb17d4ea03847568f46ed3f592c4eb47c
[jFCPlib.git] / src / net / pterodactylus / fcp / FcpConnection.java
1 /*
2  * jSite2 - FpcConnection.java -
3  * Copyright © 2008 David Roden
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18  */
19
20 package net.pterodactylus.fcp;
21
22 import java.io.Closeable;
23 import java.io.FilterInputStream;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.io.OutputStream;
27 import java.net.InetAddress;
28 import java.net.Socket;
29 import java.net.UnknownHostException;
30 import java.util.ArrayList;
31 import java.util.Collections;
32 import java.util.HashMap;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.logging.Logger;
36
37 /**
38  * An FCP connection to a Freenet node.
39  *
40  * @author David ‘Bombe’ Roden <bombe@freenetproject.org>
41  * @version $Id$
42  */
43 public class FcpConnection implements Closeable {
44
45         /** Logger. */
46         private static final Logger logger = Logger.getLogger(FcpConnection.class.getName());
47
48         /** The default port for FCP v2. */
49         public static final int DEFAULT_PORT = 9481;
50
51         /** The list of FCP listeners. */
52         private final List<FcpListener> fcpListeners = new ArrayList<FcpListener>();
53
54         /** The address of the node. */
55         private final InetAddress address;
56
57         /** The port number of the node’s FCP port. */
58         private final int port;
59
60         /** The remote socket. */
61         private Socket remoteSocket;
62
63         /** The input stream from the node. */
64         private InputStream remoteInputStream;
65
66         /** The output stream to the node. */
67         private OutputStream remoteOutputStream;
68
69         /** The connection handler. */
70         private FcpConnectionHandler connectionHandler;
71
72         /** Incoming message statistics. */
73         private Map<String, Integer> incomingMessageStatistics = Collections.synchronizedMap(new HashMap<String, Integer>());
74
75         /**
76          * Creates a new FCP connection to the freenet node running on localhost,
77          * using the default port.
78          *
79          * @throws UnknownHostException
80          *             if the hostname can not be resolved
81          */
82         public FcpConnection() throws UnknownHostException {
83                 this(InetAddress.getLocalHost());
84         }
85
86         /**
87          * Creates a new FCP connection to the Freenet node running on the given
88          * host, listening on the default port.
89          *
90          * @param host
91          *            The hostname of the Freenet node
92          * @throws UnknownHostException
93          *             if <code>host</code> can not be resolved
94          */
95         public FcpConnection(String host) throws UnknownHostException {
96                 this(host, DEFAULT_PORT);
97         }
98
99         /**
100          * Creates a new FCP connection to the Freenet node running on the given
101          * host, listening on the given port.
102          *
103          * @param host
104          *            The hostname of the Freenet node
105          * @param port
106          *            The port number of the node’s FCP port
107          * @throws UnknownHostException
108          *             if <code>host</code> can not be resolved
109          */
110         public FcpConnection(String host, int port) throws UnknownHostException {
111                 this(InetAddress.getByName(host), port);
112         }
113
114         /**
115          * Creates a new FCP connection to the Freenet node running at the given
116          * address, listening on the default port.
117          *
118          * @param address
119          *            The address of the Freenet node
120          */
121         public FcpConnection(InetAddress address) {
122                 this(address, DEFAULT_PORT);
123         }
124
125         /**
126          * Creates a new FCP connection to the Freenet node running at the given
127          * address, listening on the given port.
128          *
129          * @param address
130          *            The address of the Freenet node
131          * @param port
132          *            The port number of the node’s FCP port
133          */
134         public FcpConnection(InetAddress address, int port) {
135                 this.address = address;
136                 this.port = port;
137         }
138
139         //
140         // LISTENER MANAGEMENT
141         //
142
143         /**
144          * Adds the given listener to the list of listeners.
145          *
146          * @param fcpListener
147          *            The listener to add
148          */
149         public void addFcpListener(FcpListener fcpListener) {
150                 fcpListeners.add(fcpListener);
151         }
152
153         /**
154          * Removes the given listener from the list of listeners.
155          *
156          * @param fcpListener
157          *            The listener to remove
158          */
159         public void removeFcpListener(FcpListener fcpListener) {
160                 fcpListeners.remove(fcpListener);
161         }
162
163         /**
164          * Notifies listeners that a “NodeHello” message was received.
165          *
166          * @see FcpListener#receivedNodeHello(FcpConnection, NodeHello)
167          * @param nodeHello
168          *            The “NodeHello” message
169          */
170         private void fireReceivedNodeHello(NodeHello nodeHello) {
171                 for (FcpListener fcpListener: fcpListeners) {
172                         fcpListener.receivedNodeHello(this, nodeHello);
173                 }
174         }
175
176         /**
177          * Notifies listeners that a “CloseConnectionDuplicateClientName” message
178          * was received.
179          *
180          * @see FcpListener#receivedCloseConnectionDuplicateClientName(FcpConnection,
181          *      CloseConnectionDuplicateClientName)
182          * @param closeConnectionDuplicateClientName
183          *            The “CloseConnectionDuplicateClientName” message
184          */
185         private void fireReceivedCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
186                 for (FcpListener fcpListener: fcpListeners) {
187                         fcpListener.receivedCloseConnectionDuplicateClientName(this, closeConnectionDuplicateClientName);
188                 }
189         }
190
191         /**
192          * Notifies listeners that a “SSKKeypair” message was received.
193          *
194          * @see FcpListener#receivedSSKKeypair(FcpConnection, SSKKeypair)
195          * @param sskKeypair
196          *            The “SSKKeypair” message
197          */
198         private void fireReceivedSSKKeypair(SSKKeypair sskKeypair) {
199                 for (FcpListener fcpListener: fcpListeners) {
200                         fcpListener.receivedSSKKeypair(this, sskKeypair);
201                 }
202         }
203
204         /**
205          * Notifies listeners that a “Peer” message was received.
206          *
207          * @see FcpListener#receivedPeer(FcpConnection, Peer)
208          * @param peer
209          *            The “Peer” message
210          */
211         private void fireReceivedPeer(Peer peer) {
212                 for (FcpListener fcpListener: fcpListeners) {
213                         fcpListener.receivedPeer(this, peer);
214                 }
215         }
216
217         /**
218          * Notifies all listeners that an “EndListPeers” message was received.
219          *
220          * @see FcpListener#receivedEndListPeers(FcpConnection, EndListPeers)
221          * @param endListPeers
222          *            The “EndListPeers” message
223          */
224         private void fireReceivedEndListPeers(EndListPeers endListPeers) {
225                 for (FcpListener fcpListener: fcpListeners) {
226                         fcpListener.receivedEndListPeers(this, endListPeers);
227                 }
228         }
229
230         /**
231          * Notifies all listeners that a “PeerNote” message was received.
232          *
233          * @see FcpListener#receivedPeerNote(FcpConnection, PeerNote)
234          * @param peerNote
235          */
236         private void fireReceivedPeerNote(PeerNote peerNote) {
237                 for (FcpListener fcpListener: fcpListeners) {
238                         fcpListener.receivedPeerNote(this, peerNote);
239                 }
240         }
241
242         /**
243          * Notifies all listeners that an “EndListPeerNotes” message was received.
244          *
245          * @see FcpListener#receivedEndListPeerNotes(FcpConnection,
246          *      EndListPeerNotes)
247          * @param endListPeerNotes
248          *            The “EndListPeerNotes” message
249          */
250         private void fireReceivedEndListPeerNotes(EndListPeerNotes endListPeerNotes) {
251                 for (FcpListener fcpListener: fcpListeners) {
252                         fcpListener.receivedEndListPeerNotes(this, endListPeerNotes);
253                 }
254         }
255
256         /**
257          * Notifies all listeners that a “PeerRemoved” message was received.
258          *
259          * @see FcpListener#receivedPeerRemoved(FcpConnection, PeerRemoved)
260          * @param peerRemoved
261          *            The “PeerRemoved” message
262          */
263         private void fireReceivedPeerRemoved(PeerRemoved peerRemoved) {
264                 for (FcpListener fcpListener: fcpListeners) {
265                         fcpListener.receivedPeerRemoved(this, peerRemoved);
266                 }
267         }
268
269         /**
270          * Notifies all listeners that a “NodeData” message was received.
271          *
272          * @see FcpListener#receivedNodeData(FcpConnection, NodeData)
273          * @param nodeData
274          *            The “NodeData” message
275          */
276         private void fireReceivedNodeData(NodeData nodeData) {
277                 for (FcpListener fcpListener: fcpListeners) {
278                         fcpListener.receivedNodeData(this, nodeData);
279                 }
280         }
281
282         /**
283          * Notifies all listeners that a “TestDDAReply” message was received.
284          *
285          * @see FcpListener#receivedTestDDAReply(FcpConnection, TestDDAReply)
286          * @param testDDAReply
287          *            The “TestDDAReply” message
288          */
289         private void fireReceivedTestDDAReply(TestDDAReply testDDAReply) {
290                 for (FcpListener fcpListener: fcpListeners) {
291                         fcpListener.receivedTestDDAReply(this, testDDAReply);
292                 }
293         }
294
295         /**
296          * Notifies all listeners that a “TestDDAComplete” message was received.
297          *
298          * @see FcpListener#receivedTestDDAComplete(FcpConnection, TestDDAComplete)
299          * @param testDDAComplete
300          *            The “TestDDAComplete” message
301          */
302         private void fireReceivedTestDDAComplete(TestDDAComplete testDDAComplete) {
303                 for (FcpListener fcpListener: fcpListeners) {
304                         fcpListener.receivedTestDDAComplete(this, testDDAComplete);
305                 }
306         }
307
308         /**
309          * Notifies all listeners that a “PersistentGet” message was received.
310          *
311          * @see FcpListener#receivedPersistentGet(FcpConnection, PersistentGet)
312          * @param persistentGet
313          *            The “PersistentGet” message
314          */
315         private void fireReceivedPersistentGet(PersistentGet persistentGet) {
316                 for (FcpListener fcpListener: fcpListeners) {
317                         fcpListener.receivedPersistentGet(this, persistentGet);
318                 }
319         }
320
321         /**
322          * Notifies all listeners that a “PersistentPut” message was received.
323          *
324          * @see FcpListener#receivedPersistentPut(FcpConnection, PersistentPut)
325          * @param persistentPut
326          *            The “PersistentPut” message
327          */
328         private void fireReceivedPersistentPut(PersistentPut persistentPut) {
329                 for (FcpListener fcpListener: fcpListeners) {
330                         fcpListener.receivedPersistentPut(this, persistentPut);
331                 }
332         }
333
334         /**
335          * Notifies all listeners that a “EndListPersistentRequests” message was
336          * received.
337          *
338          * @see FcpListener#receivedEndListPersistentRequests(FcpConnection,
339          *      EndListPersistentRequests)
340          * @param endListPersistentRequests
341          *            The “EndListPersistentRequests” message
342          */
343         private void fireReceivedEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) {
344                 for (FcpListener fcpListener: fcpListeners) {
345                         fcpListener.receivedEndListPersistentRequests(this, endListPersistentRequests);
346                 }
347         }
348
349         /**
350          * Notifies all listeners that a “URIGenerated” message was received.
351          *
352          * @see FcpListener#receivedURIGenerated(FcpConnection, URIGenerated)
353          * @param uriGenerated
354          *            The “URIGenerated” message
355          */
356         private void fireReceivedURIGenerated(URIGenerated uriGenerated) {
357                 for (FcpListener fcpListener: fcpListeners) {
358                         fcpListener.receivedURIGenerated(this, uriGenerated);
359                 }
360         }
361
362         /**
363          * Notifies all listeners that a “DataFound” message was received.
364          *
365          * @see FcpListener#receivedDataFound(FcpConnection, DataFound)
366          * @param dataFound
367          *            The “DataFound” message
368          */
369         private void fireReceivedDataFound(DataFound dataFound) {
370                 for (FcpListener fcpListener: fcpListeners) {
371                         fcpListener.receivedDataFound(this, dataFound);
372                 }
373         }
374
375         /**
376          * Notifies all listeners that an “AllData” message was received.
377          *
378          * @see FcpListener#receivedAllData(FcpConnection, AllData)
379          * @param allData
380          *            The “AllData” message
381          */
382         private void fireReceivedAllData(AllData allData) {
383                 for (FcpListener fcpListener: fcpListeners) {
384                         fcpListener.receivedAllData(this, allData);
385                 }
386         }
387
388         /**
389          * Notifies all listeners that a “SimpleProgress” message was received.
390          *
391          * @see FcpListener#receivedSimpleProgress(FcpConnection, SimpleProgress)
392          * @param simpleProgress
393          *            The “SimpleProgress” message
394          */
395         private void fireReceivedSimpleProgress(SimpleProgress simpleProgress) {
396                 for (FcpListener fcpListener: fcpListeners) {
397                         fcpListener.receivedSimpleProgress(this, simpleProgress);
398                 }
399         }
400
401         /**
402          * Notifies all listeners that a “StartedCompression” message was received.
403          *
404          * @see FcpListener#receivedStartedCompression(FcpConnection,
405          *      StartedCompression)
406          * @param startedCompression
407          *            The “StartedCompression” message
408          */
409         private void fireReceivedStartedCompression(StartedCompression startedCompression) {
410                 for (FcpListener fcpListener: fcpListeners) {
411                         fcpListener.receivedStartedCompression(this, startedCompression);
412                 }
413         }
414
415         /**
416          * Notifies all listeners that a “FinishedCompression” message was received.
417          *
418          * @see FcpListener#receviedFinishedCompression(FcpConnection,
419          *      FinishedCompression)
420          * @param finishedCompression
421          *            The “FinishedCompression” message
422          */
423         private void fireReceivedFinishedCompression(FinishedCompression finishedCompression) {
424                 for (FcpListener fcpListener: fcpListeners) {
425                         fcpListener.receviedFinishedCompression(this, finishedCompression);
426                 }
427         }
428
429         /**
430          * Notifies all listeners that an “UnknownPeerNoteType” message was
431          * received.
432          *
433          * @see FcpListener#receivedUnknownPeerNoteType(FcpConnection,
434          *      UnknownPeerNoteType)
435          * @param unknownPeerNoteType
436          *            The “UnknownPeerNoteType” message
437          */
438         private void fireReceivedUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) {
439                 for (FcpListener fcpListener: fcpListeners) {
440                         fcpListener.receivedUnknownPeerNoteType(this, unknownPeerNoteType);
441                 }
442         }
443
444         /**
445          * Notifies all listeners that an “UnknownNodeIdentifier” message was
446          * received.
447          *
448          * @see FcpListener#receivedUnknownNodeIdentifier(FcpConnection,
449          *      UnknownNodeIdentifier)
450          * @param unknownNodeIdentifier
451          *            The “UnknownNodeIdentifier” message
452          */
453         private void fireReceivedUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) {
454                 for (FcpListener fcpListener: fcpListeners) {
455                         fcpListener.receivedUnknownNodeIdentifier(this, unknownNodeIdentifier);
456                 }
457         }
458
459         /**
460          * Notifies all listeners that a “ConfigData” message was received.
461          *
462          * @see FcpListener#receivedConfigData(FcpConnection, ConfigData)
463          * @param configData
464          *            The “ConfigData” message
465          */
466         private void fireReceivedConfigData(ConfigData configData) {
467                 for (FcpListener fcpListener: fcpListeners) {
468                         fcpListener.receivedConfigData(this, configData);
469                 }
470         }
471
472         /**
473          * Notifies all listeners that a “GetFailed” message was received.
474          *
475          * @see FcpListener#receivedGetFailed(FcpConnection, GetFailed)
476          * @param getFailed
477          *            The “GetFailed” message
478          */
479         private void fireReceivedGetFailed(GetFailed getFailed) {
480                 for (FcpListener fcpListener: fcpListeners) {
481                         fcpListener.receivedGetFailed(this, getFailed);
482                 }
483         }
484
485         /**
486          * Notifies all listeners that a “PutFailed” message was received.
487          *
488          * @see FcpListener#receivedPutFailed(FcpConnection, PutFailed)
489          * @param putFailed
490          *            The “PutFailed” message
491          */
492         private void fireReceivedPutFailed(PutFailed putFailed) {
493                 for (FcpListener fcpListener: fcpListeners) {
494                         fcpListener.receivedPutFailed(this, putFailed);
495                 }
496         }
497
498         /**
499          * Notifies all listeners that an “IdentifierCollision” message was
500          * received.
501          *
502          * @see FcpListener#receivedIdentifierCollision(FcpConnection,
503          *      IdentifierCollision)
504          * @param identifierCollision
505          *            The “IdentifierCollision” message
506          */
507         private void fireReceivedIdentifierCollision(IdentifierCollision identifierCollision) {
508                 for (FcpListener fcpListener: fcpListeners) {
509                         fcpListener.receivedIdentifierCollision(this, identifierCollision);
510                 }
511         }
512
513         /**
514          * Notifies all listeners that an “PersistentPutDir” message was received.
515          *
516          * @see FcpListener#receivedPersistentPutDir(FcpConnection,
517          *      PersistentPutDir)
518          * @param persistentPutDir
519          *            The “PersistentPutDir” message
520          */
521         private void fireReceivedPersistentPutDir(PersistentPutDir persistentPutDir) {
522                 for (FcpListener fcpListener: fcpListeners) {
523                         fcpListener.receivedPersistentPutDir(this, persistentPutDir);
524                 }
525         }
526
527         /**
528          * Notifies all listeners that a “PersistentRequestRemoved” message was
529          * received.
530          *
531          * @see FcpListener#receivedPersistentRequestRemoved(FcpConnection,
532          *      PersistentRequestRemoved)
533          * @param persistentRequestRemoved
534          *            The “PersistentRequestRemoved” message
535          */
536         private void fireReceivedPersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) {
537                 for (FcpListener fcpListener: fcpListeners) {
538                         fcpListener.receivedPersistentRequestRemoved(this, persistentRequestRemoved);
539                 }
540         }
541
542         /**
543          * Notifies all listeners that a “SubscribedUSKUpdate” message was received.
544          *
545          * @see FcpListener#receivedSubscribedUSKUpdate(FcpConnection,
546          *      SubscribedUSKUpdate)
547          * @param subscribedUSKUpdate
548          *            The “SubscribedUSKUpdate” message
549          */
550         private void fireReceivedSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) {
551                 for (FcpListener fcpListener: fcpListeners) {
552                         fcpListener.receivedSubscribedUSKUpdate(this, subscribedUSKUpdate);
553                 }
554         }
555
556         /**
557          * Notifies all listeners that a “PluginInfo” message was received.
558          *
559          * @see FcpListener#receivedPluginInfo(FcpConnection, PluginInfo)
560          * @param pluginInfo
561          *            The “PluginInfo” message
562          */
563         private void fireReceivedPluginInfo(PluginInfo pluginInfo) {
564                 for (FcpListener fcpListener: fcpListeners) {
565                         fcpListener.receivedPluginInfo(this, pluginInfo);
566                 }
567         }
568
569         /**
570          * Notifies all listeners that an “FCPPluginReply” message was received.
571          *
572          * @see FcpListener#receivedFCPPluginReply(FcpConnection, FCPPluginReply)
573          * @param fcpPluginReply
574          *            The “FCPPluginReply” message
575          */
576         private void fireReceivedFCPPluginReply(FCPPluginReply fcpPluginReply) {
577                 for (FcpListener fcpListener: fcpListeners) {
578                         fcpListener.receivedFCPPluginReply(this, fcpPluginReply);
579                 }
580         }
581
582         /**
583          * Notifies all listeners that a “PersistentRequestModified” message was
584          * received.
585          *
586          * @see FcpListener#receivedPersistentRequestModified(FcpConnection,
587          *      PersistentRequestModified)
588          * @param persistentRequestModified
589          *            The “PersistentRequestModified” message
590          */
591         private void fireReceivedPersistentRequestModified(PersistentRequestModified persistentRequestModified) {
592                 for (FcpListener fcpListener: fcpListeners) {
593                         fcpListener.receivedPersistentRequestModified(this, persistentRequestModified);
594                 }
595         }
596
597         /**
598          * Notifies all listeners that a “PutSuccessful” message was received.
599          *
600          * @see FcpListener#receivedPutSuccessful(FcpConnection, PutSuccessful)
601          * @param putSuccessful
602          *            The “PutSuccessful” message
603          */
604         private void fireReceivedPutSuccessful(PutSuccessful putSuccessful) {
605                 for (FcpListener fcpListener: fcpListeners) {
606                         fcpListener.receivedPutSuccessful(this, putSuccessful);
607                 }
608         }
609
610         /**
611          * Notifies all listeners that a “PutFetchable” message was received.
612          *
613          * @see FcpListener#receivedPutFetchable(FcpConnection, PutFetchable)
614          * @param putFetchable
615          *            The “PutFetchable” message
616          */
617         private void fireReceivedPutFetchable(PutFetchable putFetchable) {
618                 for (FcpListener fcpListener: fcpListeners) {
619                         fcpListener.receivedPutFetchable(this, putFetchable);
620                 }
621         }
622
623         /**
624          * Notifies all listeners that a “ProtocolError” message was received.
625          *
626          * @see FcpListener#receivedProtocolError(FcpConnection, ProtocolError)
627          * @param protocolError
628          *            The “ProtocolError” message
629          */
630         private void fireReceivedProtocolError(ProtocolError protocolError) {
631                 for (FcpListener fcpListener: fcpListeners) {
632                         fcpListener.receivedProtocolError(this, protocolError);
633                 }
634         }
635
636         /**
637          * Notifies all registered listeners that a message has been received.
638          *
639          * @see FcpListener#receivedMessage(FcpConnection, FcpMessage)
640          * @param fcpMessage
641          *            The message that was received
642          */
643         private void fireMessageReceived(FcpMessage fcpMessage) {
644                 for (FcpListener fcpListener: fcpListeners) {
645                         fcpListener.receivedMessage(this, fcpMessage);
646                 }
647         }
648
649         /**
650          * Notifies all listeners that the connection to the node was closed.
651          *
652          * @param throwable
653          *            The exception that caused the disconnect, or <code>null</code>
654          *            if there was no exception
655          * @see FcpListener#connectionClosed(FcpConnection, Throwable)
656          */
657         private void fireConnectionClosed(Throwable throwable) {
658                 for (FcpListener fcpListener: fcpListeners) {
659                         fcpListener.connectionClosed(this, throwable);
660                 }
661         }
662
663         //
664         // ACTIONS
665         //
666
667         /**
668          * Connects to the node.
669          *
670          * @throws IOException
671          *             if an I/O error occurs
672          * @throws IllegalStateException
673          *             if there is already a connection to the node
674          */
675         public synchronized void connect() throws IOException, IllegalStateException {
676                 if (connectionHandler != null) {
677                         throw new IllegalStateException("already connected, disconnect first");
678                 }
679                 logger.info("connecting to " + address + ":" + port + "…");
680                 remoteSocket = new Socket(address, port);
681                 remoteInputStream = remoteSocket.getInputStream();
682                 remoteOutputStream = remoteSocket.getOutputStream();
683                 new Thread(connectionHandler = new FcpConnectionHandler(this, remoteInputStream)).start();
684         }
685
686         /**
687          * Disconnects from the node. If there is no connection to the node, this
688          * method does nothing.
689          *
690          * @deprecated Use {@link #close()} instead
691          */
692         @Deprecated
693         public synchronized void disconnect() {
694                 close();
695         }
696
697         /**
698          * Closes the connection. If there is no connection to the node, this method
699          * does nothing.
700          */
701         public void close() {
702                 handleDisconnect(null);
703         }
704
705         /**
706          * Sends the given FCP message.
707          *
708          * @param fcpMessage
709          *            The FCP message to send
710          * @throws IOException
711          *             if an I/O error occurs
712          */
713         public synchronized void sendMessage(FcpMessage fcpMessage) throws IOException {
714                 logger.fine("sending message: " + fcpMessage.getName());
715                 fcpMessage.write(remoteOutputStream);
716         }
717
718         //
719         // PACKAGE-PRIVATE METHODS
720         //
721
722         /**
723          * Handles the given message, notifying listeners. This message should only
724          * be called by {@link FcpConnectionHandler}.
725          *
726          * @param fcpMessage
727          *            The received message
728          */
729         void handleMessage(FcpMessage fcpMessage) {
730                 logger.fine("received message: " + fcpMessage.getName());
731                 String messageName = fcpMessage.getName();
732                 countMessage(messageName);
733                 if ("SimpleProgress".equals(messageName)) {
734                         fireReceivedSimpleProgress(new SimpleProgress(fcpMessage));
735                 } else if ("ProtocolError".equals(messageName)) {
736                         fireReceivedProtocolError(new ProtocolError(fcpMessage));
737                 } else if ("PersistentGet".equals(messageName)) {
738                         fireReceivedPersistentGet(new PersistentGet(fcpMessage));
739                 } else if ("PersistentPut".equals(messageName)) {
740                         fireReceivedPersistentPut(new PersistentPut(fcpMessage));
741                 } else if ("PersistentPutDir".equals(messageName)) {
742                         fireReceivedPersistentPutDir(new PersistentPutDir(fcpMessage));
743                 } else if ("URIGenerated".equals(messageName)) {
744                         fireReceivedURIGenerated(new URIGenerated(fcpMessage));
745                 } else if ("EndListPersistentRequests".equals(messageName)) {
746                         fireReceivedEndListPersistentRequests(new EndListPersistentRequests(fcpMessage));
747                 } else if ("Peer".equals(messageName)) {
748                         fireReceivedPeer(new Peer(fcpMessage));
749                 } else if ("PeerNote".equals(messageName)) {
750                         fireReceivedPeerNote(new PeerNote(fcpMessage));
751                 } else if ("StartedCompression".equals(messageName)) {
752                         fireReceivedStartedCompression(new StartedCompression(fcpMessage));
753                 } else if ("FinishedCompression".equals(messageName)) {
754                         fireReceivedFinishedCompression(new FinishedCompression(fcpMessage));
755                 } else if ("GetFailed".equals(messageName)) {
756                         fireReceivedGetFailed(new GetFailed(fcpMessage));
757                 } else if ("PutFetchable".equals(messageName)) {
758                         fireReceivedPutFetchable(new PutFetchable(fcpMessage));
759                 } else if ("PutSuccessful".equals(messageName)) {
760                         fireReceivedPutSuccessful(new PutSuccessful(fcpMessage));
761                 } else if ("PutFailed".equals(messageName)) {
762                         fireReceivedPutFailed(new PutFailed(fcpMessage));
763                 } else if ("DataFound".equals(messageName)) {
764                         fireReceivedDataFound(new DataFound(fcpMessage));
765                 } else if ("SubscribedUSKUpdate".equals(messageName)) {
766                         fireReceivedSubscribedUSKUpdate(new SubscribedUSKUpdate(fcpMessage));
767                 } else if ("IdentifierCollision".equals(messageName)) {
768                         fireReceivedIdentifierCollision(new IdentifierCollision(fcpMessage));
769                 } else if ("AllData".equals(messageName)) {
770                         LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
771                         fireReceivedAllData(new AllData(fcpMessage, payloadInputStream));
772                         try {
773                                 payloadInputStream.consume();
774                         } catch (IOException ioe1) {
775                                 /* well, ignore. when the connection handler fails, all fails. */
776                         }
777                 } else if ("EndListPeerNotes".equals(messageName)) {
778                         fireReceivedEndListPeerNotes(new EndListPeerNotes(fcpMessage));
779                 } else if ("EndListPeers".equals(messageName)) {
780                         fireReceivedEndListPeers(new EndListPeers(fcpMessage));
781                 } else if ("SSKKeypair".equals(messageName)) {
782                         fireReceivedSSKKeypair(new SSKKeypair(fcpMessage));
783                 } else if ("PeerRemoved".equals(messageName)) {
784                         fireReceivedPeerRemoved(new PeerRemoved(fcpMessage));
785                 } else if ("PersistentRequestModified".equals(messageName)) {
786                         fireReceivedPersistentRequestModified(new PersistentRequestModified(fcpMessage));
787                 } else if ("PersistentRequestRemoved".equals(messageName)) {
788                         fireReceivedPersistentRequestRemoved(new PersistentRequestRemoved(fcpMessage));
789                 } else if ("UnknownPeerNoteType".equals(messageName)) {
790                         fireReceivedUnknownPeerNoteType(new UnknownPeerNoteType(fcpMessage));
791                 } else if ("UnknownNodeIdentifier".equals(messageName)) {
792                         fireReceivedUnknownNodeIdentifier(new UnknownNodeIdentifier(fcpMessage));
793                 } else if ("FCPPluginReply".equals(messageName)) {
794                         LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
795                         fireReceivedFCPPluginReply(new FCPPluginReply(fcpMessage, payloadInputStream));
796                         try {
797                                 payloadInputStream.consume();
798                         } catch (IOException ioe1) {
799                                 /* ignore. */
800                         }
801                 } else if ("PluginInfo".equals(messageName)) {
802                         fireReceivedPluginInfo(new PluginInfo(fcpMessage));
803                 } else if ("NodeData".equals(messageName)) {
804                         fireReceivedNodeData(new NodeData(fcpMessage));
805                 } else if ("TestDDAReply".equals(messageName)) {
806                         fireReceivedTestDDAReply(new TestDDAReply(fcpMessage));
807                 } else if ("TestDDAComplete".equals(messageName)) {
808                         fireReceivedTestDDAComplete(new TestDDAComplete(fcpMessage));
809                 } else if ("ConfigData".equals(messageName)) {
810                         fireReceivedConfigData(new ConfigData(fcpMessage));
811                 } else if ("NodeHello".equals(messageName)) {
812                         fireReceivedNodeHello(new NodeHello(fcpMessage));
813                 } else if ("CloseConnectionDuplicateClientName".equals(messageName)) {
814                         fireReceivedCloseConnectionDuplicateClientName(new CloseConnectionDuplicateClientName(fcpMessage));
815                 } else {
816                         fireMessageReceived(fcpMessage);
817                 }
818         }
819
820         /**
821          * Handles a disconnect from the node.
822          *
823          * @param throwable
824          *            The exception that caused the disconnect, or <code>null</code>
825          *            if there was no exception
826          */
827         synchronized void handleDisconnect(Throwable throwable) {
828                 FcpUtils.close(remoteInputStream);
829                 FcpUtils.close(remoteOutputStream);
830                 FcpUtils.close(remoteSocket);
831                 if (connectionHandler != null) {
832                         connectionHandler.stop();
833                         connectionHandler = null;
834                         fireConnectionClosed(throwable);
835                 }
836         }
837
838         //
839         // PRIVATE METHODS
840         //
841
842         /**
843          * Incremets the counter in {@link #incomingMessageStatistics} by <cod>1</code>
844          * for the given message name.
845          *
846          * @param name
847          *            The name of the message to count
848          */
849         private void countMessage(String name) {
850                 int oldValue = 0;
851                 if (incomingMessageStatistics.containsKey(name)) {
852                         oldValue = incomingMessageStatistics.get(name);
853                 }
854                 incomingMessageStatistics.put(name, oldValue + 1);
855                 logger.finest("count for " + name + ": " + (oldValue + 1));
856         }
857
858         /**
859          * Returns a limited input stream from the node’s input stream.
860          *
861          * @param dataLength
862          *            The length of the stream
863          * @return The limited input stream
864          */
865         private LimitedInputStream getInputStream(long dataLength) {
866                 if (dataLength <= 0) {
867                         return new LimitedInputStream(null, 0);
868                 }
869                 return new LimitedInputStream(remoteInputStream, dataLength);
870         }
871
872         /**
873          * A wrapper around an {@link InputStream} that only supplies a limit number
874          * of bytes from the underlying input stream.
875          *
876          * @author <a href="mailto:dr@ina-germany.de">David Roden</a>
877          * @version $Id$
878          */
879         private static class LimitedInputStream extends FilterInputStream {
880
881                 /** The remaining number of bytes that can be read. */
882                 private long remaining;
883
884                 /**
885                  * Creates a new LimitedInputStream that supplies at most
886                  * <code>length</code> bytes from the given input stream.
887                  *
888                  * @param inputStream
889                  *            The input stream
890                  * @param length
891                  *            The number of bytes to read
892                  */
893                 public LimitedInputStream(InputStream inputStream, long length) {
894                         super(inputStream);
895                         remaining = length;
896                 }
897
898                 /**
899                  * @see java.io.FilterInputStream#available()
900                  */
901                 @Override
902                 public synchronized int available() throws IOException {
903                         if (remaining == 0) {
904                                 return 0;
905                         }
906                         return (int) Math.min(super.available(), Math.min(Integer.MAX_VALUE, remaining));
907                 }
908
909                 /**
910                  * @see java.io.FilterInputStream#read()
911                  */
912                 @Override
913                 public synchronized int read() throws IOException {
914                         int read = -1;
915                         if (remaining > 0) {
916                                 read = super.read();
917                                 remaining--;
918                         }
919                         return read;
920                 }
921
922                 /**
923                  * @see java.io.FilterInputStream#read(byte[], int, int)
924                  */
925                 @Override
926                 public synchronized int read(byte[] b, int off, int len) throws IOException {
927                         if (remaining == 0) {
928                                 return -1;
929                         }
930                         int toCopy = (int) Math.min(len, Math.min(remaining, Integer.MAX_VALUE));
931                         int read = super.read(b, off, toCopy);
932                         remaining -= read;
933                         return read;
934                 }
935
936                 /**
937                  * @see java.io.FilterInputStream#skip(long)
938                  */
939                 @Override
940                 public synchronized long skip(long n) throws IOException {
941                         if ((n < 0) || (remaining == 0)) {
942                                 return 0;
943                         }
944                         long skipped = super.skip(Math.min(n, remaining));
945                         remaining -= skipped;
946                         return skipped;
947                 }
948
949                 /**
950                  * {@inheritDoc} This method does nothing, as {@link #mark(int)} and
951                  * {@link #reset()} are not supported.
952                  *
953                  * @see java.io.FilterInputStream#mark(int)
954                  */
955                 @Override
956                 public void mark(int readlimit) {
957                         /* do nothing. */
958                 }
959
960                 /**
961                  * {@inheritDoc}
962                  *
963                  * @see java.io.FilterInputStream#markSupported()
964                  * @return <code>false</code>
965                  */
966                 @Override
967                 public boolean markSupported() {
968                         return false;
969                 }
970
971                 /**
972                  * {@inheritDoc} This method does nothing, as {@link #mark(int)} and
973                  * {@link #reset()} are not supported.
974                  *
975                  * @see java.io.FilterInputStream#reset()
976                  */
977                 @Override
978                 public void reset() throws IOException {
979                         /* do nothing. */
980                 }
981
982                 /**
983                  * Consumes the input stream, i.e. read all bytes until the limit is
984                  * reached.
985                  *
986                  * @throws IOException
987                  *             if an I/O error occurs
988                  */
989                 public void consume() throws IOException {
990                         while (remaining > 0) {
991                                 skip(remaining);
992                         }
993                 }
994
995         }
996
997 }