748bb8f18bff478f2f58b1a6dbd977ba4045b587
[jSite2.git] / src / net / pterodactylus / util / fcp / FcpConnection.java
1 /*
2  * jSite2 - FpcConnection.java -
3  * Copyright © 2008 David Roden
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18  */
19
20 package net.pterodactylus.util.fcp;
21
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Logger;

import net.pterodactylus.util.io.Closer;
import net.pterodactylus.util.io.LimitedInputStream;
36
37 /**
38  * An FCP connection to a Freenet node.
39  * 
40  * @author David ‘Bombe’ Roden <bombe@freenetproject.org>
41  * @version $Id$
42  */
43 public class FcpConnection {
44
45         /** The default port for FCP v2. */
46         public static final int DEFAULT_PORT = 9481;
47
48         /** The list of FCP listeners. */
49         private final List<FcpListener> fcpListeners = new ArrayList<FcpListener>();
50
51         /** The address of the node. */
52         private final InetAddress address;
53
54         /** The port number of the node’s FCP port. */
55         private final int port;
56
57         /** The remote socket. */
58         private Socket remoteSocket;
59
60         /** The input stream from the node. */
61         private InputStream remoteInputStream;
62
63         /** The output stream to the node. */
64         private OutputStream remoteOutputStream;
65
66         /** The connection handler. */
67         private FcpConnectionHandler connectionHandler;
68
69         /** Incoming message statistics. */
70         private Map<String, Integer> incomingMessageStatistics = Collections.synchronizedMap(new HashMap<String, Integer>());
71
72         /**
73          * Creates a new FCP connection to the freenet node running on localhost,
74          * using the default port.
75          * 
76          * @throws UnknownHostException
77          *             if the hostname can not be resolved
78          */
79         public FcpConnection() throws UnknownHostException {
80                 this(InetAddress.getLocalHost());
81         }
82
83         /**
84          * Creates a new FCP connection to the Freenet node running on the given
85          * host, listening on the default port.
86          * 
87          * @param host
88          *            The hostname of the Freenet node
89          * @throws UnknownHostException
90          *             if <code>host</code> can not be resolved
91          */
92         public FcpConnection(String host) throws UnknownHostException {
93                 this(host, DEFAULT_PORT);
94         }
95
96         /**
97          * Creates a new FCP connection to the Freenet node running on the given
98          * host, listening on the given port.
99          * 
100          * @param host
101          *            The hostname of the Freenet node
102          * @param port
103          *            The port number of the node’s FCP port
104          * @throws UnknownHostException
105          *             if <code>host</code> can not be resolved
106          */
107         public FcpConnection(String host, int port) throws UnknownHostException {
108                 this(InetAddress.getByName(host), port);
109         }
110
111         /**
112          * Creates a new FCP connection to the Freenet node running at the given
113          * address, listening on the default port.
114          * 
115          * @param address
116          *            The address of the Freenet node
117          */
118         public FcpConnection(InetAddress address) {
119                 this(address, DEFAULT_PORT);
120         }
121
122         /**
123          * Creates a new FCP connection to the Freenet node running at the given
124          * address, listening on the given port.
125          * 
126          * @param address
127          *            The address of the Freenet node
128          * @param port
129          *            The port number of the node’s FCP port
130          */
131         public FcpConnection(InetAddress address, int port) {
132                 this.address = address;
133                 this.port = port;
134         }
135
136         //
137         // LISTENER MANAGEMENT
138         //
139
140         /**
141          * Adds the given listener to the list of listeners.
142          * 
143          * @param fcpListener
144          *            The listener to add
145          */
146         public void addFcpListener(FcpListener fcpListener) {
147                 fcpListeners.add(fcpListener);
148         }
149
150         /**
151          * Removes the given listener from the list of listeners.
152          * 
153          * @param fcpListener
154          *            The listener to remove
155          */
156         public void removeFcpListener(FcpListener fcpListener) {
157                 fcpListeners.remove(fcpListener);
158         }
159
160         /**
161          * Notifies listeners that a “NodeHello” message was received.
162          * 
163          * @see FcpListener#receivedNodeHello(FcpConnection, NodeHello)
164          * @param nodeHello
165          *            The “NodeHello” message
166          */
167         private void fireReceivedNodeHello(NodeHello nodeHello) {
168                 for (FcpListener fcpListener: fcpListeners) {
169                         fcpListener.receivedNodeHello(this, nodeHello);
170                 }
171         }
172
173         /**
174          * Notifies listeners that a “CloseConnectionDuplicateClientName” message
175          * was received.
176          * 
177          * @see FcpListener#receivedCloseConnectionDuplicateClientName(FcpConnection,
178          *      CloseConnectionDuplicateClientName)
179          * @param closeConnectionDuplicateClientName
180          *            The “CloseConnectionDuplicateClientName” message
181          */
182         private void fireReceivedCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
183                 for (FcpListener fcpListener: fcpListeners) {
184                         fcpListener.receivedCloseConnectionDuplicateClientName(this, closeConnectionDuplicateClientName);
185                 }
186         }
187
188         /**
189          * Notifies listeners that a “SSKKeypair” message was received.
190          * 
191          * @see FcpListener#receivedSSKKeypair(FcpConnection, SSKKeypair)
192          * @param sskKeypair
193          *            The “SSKKeypair” message
194          */
195         private void fireReceivedSSKKeypair(SSKKeypair sskKeypair) {
196                 for (FcpListener fcpListener: fcpListeners) {
197                         fcpListener.receivedSSKKeypair(this, sskKeypair);
198                 }
199         }
200
201         /**
202          * Notifies listeners that a “Peer” message was received.
203          * 
204          * @see FcpListener#receivedPeer(FcpConnection, Peer)
205          * @param peer
206          *            The “Peer” message
207          */
208         private void fireReceivedPeer(Peer peer) {
209                 for (FcpListener fcpListener: fcpListeners) {
210                         fcpListener.receivedPeer(this, peer);
211                 }
212         }
213
214         /**
215          * Notifies all listeners that an “EndListPeers” message was received.
216          * 
217          * @see FcpListener#receivedEndListPeers(FcpConnection, EndListPeers)
218          * @param endListPeers
219          *            The “EndListPeers” message
220          */
221         private void fireReceivedEndListPeers(EndListPeers endListPeers) {
222                 for (FcpListener fcpListener: fcpListeners) {
223                         fcpListener.receivedEndListPeers(this, endListPeers);
224                 }
225         }
226
227         /**
228          * Notifies all listeners that a “PeerNote” message was received.
229          * 
230          * @see FcpListener#receivedPeerNote(FcpConnection, PeerNote)
231          * @param peerNote
232          */
233         private void fireReceivedPeerNote(PeerNote peerNote) {
234                 for (FcpListener fcpListener: fcpListeners) {
235                         fcpListener.receivedPeerNote(this, peerNote);
236                 }
237         }
238
239         /**
240          * Notifies all listeners that an “EndListPeerNotes” message was received.
241          * 
242          * @see FcpListener#receivedEndListPeerNotes(FcpConnection,
243          *      EndListPeerNotes)
244          * @param endListPeerNotes
245          *            The “EndListPeerNotes” message
246          */
247         private void fireReceivedEndListPeerNotes(EndListPeerNotes endListPeerNotes) {
248                 for (FcpListener fcpListener: fcpListeners) {
249                         fcpListener.receivedEndListPeerNotes(this, endListPeerNotes);
250                 }
251         }
252
253         /**
254          * Notifies all listeners that a “PeerRemoved” message was received.
255          * 
256          * @see FcpListener#receivedPeerRemoved(FcpConnection, PeerRemoved)
257          * @param peerRemoved
258          *            The “PeerRemoved” message
259          */
260         private void fireReceivedPeerRemoved(PeerRemoved peerRemoved) {
261                 for (FcpListener fcpListener: fcpListeners) {
262                         fcpListener.receivedPeerRemoved(this, peerRemoved);
263                 }
264         }
265
266         /**
267          * Notifies all listeners that a “NodeData” message was received.
268          * 
269          * @see FcpListener#receivedNodeData(FcpConnection, NodeData)
270          * @param nodeData
271          *            The “NodeData” message
272          */
273         private void fireReceivedNodeData(NodeData nodeData) {
274                 for (FcpListener fcpListener: fcpListeners) {
275                         fcpListener.receivedNodeData(this, nodeData);
276                 }
277         }
278
279         /**
280          * Notifies all listeners that a “TestDDAReply” message was received.
281          * 
282          * @see FcpListener#receivedTestDDAReply(FcpConnection, TestDDAReply)
283          * @param testDDAReply
284          *            The “TestDDAReply” message
285          */
286         private void fireReceivedTestDDAReply(TestDDAReply testDDAReply) {
287                 for (FcpListener fcpListener: fcpListeners) {
288                         fcpListener.receivedTestDDAReply(this, testDDAReply);
289                 }
290         }
291
292         /**
293          * Notifies all listeners that a “TestDDAComplete” message was received.
294          * 
295          * @see FcpListener#receivedTestDDAComplete(FcpConnection, TestDDAComplete)
296          * @param testDDAComplete
297          *            The “TestDDAComplete” message
298          */
299         private void fireReceivedTestDDAComplete(TestDDAComplete testDDAComplete) {
300                 for (FcpListener fcpListener: fcpListeners) {
301                         fcpListener.receivedTestDDAComplete(this, testDDAComplete);
302                 }
303         }
304
305         /**
306          * Notifies all listeners that a “PersistentGet” message was received.
307          * 
308          * @see FcpListener#receivedPersistentGet(FcpConnection, PersistentGet)
309          * @param persistentGet
310          *            The “PersistentGet” message
311          */
312         private void fireReceivedPersistentGet(PersistentGet persistentGet) {
313                 for (FcpListener fcpListener: fcpListeners) {
314                         fcpListener.receivedPersistentGet(this, persistentGet);
315                 }
316         }
317
318         /**
319          * Notifies all listeners that a “PersistentPut” message was received.
320          * 
321          * @see FcpListener#receivedPersistentPut(FcpConnection, PersistentPut)
322          * @param persistentPut
323          *            The “PersistentPut” message
324          */
325         private void fireReceivedPersistentPut(PersistentPut persistentPut) {
326                 for (FcpListener fcpListener: fcpListeners) {
327                         fcpListener.receivedPersistentPut(this, persistentPut);
328                 }
329         }
330
331         /**
332          * Notifies all listeners that a “EndListPersistentRequests” message was
333          * received.
334          * 
335          * @see FcpListener#receivedEndListPersistentRequests(FcpConnection,
336          *      EndListPersistentRequests)
337          * @param endListPersistentRequests
338          *            The “EndListPersistentRequests” message
339          */
340         private void fireReceivedEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) {
341                 for (FcpListener fcpListener: fcpListeners) {
342                         fcpListener.receivedEndListPersistentRequests(this, endListPersistentRequests);
343                 }
344         }
345
346         /**
347          * Notifies all listeners that a “URIGenerated” message was received.
348          * 
349          * @see FcpListener#receivedURIGenerated(FcpConnection, URIGenerated)
350          * @param uriGenerated
351          *            The “URIGenerated” message
352          */
353         private void fireReceivedURIGenerated(URIGenerated uriGenerated) {
354                 for (FcpListener fcpListener: fcpListeners) {
355                         fcpListener.receivedURIGenerated(this, uriGenerated);
356                 }
357         }
358
359         /**
360          * Notifies all listeners that a “DataFound” message was received.
361          * 
362          * @see FcpListener#receivedDataFound(FcpConnection, DataFound)
363          * @param dataFound
364          *            The “DataFound” message
365          */
366         private void fireReceivedDataFound(DataFound dataFound) {
367                 for (FcpListener fcpListener: fcpListeners) {
368                         fcpListener.receivedDataFound(this, dataFound);
369                 }
370         }
371
372         /**
373          * Notifies all listeners that an “AllData” message was received.
374          * 
375          * @see FcpListener#receivedAllData(FcpConnection, AllData)
376          * @param allData
377          *            The “AllData” message
378          */
379         private void fireReceivedAllData(AllData allData) {
380                 for (FcpListener fcpListener: fcpListeners) {
381                         fcpListener.receivedAllData(this, allData);
382                 }
383         }
384
385         /**
386          * Notifies all listeners that a “SimpleProgress” message was received.
387          * 
388          * @see FcpListener#receivedSimpleProgress(FcpConnection, SimpleProgress)
389          * @param simpleProgress
390          *            The “SimpleProgress” message
391          */
392         private void fireReceivedSimpleProgress(SimpleProgress simpleProgress) {
393                 for (FcpListener fcpListener: fcpListeners) {
394                         fcpListener.receivedSimpleProgress(this, simpleProgress);
395                 }
396         }
397
398         /**
399          * Notifies all listeners that a “StartedCompression” message was received.
400          * 
401          * @see FcpListener#receivedStartedCompression(FcpConnection,
402          *      StartedCompression)
403          * @param startedCompression
404          *            The “StartedCompression” message
405          */
406         private void fireReceivedStartedCompression(StartedCompression startedCompression) {
407                 for (FcpListener fcpListener: fcpListeners) {
408                         fcpListener.receivedStartedCompression(this, startedCompression);
409                 }
410         }
411
412         /**
413          * Notifies all listeners that a “FinishedCompression” message was received.
414          * 
415          * @see FcpListener#receviedFinishedCompression(FcpConnection,
416          *      FinishedCompression)
417          * @param finishedCompression
418          *            The “FinishedCompression” message
419          */
420         private void fireReceivedFinishedCompression(FinishedCompression finishedCompression) {
421                 for (FcpListener fcpListener: fcpListeners) {
422                         fcpListener.receviedFinishedCompression(this, finishedCompression);
423                 }
424         }
425
426         /**
427          * Notifies all listeners that an “UnknownPeerNoteType” message was
428          * received.
429          * 
430          * @see FcpListener#receivedUnknownPeerNoteType(FcpConnection,
431          *      UnknownPeerNoteType)
432          * @param unknownPeerNoteType
433          *            The “UnknownPeerNoteType” message
434          */
435         private void fireReceivedUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) {
436                 for (FcpListener fcpListener: fcpListeners) {
437                         fcpListener.receivedUnknownPeerNoteType(this, unknownPeerNoteType);
438                 }
439         }
440
441         /**
442          * Notifies all listeners that an “UnknownNodeIdentifier” message was
443          * received.
444          * 
445          * @see FcpListener#receivedUnknownNodeIdentifier(FcpConnection,
446          *      UnknownNodeIdentifier)
447          * @param unknownNodeIdentifier
448          *            The “UnknownNodeIdentifier” message
449          */
450         private void fireReceivedUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) {
451                 for (FcpListener fcpListener: fcpListeners) {
452                         fcpListener.receivedUnknownNodeIdentifier(this, unknownNodeIdentifier);
453                 }
454         }
455
456         /**
457          * Notifies all listeners that a “ConfigData” message was received.
458          * 
459          * @see FcpListener#receivedConfigData(FcpConnection, ConfigData)
460          * @param configData
461          *            The “ConfigData” message
462          */
463         private void fireReceivedConfigData(ConfigData configData) {
464                 for (FcpListener fcpListener: fcpListeners) {
465                         fcpListener.receivedConfigData(this, configData);
466                 }
467         }
468
469         /**
470          * Notifies all listeners that a “GetFailed” message was received.
471          * 
472          * @see FcpListener#receivedGetFailed(FcpConnection, GetFailed)
473          * @param getFailed
474          *            The “GetFailed” message
475          */
476         private void fireReceivedGetFailed(GetFailed getFailed) {
477                 for (FcpListener fcpListener: fcpListeners) {
478                         fcpListener.receivedGetFailed(this, getFailed);
479                 }
480         }
481
482         /**
483          * Notifies all listeners that a “PutFailed” message was received.
484          * 
485          * @see FcpListener#receivedPutFailed(FcpConnection, PutFailed)
486          * @param putFailed
487          *            The “PutFailed” message
488          */
489         private void fireReceivedPutFailed(PutFailed putFailed) {
490                 for (FcpListener fcpListener: fcpListeners) {
491                         fcpListener.receivedPutFailed(this, putFailed);
492                 }
493         }
494
495         /**
496          * Notifies all listeners that an “IdentifierCollision” message was
497          * received.
498          * 
499          * @see FcpListener#receivedIdentifierCollision(FcpConnection,
500          *      IdentifierCollision)
501          * @param identifierCollision
502          *            The “IdentifierCollision” message
503          */
504         private void fireReceivedIdentifierCollision(IdentifierCollision identifierCollision) {
505                 for (FcpListener fcpListener: fcpListeners) {
506                         fcpListener.receivedIdentifierCollision(this, identifierCollision);
507                 }
508         }
509
510         /**
511          * Notifies all listeners that an “PersistentPutDir” message was received.
512          * 
513          * @see FcpListener#receivedPersistentPutDir(FcpConnection,
514          *      PersistentPutDir)
515          * @param persistentPutDir
516          *            The “PersistentPutDir” message
517          */
518         private void fireReceivedPersistentPutDir(PersistentPutDir persistentPutDir) {
519                 for (FcpListener fcpListener: fcpListeners) {
520                         fcpListener.receivedPersistentPutDir(this, persistentPutDir);
521                 }
522         }
523
524         /**
525          * Notifies all listeners that a “PersistentRequestRemoved” message was
526          * received.
527          * 
528          * @see FcpListener#receivedPersistentRequestRemoved(FcpConnection,
529          *      PersistentRequestRemoved)
530          * @param persistentRequestRemoved
531          *            The “PersistentRequestRemoved” message
532          */
533         private void fireReceivedPersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) {
534                 for (FcpListener fcpListener: fcpListeners) {
535                         fcpListener.receivedPersistentRequestRemoved(this, persistentRequestRemoved);
536                 }
537         }
538
539         /**
540          * Notifies all listeners that a “SubscribedUSKUpdate” message was received.
541          * 
542          * @see FcpListener#receivedSubscribedUSKUpdate(FcpConnection,
543          *      SubscribedUSKUpdate)
544          * @param subscribedUSKUpdate
545          *            The “SubscribedUSKUpdate” message
546          */
547         private void fireReceivedSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) {
548                 for (FcpListener fcpListener: fcpListeners) {
549                         fcpListener.receivedSubscribedUSKUpdate(this, subscribedUSKUpdate);
550                 }
551         }
552
553         /**
554          * Notifies all listeners that a “PluginInfo” message was received.
555          * 
556          * @see FcpListener#receivedPluginInfo(FcpConnection, PluginInfo)
557          * @param pluginInfo
558          *            The “PluginInfo” message
559          */
560         private void fireReceivedPluginInfo(PluginInfo pluginInfo) {
561                 for (FcpListener fcpListener: fcpListeners) {
562                         fcpListener.receivedPluginInfo(this, pluginInfo);
563                 }
564         }
565
566         /**
567          * Notifies all listeners that an “FCPPluginReply” message was received.
568          * 
569          * @see FcpListener#receivedFCPPluginReply(FcpConnection, FCPPluginReply)
570          * @param fcpPluginReply
571          *            The “FCPPluginReply” message
572          */
573         private void fireReceivedFCPPluginReply(FCPPluginReply fcpPluginReply) {
574                 for (FcpListener fcpListener: fcpListeners) {
575                         fcpListener.receivedFCPPluginReply(this, fcpPluginReply);
576                 }
577         }
578
579         /**
580          * Notifies all listeners that a “PersistentRequestModified” message was
581          * received.
582          * 
583          * @see FcpListener#receivedPersistentRequestModified(FcpConnection,
584          *      PersistentRequestModified)
585          * @param persistentRequestModified
586          *            The “PersistentRequestModified” message
587          */
588         private void fireReceivedPersistentRequestModified(PersistentRequestModified persistentRequestModified) {
589                 for (FcpListener fcpListener: fcpListeners) {
590                         fcpListener.receivedPersistentRequestModified(this, persistentRequestModified);
591                 }
592         }
593
594         /**
595          * Notifies all listeners that a “PutSuccessful” message was received.
596          * 
597          * @see FcpListener#receivedPutSuccessful(FcpConnection, PutSuccessful)
598          * @param putSuccessful
599          *            The “PutSuccessful” message
600          */
601         private void fireReceivedPutSuccessful(PutSuccessful putSuccessful) {
602                 for (FcpListener fcpListener: fcpListeners) {
603                         fcpListener.receivedPutSuccessful(this, putSuccessful);
604                 }
605         }
606
607         /**
608          * Notifies all listeners that a “PutFetchable” message was received.
609          * 
610          * @see FcpListener#receivedPutFetchable(FcpConnection, PutFetchable)
611          * @param putFetchable
612          *            The “PutFetchable” message
613          */
614         private void fireReceivedPutFetchable(PutFetchable putFetchable) {
615                 for (FcpListener fcpListener: fcpListeners) {
616                         fcpListener.receivedPutFetchable(this, putFetchable);
617                 }
618         }
619
620         /**
621          * Notifies all listeners that a “ProtocolError” message was received.
622          * 
623          * @see FcpListener#receivedProtocolError(FcpConnection, ProtocolError)
624          * @param protocolError
625          *            The “ProtocolError” message
626          */
627         private void fireReceivedProtocolError(ProtocolError protocolError) {
628                 for (FcpListener fcpListener: fcpListeners) {
629                         fcpListener.receivedProtocolError(this, protocolError);
630                 }
631         }
632
633         /**
634          * Notifies all registered listeners that a message has been received.
635          * 
636          * @see FcpListener#receivedMessage(FcpConnection, FcpMessage)
637          * @param fcpMessage
638          *            The message that was received
639          */
640         private void fireMessageReceived(FcpMessage fcpMessage) {
641                 for (FcpListener fcpListener: fcpListeners) {
642                         fcpListener.receivedMessage(this, fcpMessage);
643                 }
644         }
645
646         /**
647          * Notifies all listeners that the connection to the node was closed.
648          * 
649          * @see FcpListener#connectionClosed(FcpConnection)
650          */
651         private void fireConnectionClosed() {
652                 for (FcpListener fcpListener: fcpListeners) {
653                         fcpListener.connectionClosed(this);
654                 }
655         }
656
657         //
658         // ACTIONS
659         //
660
661         /**
662          * Connects to the node.
663          * 
664          * @throws IOException
665          *             if an I/O error occurs
666          * @throws IllegalStateException
667          *             if there is already a connection to the node
668          */
669         public synchronized void connect() throws IOException, IllegalStateException {
670                 if (connectionHandler != null) {
671                         throw new IllegalStateException("already connected, disconnect first");
672                 }
673                 remoteSocket = new Socket(address, port);
674                 remoteInputStream = remoteSocket.getInputStream();
675                 remoteOutputStream = remoteSocket.getOutputStream();
676                 new Thread(connectionHandler = new FcpConnectionHandler(this, remoteInputStream)).start();
677         }
678
679         /**
680          * Disconnects from the node. If there is no connection to the node, this
681          * method does nothing.
682          */
683         public synchronized void disconnect() {
684                 if (connectionHandler == null) {
685                         return;
686                 }
687                 Closer.close(remoteSocket);
688                 connectionHandler.stop();
689                 connectionHandler = null;
690         }
691
692         /**
693          * Sends the given FCP message.
694          * 
695          * @param fcpMessage
696          *            The FCP message to send
697          * @throws IOException
698          *             if an I/O error occurs
699          */
700         public synchronized void sendMessage(FcpMessage fcpMessage) throws IOException {
701                 System.out.println("sending message: " + fcpMessage.getName());
702                 fcpMessage.write(remoteOutputStream);
703         }
704
705         //
706         // PACKAGE-PRIVATE METHODS
707         //
708
709         /**
710          * Handles the given message, notifying listeners. This message should only
711          * be called by {@link FcpConnectionHandler}.
712          * 
713          * @param fcpMessage
714          *            The received message
715          */
716         void handleMessage(FcpMessage fcpMessage) {
717                 String messageName = fcpMessage.getName();
718                 countMessage(messageName);
719                 if ("SimpleProgress".equals(messageName)) {
720                         fireReceivedSimpleProgress(new SimpleProgress(fcpMessage));
721                 } else if ("ProtocolError".equals(messageName)) {
722                         fireReceivedProtocolError(new ProtocolError(fcpMessage));
723                 } else if ("PersistentGet".equals(messageName)) {
724                         fireReceivedPersistentGet(new PersistentGet(fcpMessage));
725                 } else if ("PersistentPut".equals(messageName)) {
726                         fireReceivedPersistentPut(new PersistentPut(fcpMessage));
727                 } else if ("PersistentPutDir".equals(messageName)) {
728                         fireReceivedPersistentPutDir(new PersistentPutDir(fcpMessage));
729                 } else if ("URIGenerated".equals(messageName)) {
730                         fireReceivedURIGenerated(new URIGenerated(fcpMessage));
731                 } else if ("EndListPersistentRequests".equals(messageName)) {
732                         fireReceivedEndListPersistentRequests(new EndListPersistentRequests(fcpMessage));
733                 } else if ("Peer".equals(messageName)) {
734                         fireReceivedPeer(new Peer(fcpMessage));
735                 } else if ("PeerNote".equals(messageName)) {
736                         fireReceivedPeerNote(new PeerNote(fcpMessage));
737                 } else if ("StartedCompression".equals(messageName)) {
738                         fireReceivedStartedCompression(new StartedCompression(fcpMessage));
739                 } else if ("FinishedCompression".equals(messageName)) {
740                         fireReceivedFinishedCompression(new FinishedCompression(fcpMessage));
741                 } else if ("GetFailed".equals(messageName)) {
742                         fireReceivedGetFailed(new GetFailed(fcpMessage));
743                 } else if ("PutFetchable".equals(messageName)) {
744                         fireReceivedPutFetchable(new PutFetchable(fcpMessage));
745                 } else if ("PutSuccessful".equals(messageName)) {
746                         fireReceivedPutSuccessful(new PutSuccessful(fcpMessage));
747                 } else if ("PutFailed".equals(messageName)) {
748                         fireReceivedPutFailed(new PutFailed(fcpMessage));
749                 } else if ("DataFound".equals(messageName)) {
750                         fireReceivedDataFound(new DataFound(fcpMessage));
751                 } else if ("SubscribedUSKUpdate".equals(messageName)) {
752                         fireReceivedSubscribedUSKUpdate(new SubscribedUSKUpdate(fcpMessage));
753                 } else if ("IdentifierCollision".equals(messageName)) {
754                         fireReceivedIdentifierCollision(new IdentifierCollision(fcpMessage));
755                 } else if ("AllData".equals(messageName)) {
756                         LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
757                         fireReceivedAllData(new AllData(fcpMessage, payloadInputStream));
758                         try {
759                                 payloadInputStream.consume();
760                         } catch (IOException ioe1) {
761                                 /* well, ignore. when the connection handler fails, all fails. */
762                         }
763                 } else if ("EndListPeerNotes".equals(messageName)) {
764                         fireReceivedEndListPeerNotes(new EndListPeerNotes(fcpMessage));
765                 } else if ("EndListPeers".equals(messageName)) {
766                         fireReceivedEndListPeers(new EndListPeers(fcpMessage));
767                 } else if ("SSKKeypair".equals(messageName)) {
768                         fireReceivedSSKKeypair(new SSKKeypair(fcpMessage));
769                 } else if ("PeerRemoved".equals(messageName)) {
770                         fireReceivedPeerRemoved(new PeerRemoved(fcpMessage));
771                 } else if ("PersistentRequestModified".equals(messageName)) {
772                         fireReceivedPersistentRequestModified(new PersistentRequestModified(fcpMessage));
773                 } else if ("PersistentRequestRemoved".equals(messageName)) {
774                         fireReceivedPersistentRequestRemoved(new PersistentRequestRemoved(fcpMessage));
775                 } else if ("UnknownPeerNoteType".equals(messageName)) {
776                         fireReceivedUnknownPeerNoteType(new UnknownPeerNoteType(fcpMessage));
777                 } else if ("UnknownNodeIdentifier".equals(messageName)) {
778                         fireReceivedUnknownNodeIdentifier(new UnknownNodeIdentifier(fcpMessage));
779                 } else if ("FCPPluginReply".equals(messageName)) {
780                         LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
781                         fireReceivedFCPPluginReply(new FCPPluginReply(fcpMessage, payloadInputStream));
782                         try {
783                                 payloadInputStream.consume();
784                         } catch (IOException ioe1) {
785                                 /* ignore. */
786                         }
787                 } else if ("PluginInfo".equals(messageName)) {
788                         fireReceivedPluginInfo(new PluginInfo(fcpMessage));
789                 } else if ("NodeData".equals(messageName)) {
790                         fireReceivedNodeData(new NodeData(fcpMessage));
791                 } else if ("TestDDAReply".equals(messageName)) {
792                         fireReceivedTestDDAReply(new TestDDAReply(fcpMessage));
793                 } else if ("TestDDAComplete".equals(messageName)) {
794                         fireReceivedTestDDAComplete(new TestDDAComplete(fcpMessage));
795                 } else if ("ConfigData".equals(messageName)) {
796                         fireReceivedConfigData(new ConfigData(fcpMessage));
797                 } else if ("NodeHello".equals(messageName)) {
798                         fireReceivedNodeHello(new NodeHello(fcpMessage));
799                 } else if ("CloseConnectionDuplicateClientName".equals(messageName)) {
800                         fireReceivedCloseConnectionDuplicateClientName(new CloseConnectionDuplicateClientName(fcpMessage));
801                 } else {
802                         fireMessageReceived(fcpMessage);
803                 }
804         }
805
806         /**
807          * Handles a disconnect from the node.
808          */
809         synchronized void handleDisconnect() {
810                 Closer.close(remoteInputStream);
811                 Closer.close(remoteOutputStream);
812                 Closer.close(remoteSocket);
813                 connectionHandler = null;
814                 fireConnectionClosed();
815         }
816
817         //
818         // PRIVATE METHODS
819         //
820
821         /**
822          * Increments the counter in {@link #incomingMessageStatistics} by <code>1</code>
823          * for the given message name.
824          * 
825          * @param name
826          *            The name of the message to count
827          */
828         private void countMessage(String name) {
829                 int oldValue = 0;
830                 if (incomingMessageStatistics.containsKey(name)) {
831                         oldValue = incomingMessageStatistics.get(name);
832                 }
833                 incomingMessageStatistics.put(name, oldValue + 1);
834         }
835
836         /**
837          * Returns a limited input stream from the node’s input stream.
838          * 
839          * @param dataLength
840          *            The length of the stream
841          * @return The limited input stream
842          */
843         private LimitedInputStream getInputStream(long dataLength) {
844                 if (dataLength <= 0) {
845                         return new LimitedInputStream(null, 0);
846                 }
847                 return new LimitedInputStream(remoteInputStream, dataLength);
848         }
849
850 }