notify listeners on manual disconnect, too
[jFCPlib.git] / src / net / pterodactylus / fcp / FcpConnection.java
1 /*
2  * jSite2 - FpcConnection.java -
3  * Copyright © 2008 David Roden
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18  */
19
20 package net.pterodactylus.fcp;
21
22 import java.io.FilterInputStream;
23 import java.io.IOException;
24 import java.io.InputStream;
25 import java.io.OutputStream;
26 import java.net.InetAddress;
27 import java.net.Socket;
28 import java.net.UnknownHostException;
29 import java.util.ArrayList;
30 import java.util.Collections;
31 import java.util.HashMap;
32 import java.util.List;
33 import java.util.Map;
34
35 /**
36  * An FCP connection to a Freenet node.
37  * 
38  * @author David ‘Bombe’ Roden <bombe@freenetproject.org>
39  * @version $Id$
40  */
41 public class FcpConnection {
42
43         /** The default port for FCP v2. */
44         public static final int DEFAULT_PORT = 9481;
45
46         /** The list of FCP listeners. */
47         private final List<FcpListener> fcpListeners = new ArrayList<FcpListener>();
48
49         /** The address of the node. */
50         private final InetAddress address;
51
52         /** The port number of the node’s FCP port. */
53         private final int port;
54
55         /** The remote socket. */
56         private Socket remoteSocket;
57
58         /** The input stream from the node. */
59         private InputStream remoteInputStream;
60
61         /** The output stream to the node. */
62         private OutputStream remoteOutputStream;
63
64         /** The connection handler. */
65         private FcpConnectionHandler connectionHandler;
66
67         /** Incoming message statistics. */
68         private Map<String, Integer> incomingMessageStatistics = Collections.synchronizedMap(new HashMap<String, Integer>());
69
70         /**
71          * Creates a new FCP connection to the freenet node running on localhost,
72          * using the default port.
73          * 
74          * @throws UnknownHostException
75          *             if the hostname can not be resolved
76          */
77         public FcpConnection() throws UnknownHostException {
78                 this(InetAddress.getLocalHost());
79         }
80
81         /**
82          * Creates a new FCP connection to the Freenet node running on the given
83          * host, listening on the default port.
84          * 
85          * @param host
86          *            The hostname of the Freenet node
87          * @throws UnknownHostException
88          *             if <code>host</code> can not be resolved
89          */
90         public FcpConnection(String host) throws UnknownHostException {
91                 this(host, DEFAULT_PORT);
92         }
93
94         /**
95          * Creates a new FCP connection to the Freenet node running on the given
96          * host, listening on the given port.
97          * 
98          * @param host
99          *            The hostname of the Freenet node
100          * @param port
101          *            The port number of the node’s FCP port
102          * @throws UnknownHostException
103          *             if <code>host</code> can not be resolved
104          */
105         public FcpConnection(String host, int port) throws UnknownHostException {
106                 this(InetAddress.getByName(host), port);
107         }
108
109         /**
110          * Creates a new FCP connection to the Freenet node running at the given
111          * address, listening on the default port.
112          * 
113          * @param address
114          *            The address of the Freenet node
115          */
116         public FcpConnection(InetAddress address) {
117                 this(address, DEFAULT_PORT);
118         }
119
120         /**
121          * Creates a new FCP connection to the Freenet node running at the given
122          * address, listening on the given port.
123          * 
124          * @param address
125          *            The address of the Freenet node
126          * @param port
127          *            The port number of the node’s FCP port
128          */
129         public FcpConnection(InetAddress address, int port) {
130                 this.address = address;
131                 this.port = port;
132         }
133
134         //
135         // LISTENER MANAGEMENT
136         //
137
138         /**
139          * Adds the given listener to the list of listeners.
140          * 
141          * @param fcpListener
142          *            The listener to add
143          */
144         public void addFcpListener(FcpListener fcpListener) {
145                 fcpListeners.add(fcpListener);
146         }
147
148         /**
149          * Removes the given listener from the list of listeners.
150          * 
151          * @param fcpListener
152          *            The listener to remove
153          */
154         public void removeFcpListener(FcpListener fcpListener) {
155                 fcpListeners.remove(fcpListener);
156         }
157
158         /**
159          * Notifies listeners that a “NodeHello” message was received.
160          * 
161          * @see FcpListener#receivedNodeHello(FcpConnection, NodeHello)
162          * @param nodeHello
163          *            The “NodeHello” message
164          */
165         private void fireReceivedNodeHello(NodeHello nodeHello) {
166                 for (FcpListener fcpListener: fcpListeners) {
167                         fcpListener.receivedNodeHello(this, nodeHello);
168                 }
169         }
170
171         /**
172          * Notifies listeners that a “CloseConnectionDuplicateClientName” message
173          * was received.
174          * 
175          * @see FcpListener#receivedCloseConnectionDuplicateClientName(FcpConnection,
176          *      CloseConnectionDuplicateClientName)
177          * @param closeConnectionDuplicateClientName
178          *            The “CloseConnectionDuplicateClientName” message
179          */
180         private void fireReceivedCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
181                 for (FcpListener fcpListener: fcpListeners) {
182                         fcpListener.receivedCloseConnectionDuplicateClientName(this, closeConnectionDuplicateClientName);
183                 }
184         }
185
186         /**
187          * Notifies listeners that a “SSKKeypair” message was received.
188          * 
189          * @see FcpListener#receivedSSKKeypair(FcpConnection, SSKKeypair)
190          * @param sskKeypair
191          *            The “SSKKeypair” message
192          */
193         private void fireReceivedSSKKeypair(SSKKeypair sskKeypair) {
194                 for (FcpListener fcpListener: fcpListeners) {
195                         fcpListener.receivedSSKKeypair(this, sskKeypair);
196                 }
197         }
198
199         /**
200          * Notifies listeners that a “Peer” message was received.
201          * 
202          * @see FcpListener#receivedPeer(FcpConnection, Peer)
203          * @param peer
204          *            The “Peer” message
205          */
206         private void fireReceivedPeer(Peer peer) {
207                 for (FcpListener fcpListener: fcpListeners) {
208                         fcpListener.receivedPeer(this, peer);
209                 }
210         }
211
212         /**
213          * Notifies all listeners that an “EndListPeers” message was received.
214          * 
215          * @see FcpListener#receivedEndListPeers(FcpConnection, EndListPeers)
216          * @param endListPeers
217          *            The “EndListPeers” message
218          */
219         private void fireReceivedEndListPeers(EndListPeers endListPeers) {
220                 for (FcpListener fcpListener: fcpListeners) {
221                         fcpListener.receivedEndListPeers(this, endListPeers);
222                 }
223         }
224
225         /**
226          * Notifies all listeners that a “PeerNote” message was received.
227          * 
228          * @see FcpListener#receivedPeerNote(FcpConnection, PeerNote)
229          * @param peerNote
230          */
231         private void fireReceivedPeerNote(PeerNote peerNote) {
232                 for (FcpListener fcpListener: fcpListeners) {
233                         fcpListener.receivedPeerNote(this, peerNote);
234                 }
235         }
236
237         /**
238          * Notifies all listeners that an “EndListPeerNotes” message was received.
239          * 
240          * @see FcpListener#receivedEndListPeerNotes(FcpConnection,
241          *      EndListPeerNotes)
242          * @param endListPeerNotes
243          *            The “EndListPeerNotes” message
244          */
245         private void fireReceivedEndListPeerNotes(EndListPeerNotes endListPeerNotes) {
246                 for (FcpListener fcpListener: fcpListeners) {
247                         fcpListener.receivedEndListPeerNotes(this, endListPeerNotes);
248                 }
249         }
250
251         /**
252          * Notifies all listeners that a “PeerRemoved” message was received.
253          * 
254          * @see FcpListener#receivedPeerRemoved(FcpConnection, PeerRemoved)
255          * @param peerRemoved
256          *            The “PeerRemoved” message
257          */
258         private void fireReceivedPeerRemoved(PeerRemoved peerRemoved) {
259                 for (FcpListener fcpListener: fcpListeners) {
260                         fcpListener.receivedPeerRemoved(this, peerRemoved);
261                 }
262         }
263
264         /**
265          * Notifies all listeners that a “NodeData” message was received.
266          * 
267          * @see FcpListener#receivedNodeData(FcpConnection, NodeData)
268          * @param nodeData
269          *            The “NodeData” message
270          */
271         private void fireReceivedNodeData(NodeData nodeData) {
272                 for (FcpListener fcpListener: fcpListeners) {
273                         fcpListener.receivedNodeData(this, nodeData);
274                 }
275         }
276
277         /**
278          * Notifies all listeners that a “TestDDAReply” message was received.
279          * 
280          * @see FcpListener#receivedTestDDAReply(FcpConnection, TestDDAReply)
281          * @param testDDAReply
282          *            The “TestDDAReply” message
283          */
284         private void fireReceivedTestDDAReply(TestDDAReply testDDAReply) {
285                 for (FcpListener fcpListener: fcpListeners) {
286                         fcpListener.receivedTestDDAReply(this, testDDAReply);
287                 }
288         }
289
290         /**
291          * Notifies all listeners that a “TestDDAComplete” message was received.
292          * 
293          * @see FcpListener#receivedTestDDAComplete(FcpConnection, TestDDAComplete)
294          * @param testDDAComplete
295          *            The “TestDDAComplete” message
296          */
297         private void fireReceivedTestDDAComplete(TestDDAComplete testDDAComplete) {
298                 for (FcpListener fcpListener: fcpListeners) {
299                         fcpListener.receivedTestDDAComplete(this, testDDAComplete);
300                 }
301         }
302
303         /**
304          * Notifies all listeners that a “PersistentGet” message was received.
305          * 
306          * @see FcpListener#receivedPersistentGet(FcpConnection, PersistentGet)
307          * @param persistentGet
308          *            The “PersistentGet” message
309          */
310         private void fireReceivedPersistentGet(PersistentGet persistentGet) {
311                 for (FcpListener fcpListener: fcpListeners) {
312                         fcpListener.receivedPersistentGet(this, persistentGet);
313                 }
314         }
315
316         /**
317          * Notifies all listeners that a “PersistentPut” message was received.
318          * 
319          * @see FcpListener#receivedPersistentPut(FcpConnection, PersistentPut)
320          * @param persistentPut
321          *            The “PersistentPut” message
322          */
323         private void fireReceivedPersistentPut(PersistentPut persistentPut) {
324                 for (FcpListener fcpListener: fcpListeners) {
325                         fcpListener.receivedPersistentPut(this, persistentPut);
326                 }
327         }
328
329         /**
330          * Notifies all listeners that a “EndListPersistentRequests” message was
331          * received.
332          * 
333          * @see FcpListener#receivedEndListPersistentRequests(FcpConnection,
334          *      EndListPersistentRequests)
335          * @param endListPersistentRequests
336          *            The “EndListPersistentRequests” message
337          */
338         private void fireReceivedEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) {
339                 for (FcpListener fcpListener: fcpListeners) {
340                         fcpListener.receivedEndListPersistentRequests(this, endListPersistentRequests);
341                 }
342         }
343
344         /**
345          * Notifies all listeners that a “URIGenerated” message was received.
346          * 
347          * @see FcpListener#receivedURIGenerated(FcpConnection, URIGenerated)
348          * @param uriGenerated
349          *            The “URIGenerated” message
350          */
351         private void fireReceivedURIGenerated(URIGenerated uriGenerated) {
352                 for (FcpListener fcpListener: fcpListeners) {
353                         fcpListener.receivedURIGenerated(this, uriGenerated);
354                 }
355         }
356
357         /**
358          * Notifies all listeners that a “DataFound” message was received.
359          * 
360          * @see FcpListener#receivedDataFound(FcpConnection, DataFound)
361          * @param dataFound
362          *            The “DataFound” message
363          */
364         private void fireReceivedDataFound(DataFound dataFound) {
365                 for (FcpListener fcpListener: fcpListeners) {
366                         fcpListener.receivedDataFound(this, dataFound);
367                 }
368         }
369
370         /**
371          * Notifies all listeners that an “AllData” message was received.
372          * 
373          * @see FcpListener#receivedAllData(FcpConnection, AllData)
374          * @param allData
375          *            The “AllData” message
376          */
377         private void fireReceivedAllData(AllData allData) {
378                 for (FcpListener fcpListener: fcpListeners) {
379                         fcpListener.receivedAllData(this, allData);
380                 }
381         }
382
383         /**
384          * Notifies all listeners that a “SimpleProgress” message was received.
385          * 
386          * @see FcpListener#receivedSimpleProgress(FcpConnection, SimpleProgress)
387          * @param simpleProgress
388          *            The “SimpleProgress” message
389          */
390         private void fireReceivedSimpleProgress(SimpleProgress simpleProgress) {
391                 for (FcpListener fcpListener: fcpListeners) {
392                         fcpListener.receivedSimpleProgress(this, simpleProgress);
393                 }
394         }
395
396         /**
397          * Notifies all listeners that a “StartedCompression” message was received.
398          * 
399          * @see FcpListener#receivedStartedCompression(FcpConnection,
400          *      StartedCompression)
401          * @param startedCompression
402          *            The “StartedCompression” message
403          */
404         private void fireReceivedStartedCompression(StartedCompression startedCompression) {
405                 for (FcpListener fcpListener: fcpListeners) {
406                         fcpListener.receivedStartedCompression(this, startedCompression);
407                 }
408         }
409
410         /**
411          * Notifies all listeners that a “FinishedCompression” message was received.
412          * 
413          * @see FcpListener#receviedFinishedCompression(FcpConnection,
414          *      FinishedCompression)
415          * @param finishedCompression
416          *            The “FinishedCompression” message
417          */
418         private void fireReceivedFinishedCompression(FinishedCompression finishedCompression) {
419                 for (FcpListener fcpListener: fcpListeners) {
420                         fcpListener.receviedFinishedCompression(this, finishedCompression);
421                 }
422         }
423
424         /**
425          * Notifies all listeners that an “UnknownPeerNoteType” message was
426          * received.
427          * 
428          * @see FcpListener#receivedUnknownPeerNoteType(FcpConnection,
429          *      UnknownPeerNoteType)
430          * @param unknownPeerNoteType
431          *            The “UnknownPeerNoteType” message
432          */
433         private void fireReceivedUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) {
434                 for (FcpListener fcpListener: fcpListeners) {
435                         fcpListener.receivedUnknownPeerNoteType(this, unknownPeerNoteType);
436                 }
437         }
438
439         /**
440          * Notifies all listeners that an “UnknownNodeIdentifier” message was
441          * received.
442          * 
443          * @see FcpListener#receivedUnknownNodeIdentifier(FcpConnection,
444          *      UnknownNodeIdentifier)
445          * @param unknownNodeIdentifier
446          *            The “UnknownNodeIdentifier” message
447          */
448         private void fireReceivedUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) {
449                 for (FcpListener fcpListener: fcpListeners) {
450                         fcpListener.receivedUnknownNodeIdentifier(this, unknownNodeIdentifier);
451                 }
452         }
453
454         /**
455          * Notifies all listeners that a “ConfigData” message was received.
456          * 
457          * @see FcpListener#receivedConfigData(FcpConnection, ConfigData)
458          * @param configData
459          *            The “ConfigData” message
460          */
461         private void fireReceivedConfigData(ConfigData configData) {
462                 for (FcpListener fcpListener: fcpListeners) {
463                         fcpListener.receivedConfigData(this, configData);
464                 }
465         }
466
467         /**
468          * Notifies all listeners that a “GetFailed” message was received.
469          * 
470          * @see FcpListener#receivedGetFailed(FcpConnection, GetFailed)
471          * @param getFailed
472          *            The “GetFailed” message
473          */
474         private void fireReceivedGetFailed(GetFailed getFailed) {
475                 for (FcpListener fcpListener: fcpListeners) {
476                         fcpListener.receivedGetFailed(this, getFailed);
477                 }
478         }
479
480         /**
481          * Notifies all listeners that a “PutFailed” message was received.
482          * 
483          * @see FcpListener#receivedPutFailed(FcpConnection, PutFailed)
484          * @param putFailed
485          *            The “PutFailed” message
486          */
487         private void fireReceivedPutFailed(PutFailed putFailed) {
488                 for (FcpListener fcpListener: fcpListeners) {
489                         fcpListener.receivedPutFailed(this, putFailed);
490                 }
491         }
492
493         /**
494          * Notifies all listeners that an “IdentifierCollision” message was
495          * received.
496          * 
497          * @see FcpListener#receivedIdentifierCollision(FcpConnection,
498          *      IdentifierCollision)
499          * @param identifierCollision
500          *            The “IdentifierCollision” message
501          */
502         private void fireReceivedIdentifierCollision(IdentifierCollision identifierCollision) {
503                 for (FcpListener fcpListener: fcpListeners) {
504                         fcpListener.receivedIdentifierCollision(this, identifierCollision);
505                 }
506         }
507
508         /**
509          * Notifies all listeners that an “PersistentPutDir” message was received.
510          * 
511          * @see FcpListener#receivedPersistentPutDir(FcpConnection,
512          *      PersistentPutDir)
513          * @param persistentPutDir
514          *            The “PersistentPutDir” message
515          */
516         private void fireReceivedPersistentPutDir(PersistentPutDir persistentPutDir) {
517                 for (FcpListener fcpListener: fcpListeners) {
518                         fcpListener.receivedPersistentPutDir(this, persistentPutDir);
519                 }
520         }
521
522         /**
523          * Notifies all listeners that a “PersistentRequestRemoved” message was
524          * received.
525          * 
526          * @see FcpListener#receivedPersistentRequestRemoved(FcpConnection,
527          *      PersistentRequestRemoved)
528          * @param persistentRequestRemoved
529          *            The “PersistentRequestRemoved” message
530          */
531         private void fireReceivedPersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) {
532                 for (FcpListener fcpListener: fcpListeners) {
533                         fcpListener.receivedPersistentRequestRemoved(this, persistentRequestRemoved);
534                 }
535         }
536
537         /**
538          * Notifies all listeners that a “SubscribedUSKUpdate” message was received.
539          * 
540          * @see FcpListener#receivedSubscribedUSKUpdate(FcpConnection,
541          *      SubscribedUSKUpdate)
542          * @param subscribedUSKUpdate
543          *            The “SubscribedUSKUpdate” message
544          */
545         private void fireReceivedSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) {
546                 for (FcpListener fcpListener: fcpListeners) {
547                         fcpListener.receivedSubscribedUSKUpdate(this, subscribedUSKUpdate);
548                 }
549         }
550
551         /**
552          * Notifies all listeners that a “PluginInfo” message was received.
553          * 
554          * @see FcpListener#receivedPluginInfo(FcpConnection, PluginInfo)
555          * @param pluginInfo
556          *            The “PluginInfo” message
557          */
558         private void fireReceivedPluginInfo(PluginInfo pluginInfo) {
559                 for (FcpListener fcpListener: fcpListeners) {
560                         fcpListener.receivedPluginInfo(this, pluginInfo);
561                 }
562         }
563
564         /**
565          * Notifies all listeners that an “FCPPluginReply” message was received.
566          * 
567          * @see FcpListener#receivedFCPPluginReply(FcpConnection, FCPPluginReply)
568          * @param fcpPluginReply
569          *            The “FCPPluginReply” message
570          */
571         private void fireReceivedFCPPluginReply(FCPPluginReply fcpPluginReply) {
572                 for (FcpListener fcpListener: fcpListeners) {
573                         fcpListener.receivedFCPPluginReply(this, fcpPluginReply);
574                 }
575         }
576
577         /**
578          * Notifies all listeners that a “PersistentRequestModified” message was
579          * received.
580          * 
581          * @see FcpListener#receivedPersistentRequestModified(FcpConnection,
582          *      PersistentRequestModified)
583          * @param persistentRequestModified
584          *            The “PersistentRequestModified” message
585          */
586         private void fireReceivedPersistentRequestModified(PersistentRequestModified persistentRequestModified) {
587                 for (FcpListener fcpListener: fcpListeners) {
588                         fcpListener.receivedPersistentRequestModified(this, persistentRequestModified);
589                 }
590         }
591
592         /**
593          * Notifies all listeners that a “PutSuccessful” message was received.
594          * 
595          * @see FcpListener#receivedPutSuccessful(FcpConnection, PutSuccessful)
596          * @param putSuccessful
597          *            The “PutSuccessful” message
598          */
599         private void fireReceivedPutSuccessful(PutSuccessful putSuccessful) {
600                 for (FcpListener fcpListener: fcpListeners) {
601                         fcpListener.receivedPutSuccessful(this, putSuccessful);
602                 }
603         }
604
605         /**
606          * Notifies all listeners that a “PutFetchable” message was received.
607          * 
608          * @see FcpListener#receivedPutFetchable(FcpConnection, PutFetchable)
609          * @param putFetchable
610          *            The “PutFetchable” message
611          */
612         private void fireReceivedPutFetchable(PutFetchable putFetchable) {
613                 for (FcpListener fcpListener: fcpListeners) {
614                         fcpListener.receivedPutFetchable(this, putFetchable);
615                 }
616         }
617
618         /**
619          * Notifies all listeners that a “ProtocolError” message was received.
620          * 
621          * @see FcpListener#receivedProtocolError(FcpConnection, ProtocolError)
622          * @param protocolError
623          *            The “ProtocolError” message
624          */
625         private void fireReceivedProtocolError(ProtocolError protocolError) {
626                 for (FcpListener fcpListener: fcpListeners) {
627                         fcpListener.receivedProtocolError(this, protocolError);
628                 }
629         }
630
631         /**
632          * Notifies all registered listeners that a message has been received.
633          * 
634          * @see FcpListener#receivedMessage(FcpConnection, FcpMessage)
635          * @param fcpMessage
636          *            The message that was received
637          */
638         private void fireMessageReceived(FcpMessage fcpMessage) {
639                 for (FcpListener fcpListener: fcpListeners) {
640                         fcpListener.receivedMessage(this, fcpMessage);
641                 }
642         }
643
644         /**
645          * Notifies all listeners that the connection to the node was closed.
646          * 
647          * @see FcpListener#connectionClosed(FcpConnection)
648          */
649         private void fireConnectionClosed() {
650                 for (FcpListener fcpListener: fcpListeners) {
651                         fcpListener.connectionClosed(this);
652                 }
653         }
654
655         //
656         // ACTIONS
657         //
658
659         /**
660          * Connects to the node.
661          * 
662          * @throws IOException
663          *             if an I/O error occurs
664          * @throws IllegalStateException
665          *             if there is already a connection to the node
666          */
667         public synchronized void connect() throws IOException, IllegalStateException {
668                 if (connectionHandler != null) {
669                         throw new IllegalStateException("already connected, disconnect first");
670                 }
671                 remoteSocket = new Socket(address, port);
672                 remoteInputStream = remoteSocket.getInputStream();
673                 remoteOutputStream = remoteSocket.getOutputStream();
674                 new Thread(connectionHandler = new FcpConnectionHandler(this, remoteInputStream)).start();
675         }
676
677         /**
678          * Disconnects from the node. If there is no connection to the node, this
679          * method does nothing.
680          */
681         public synchronized void disconnect() {
682                 if (connectionHandler == null) {
683                         return;
684                 }
685                 FcpUtils.close(remoteSocket);
686                 connectionHandler.stop();
687                 connectionHandler = null;
688                 fireConnectionClosed();
689         }
690
691         /**
692          * Sends the given FCP message.
693          * 
694          * @param fcpMessage
695          *            The FCP message to send
696          * @throws IOException
697          *             if an I/O error occurs
698          */
699         public synchronized void sendMessage(FcpMessage fcpMessage) throws IOException {
700                 System.out.println("sending message: " + fcpMessage.getName());
701                 fcpMessage.write(remoteOutputStream);
702         }
703
704         //
705         // PACKAGE-PRIVATE METHODS
706         //
707
708         /**
709          * Handles the given message, notifying listeners. This message should only
710          * be called by {@link FcpConnectionHandler}.
711          * 
712          * @param fcpMessage
713          *            The received message
714          */
715         void handleMessage(FcpMessage fcpMessage) {
716                 String messageName = fcpMessage.getName();
717                 countMessage(messageName);
718                 if ("SimpleProgress".equals(messageName)) {
719                         fireReceivedSimpleProgress(new SimpleProgress(fcpMessage));
720                 } else if ("ProtocolError".equals(messageName)) {
721                         fireReceivedProtocolError(new ProtocolError(fcpMessage));
722                 } else if ("PersistentGet".equals(messageName)) {
723                         fireReceivedPersistentGet(new PersistentGet(fcpMessage));
724                 } else if ("PersistentPut".equals(messageName)) {
725                         fireReceivedPersistentPut(new PersistentPut(fcpMessage));
726                 } else if ("PersistentPutDir".equals(messageName)) {
727                         fireReceivedPersistentPutDir(new PersistentPutDir(fcpMessage));
728                 } else if ("URIGenerated".equals(messageName)) {
729                         fireReceivedURIGenerated(new URIGenerated(fcpMessage));
730                 } else if ("EndListPersistentRequests".equals(messageName)) {
731                         fireReceivedEndListPersistentRequests(new EndListPersistentRequests(fcpMessage));
732                 } else if ("Peer".equals(messageName)) {
733                         fireReceivedPeer(new Peer(fcpMessage));
734                 } else if ("PeerNote".equals(messageName)) {
735                         fireReceivedPeerNote(new PeerNote(fcpMessage));
736                 } else if ("StartedCompression".equals(messageName)) {
737                         fireReceivedStartedCompression(new StartedCompression(fcpMessage));
738                 } else if ("FinishedCompression".equals(messageName)) {
739                         fireReceivedFinishedCompression(new FinishedCompression(fcpMessage));
740                 } else if ("GetFailed".equals(messageName)) {
741                         fireReceivedGetFailed(new GetFailed(fcpMessage));
742                 } else if ("PutFetchable".equals(messageName)) {
743                         fireReceivedPutFetchable(new PutFetchable(fcpMessage));
744                 } else if ("PutSuccessful".equals(messageName)) {
745                         fireReceivedPutSuccessful(new PutSuccessful(fcpMessage));
746                 } else if ("PutFailed".equals(messageName)) {
747                         fireReceivedPutFailed(new PutFailed(fcpMessage));
748                 } else if ("DataFound".equals(messageName)) {
749                         fireReceivedDataFound(new DataFound(fcpMessage));
750                 } else if ("SubscribedUSKUpdate".equals(messageName)) {
751                         fireReceivedSubscribedUSKUpdate(new SubscribedUSKUpdate(fcpMessage));
752                 } else if ("IdentifierCollision".equals(messageName)) {
753                         fireReceivedIdentifierCollision(new IdentifierCollision(fcpMessage));
754                 } else if ("AllData".equals(messageName)) {
755                         LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
756                         fireReceivedAllData(new AllData(fcpMessage, payloadInputStream));
757                         try {
758                                 payloadInputStream.consume();
759                         } catch (IOException ioe1) {
760                                 /* well, ignore. when the connection handler fails, all fails. */
761                         }
762                 } else if ("EndListPeerNotes".equals(messageName)) {
763                         fireReceivedEndListPeerNotes(new EndListPeerNotes(fcpMessage));
764                 } else if ("EndListPeers".equals(messageName)) {
765                         fireReceivedEndListPeers(new EndListPeers(fcpMessage));
766                 } else if ("SSKKeypair".equals(messageName)) {
767                         fireReceivedSSKKeypair(new SSKKeypair(fcpMessage));
768                 } else if ("PeerRemoved".equals(messageName)) {
769                         fireReceivedPeerRemoved(new PeerRemoved(fcpMessage));
770                 } else if ("PersistentRequestModified".equals(messageName)) {
771                         fireReceivedPersistentRequestModified(new PersistentRequestModified(fcpMessage));
772                 } else if ("PersistentRequestRemoved".equals(messageName)) {
773                         fireReceivedPersistentRequestRemoved(new PersistentRequestRemoved(fcpMessage));
774                 } else if ("UnknownPeerNoteType".equals(messageName)) {
775                         fireReceivedUnknownPeerNoteType(new UnknownPeerNoteType(fcpMessage));
776                 } else if ("UnknownNodeIdentifier".equals(messageName)) {
777                         fireReceivedUnknownNodeIdentifier(new UnknownNodeIdentifier(fcpMessage));
778                 } else if ("FCPPluginReply".equals(messageName)) {
779                         LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
780                         fireReceivedFCPPluginReply(new FCPPluginReply(fcpMessage, payloadInputStream));
781                         try {
782                                 payloadInputStream.consume();
783                         } catch (IOException ioe1) {
784                                 /* ignore. */
785                         }
786                 } else if ("PluginInfo".equals(messageName)) {
787                         fireReceivedPluginInfo(new PluginInfo(fcpMessage));
788                 } else if ("NodeData".equals(messageName)) {
789                         fireReceivedNodeData(new NodeData(fcpMessage));
790                 } else if ("TestDDAReply".equals(messageName)) {
791                         fireReceivedTestDDAReply(new TestDDAReply(fcpMessage));
792                 } else if ("TestDDAComplete".equals(messageName)) {
793                         fireReceivedTestDDAComplete(new TestDDAComplete(fcpMessage));
794                 } else if ("ConfigData".equals(messageName)) {
795                         fireReceivedConfigData(new ConfigData(fcpMessage));
796                 } else if ("NodeHello".equals(messageName)) {
797                         fireReceivedNodeHello(new NodeHello(fcpMessage));
798                 } else if ("CloseConnectionDuplicateClientName".equals(messageName)) {
799                         fireReceivedCloseConnectionDuplicateClientName(new CloseConnectionDuplicateClientName(fcpMessage));
800                 } else {
801                         fireMessageReceived(fcpMessage);
802                 }
803         }
804
805         /**
806          * Handles a disconnect from the node.
807          */
808         synchronized void handleDisconnect() {
809                 FcpUtils.close(remoteInputStream);
810                 FcpUtils.close(remoteOutputStream);
811                 FcpUtils.close(remoteSocket);
812                 connectionHandler = null;
813                 fireConnectionClosed();
814         }
815
816         //
817         // PRIVATE METHODS
818         //
819
820         /**
821          * Incremets the counter in {@link #incomingMessageStatistics} by <cod>1</code>
822          * for the given message name.
823          * 
824          * @param name
825          *            The name of the message to count
826          */
827         private void countMessage(String name) {
828                 int oldValue = 0;
829                 if (incomingMessageStatistics.containsKey(name)) {
830                         oldValue = incomingMessageStatistics.get(name);
831                 }
832                 incomingMessageStatistics.put(name, oldValue + 1);
833         }
834
835         /**
836          * Returns a limited input stream from the node’s input stream.
837          * 
838          * @param dataLength
839          *            The length of the stream
840          * @return The limited input stream
841          */
842         private LimitedInputStream getInputStream(long dataLength) {
843                 if (dataLength <= 0) {
844                         return new LimitedInputStream(null, 0);
845                 }
846                 return new LimitedInputStream(remoteInputStream, dataLength);
847         }
848
849         /**
850          * A wrapper around an {@link InputStream} that only supplies a limit number
851          * of bytes from the underlying input stream.
852          * 
853          * @author <a href="mailto:dr@ina-germany.de">David Roden</a>
854          * @version $Id$
855          */
856         private static class LimitedInputStream extends FilterInputStream {
857
858                 /** The remaining number of bytes that can be read. */
859                 private long remaining;
860
861                 /**
862                  * Creates a new LimitedInputStream that supplies at most
863                  * <code>length</code> bytes from the given input stream.
864                  * 
865                  * @param inputStream
866                  *            The input stream
867                  * @param length
868                  *            The number of bytes to read
869                  */
870                 public LimitedInputStream(InputStream inputStream, long length) {
871                         super(inputStream);
872                         remaining = length;
873                 }
874
875                 /**
876                  * @see java.io.FilterInputStream#available()
877                  */
878                 @Override
879                 public synchronized int available() throws IOException {
880                         if (remaining == 0) {
881                                 return 0;
882                         }
883                         return (int) Math.min(super.available(), Math.min(Integer.MAX_VALUE, remaining));
884                 }
885
886                 /**
887                  * @see java.io.FilterInputStream#read()
888                  */
889                 @Override
890                 public synchronized int read() throws IOException {
891                         int read = -1;
892                         if (remaining > 0) {
893                                 read = super.read();
894                                 remaining--;
895                         }
896                         return read;
897                 }
898
899                 /**
900                  * @see java.io.FilterInputStream#read(byte[], int, int)
901                  */
902                 @Override
903                 public synchronized int read(byte[] b, int off, int len) throws IOException {
904                         if (remaining == 0) {
905                                 return -1;
906                         }
907                         int toCopy = (int) Math.min(len, Math.min(remaining, Integer.MAX_VALUE));
908                         int read = super.read(b, off, toCopy);
909                         remaining -= read;
910                         return read;
911                 }
912
913                 /**
914                  * @see java.io.FilterInputStream#skip(long)
915                  */
916                 @Override
917                 public synchronized long skip(long n) throws IOException {
918                         if ((n < 0) || (remaining == 0)) {
919                                 return 0;
920                         }
921                         long skipped = super.skip(Math.min(n, remaining));
922                         remaining -= skipped;
923                         return skipped;
924                 }
925
926                 /**
927                  * {@inheritDoc} This method does nothing, as {@link #mark(int)} and
928                  * {@link #reset()} are not supported.
929                  * 
930                  * @see java.io.FilterInputStream#mark(int)
931                  */
932                 @Override
933                 public void mark(int readlimit) {
934                         /* do nothing. */
935                 }
936
937                 /**
938                  * {@inheritDoc}
939                  * 
940                  * @see java.io.FilterInputStream#markSupported()
941                  * @return <code>false</code>
942                  */
943                 @Override
944                 public boolean markSupported() {
945                         return false;
946                 }
947
948                 /**
949                  * {@inheritDoc} This method does nothing, as {@link #mark(int)} and
950                  * {@link #reset()} are not supported.
951                  * 
952                  * @see java.io.FilterInputStream#reset()
953                  */
954                 @Override
955                 public void reset() throws IOException {
956                         /* do nothing. */
957                 }
958
959                 /**
960                  * Consumes the input stream, i.e. read all bytes until the limit is
961                  * reached.
962                  * 
963                  * @throws IOException
964                  *             if an I/O error occurs
965                  */
966                 public void consume() throws IOException {
967                         while (remaining > 0) {
968                                 skip(remaining);
969                         }
970                 }
971
972         }
973
974 }