rename fcplib to jFCPlib
[jFCPlib.git] / src / net / pterodactylus / fcp / FcpConnection.java
1 /*
2  * jSite2 - FpcConnection.java -
3  * Copyright © 2008 David Roden
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18  */
19
20 package net.pterodactylus.fcp;
21
22 import java.io.FilterInputStream;
23 import java.io.IOException;
24 import java.io.InputStream;
25 import java.io.OutputStream;
26 import java.net.InetAddress;
27 import java.net.Socket;
28 import java.net.UnknownHostException;
29 import java.util.ArrayList;
30 import java.util.Collections;
31 import java.util.HashMap;
32 import java.util.List;
33 import java.util.Map;
34
35 /**
36  * An FCP connection to a Freenet node.
37  * 
38  * @author David ‘Bombe’ Roden <bombe@freenetproject.org>
39  * @version $Id$
40  */
41 public class FcpConnection {
42
43         /** The default port for FCP v2. */
44         public static final int DEFAULT_PORT = 9481;
45
46         /** The list of FCP listeners. */
47         private final List<FcpListener> fcpListeners = new ArrayList<FcpListener>();
48
49         /** The address of the node. */
50         private final InetAddress address;
51
52         /** The port number of the node’s FCP port. */
53         private final int port;
54
55         /** The remote socket. */
56         private Socket remoteSocket;
57
58         /** The input stream from the node. */
59         private InputStream remoteInputStream;
60
61         /** The output stream to the node. */
62         private OutputStream remoteOutputStream;
63
64         /** The connection handler. */
65         private FcpConnectionHandler connectionHandler;
66
67         /** Incoming message statistics. */
68         private Map<String, Integer> incomingMessageStatistics = Collections.synchronizedMap(new HashMap<String, Integer>());
69
70         /**
71          * Creates a new FCP connection to the freenet node running on localhost,
72          * using the default port.
73          * 
74          * @throws UnknownHostException
75          *             if the hostname can not be resolved
76          */
77         public FcpConnection() throws UnknownHostException {
78                 this(InetAddress.getLocalHost());
79         }
80
81         /**
82          * Creates a new FCP connection to the Freenet node running on the given
83          * host, listening on the default port.
84          * 
85          * @param host
86          *            The hostname of the Freenet node
87          * @throws UnknownHostException
88          *             if <code>host</code> can not be resolved
89          */
90         public FcpConnection(String host) throws UnknownHostException {
91                 this(host, DEFAULT_PORT);
92         }
93
94         /**
95          * Creates a new FCP connection to the Freenet node running on the given
96          * host, listening on the given port.
97          * 
98          * @param host
99          *            The hostname of the Freenet node
100          * @param port
101          *            The port number of the node’s FCP port
102          * @throws UnknownHostException
103          *             if <code>host</code> can not be resolved
104          */
105         public FcpConnection(String host, int port) throws UnknownHostException {
106                 this(InetAddress.getByName(host), port);
107         }
108
109         /**
110          * Creates a new FCP connection to the Freenet node running at the given
111          * address, listening on the default port.
112          * 
113          * @param address
114          *            The address of the Freenet node
115          */
116         public FcpConnection(InetAddress address) {
117                 this(address, DEFAULT_PORT);
118         }
119
120         /**
121          * Creates a new FCP connection to the Freenet node running at the given
122          * address, listening on the given port.
123          * 
124          * @param address
125          *            The address of the Freenet node
126          * @param port
127          *            The port number of the node’s FCP port
128          */
129         public FcpConnection(InetAddress address, int port) {
130                 this.address = address;
131                 this.port = port;
132         }
133
134         //
135         // LISTENER MANAGEMENT
136         //
137
138         /**
139          * Adds the given listener to the list of listeners.
140          * 
141          * @param fcpListener
142          *            The listener to add
143          */
144         public void addFcpListener(FcpListener fcpListener) {
145                 fcpListeners.add(fcpListener);
146         }
147
148         /**
149          * Removes the given listener from the list of listeners.
150          * 
151          * @param fcpListener
152          *            The listener to remove
153          */
154         public void removeFcpListener(FcpListener fcpListener) {
155                 fcpListeners.remove(fcpListener);
156         }
157
158         /**
159          * Notifies listeners that a “NodeHello” message was received.
160          * 
161          * @see FcpListener#receivedNodeHello(FcpConnection, NodeHello)
162          * @param nodeHello
163          *            The “NodeHello” message
164          */
165         private void fireReceivedNodeHello(NodeHello nodeHello) {
166                 for (FcpListener fcpListener: fcpListeners) {
167                         fcpListener.receivedNodeHello(this, nodeHello);
168                 }
169         }
170
171         /**
172          * Notifies listeners that a “CloseConnectionDuplicateClientName” message
173          * was received.
174          * 
175          * @see FcpListener#receivedCloseConnectionDuplicateClientName(FcpConnection,
176          *      CloseConnectionDuplicateClientName)
177          * @param closeConnectionDuplicateClientName
178          *            The “CloseConnectionDuplicateClientName” message
179          */
180         private void fireReceivedCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
181                 for (FcpListener fcpListener: fcpListeners) {
182                         fcpListener.receivedCloseConnectionDuplicateClientName(this, closeConnectionDuplicateClientName);
183                 }
184         }
185
186         /**
187          * Notifies listeners that a “SSKKeypair” message was received.
188          * 
189          * @see FcpListener#receivedSSKKeypair(FcpConnection, SSKKeypair)
190          * @param sskKeypair
191          *            The “SSKKeypair” message
192          */
193         private void fireReceivedSSKKeypair(SSKKeypair sskKeypair) {
194                 for (FcpListener fcpListener: fcpListeners) {
195                         fcpListener.receivedSSKKeypair(this, sskKeypair);
196                 }
197         }
198
199         /**
200          * Notifies listeners that a “Peer” message was received.
201          * 
202          * @see FcpListener#receivedPeer(FcpConnection, Peer)
203          * @param peer
204          *            The “Peer” message
205          */
206         private void fireReceivedPeer(Peer peer) {
207                 for (FcpListener fcpListener: fcpListeners) {
208                         fcpListener.receivedPeer(this, peer);
209                 }
210         }
211
212         /**
213          * Notifies all listeners that an “EndListPeers” message was received.
214          * 
215          * @see FcpListener#receivedEndListPeers(FcpConnection, EndListPeers)
216          * @param endListPeers
217          *            The “EndListPeers” message
218          */
219         private void fireReceivedEndListPeers(EndListPeers endListPeers) {
220                 for (FcpListener fcpListener: fcpListeners) {
221                         fcpListener.receivedEndListPeers(this, endListPeers);
222                 }
223         }
224
225         /**
226          * Notifies all listeners that a “PeerNote” message was received.
227          * 
228          * @see FcpListener#receivedPeerNote(FcpConnection, PeerNote)
229          * @param peerNote
230          */
231         private void fireReceivedPeerNote(PeerNote peerNote) {
232                 for (FcpListener fcpListener: fcpListeners) {
233                         fcpListener.receivedPeerNote(this, peerNote);
234                 }
235         }
236
237         /**
238          * Notifies all listeners that an “EndListPeerNotes” message was received.
239          * 
240          * @see FcpListener#receivedEndListPeerNotes(FcpConnection,
241          *      EndListPeerNotes)
242          * @param endListPeerNotes
243          *            The “EndListPeerNotes” message
244          */
245         private void fireReceivedEndListPeerNotes(EndListPeerNotes endListPeerNotes) {
246                 for (FcpListener fcpListener: fcpListeners) {
247                         fcpListener.receivedEndListPeerNotes(this, endListPeerNotes);
248                 }
249         }
250
251         /**
252          * Notifies all listeners that a “PeerRemoved” message was received.
253          * 
254          * @see FcpListener#receivedPeerRemoved(FcpConnection, PeerRemoved)
255          * @param peerRemoved
256          *            The “PeerRemoved” message
257          */
258         private void fireReceivedPeerRemoved(PeerRemoved peerRemoved) {
259                 for (FcpListener fcpListener: fcpListeners) {
260                         fcpListener.receivedPeerRemoved(this, peerRemoved);
261                 }
262         }
263
264         /**
265          * Notifies all listeners that a “NodeData” message was received.
266          * 
267          * @see FcpListener#receivedNodeData(FcpConnection, NodeData)
268          * @param nodeData
269          *            The “NodeData” message
270          */
271         private void fireReceivedNodeData(NodeData nodeData) {
272                 for (FcpListener fcpListener: fcpListeners) {
273                         fcpListener.receivedNodeData(this, nodeData);
274                 }
275         }
276
277         /**
278          * Notifies all listeners that a “TestDDAReply” message was received.
279          * 
280          * @see FcpListener#receivedTestDDAReply(FcpConnection, TestDDAReply)
281          * @param testDDAReply
282          *            The “TestDDAReply” message
283          */
284         private void fireReceivedTestDDAReply(TestDDAReply testDDAReply) {
285                 for (FcpListener fcpListener: fcpListeners) {
286                         fcpListener.receivedTestDDAReply(this, testDDAReply);
287                 }
288         }
289
290         /**
291          * Notifies all listeners that a “TestDDAComplete” message was received.
292          * 
293          * @see FcpListener#receivedTestDDAComplete(FcpConnection, TestDDAComplete)
294          * @param testDDAComplete
295          *            The “TestDDAComplete” message
296          */
297         private void fireReceivedTestDDAComplete(TestDDAComplete testDDAComplete) {
298                 for (FcpListener fcpListener: fcpListeners) {
299                         fcpListener.receivedTestDDAComplete(this, testDDAComplete);
300                 }
301         }
302
303         /**
304          * Notifies all listeners that a “PersistentGet” message was received.
305          * 
306          * @see FcpListener#receivedPersistentGet(FcpConnection, PersistentGet)
307          * @param persistentGet
308          *            The “PersistentGet” message
309          */
310         private void fireReceivedPersistentGet(PersistentGet persistentGet) {
311                 for (FcpListener fcpListener: fcpListeners) {
312                         fcpListener.receivedPersistentGet(this, persistentGet);
313                 }
314         }
315
316         /**
317          * Notifies all listeners that a “PersistentPut” message was received.
318          * 
319          * @see FcpListener#receivedPersistentPut(FcpConnection, PersistentPut)
320          * @param persistentPut
321          *            The “PersistentPut” message
322          */
323         private void fireReceivedPersistentPut(PersistentPut persistentPut) {
324                 for (FcpListener fcpListener: fcpListeners) {
325                         fcpListener.receivedPersistentPut(this, persistentPut);
326                 }
327         }
328
329         /**
330          * Notifies all listeners that a “EndListPersistentRequests” message was
331          * received.
332          * 
333          * @see FcpListener#receivedEndListPersistentRequests(FcpConnection,
334          *      EndListPersistentRequests)
335          * @param endListPersistentRequests
336          *            The “EndListPersistentRequests” message
337          */
338         private void fireReceivedEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) {
339                 for (FcpListener fcpListener: fcpListeners) {
340                         fcpListener.receivedEndListPersistentRequests(this, endListPersistentRequests);
341                 }
342         }
343
344         /**
345          * Notifies all listeners that a “URIGenerated” message was received.
346          * 
347          * @see FcpListener#receivedURIGenerated(FcpConnection, URIGenerated)
348          * @param uriGenerated
349          *            The “URIGenerated” message
350          */
351         private void fireReceivedURIGenerated(URIGenerated uriGenerated) {
352                 for (FcpListener fcpListener: fcpListeners) {
353                         fcpListener.receivedURIGenerated(this, uriGenerated);
354                 }
355         }
356
357         /**
358          * Notifies all listeners that a “DataFound” message was received.
359          * 
360          * @see FcpListener#receivedDataFound(FcpConnection, DataFound)
361          * @param dataFound
362          *            The “DataFound” message
363          */
364         private void fireReceivedDataFound(DataFound dataFound) {
365                 for (FcpListener fcpListener: fcpListeners) {
366                         fcpListener.receivedDataFound(this, dataFound);
367                 }
368         }
369
370         /**
371          * Notifies all listeners that an “AllData” message was received.
372          * 
373          * @see FcpListener#receivedAllData(FcpConnection, AllData)
374          * @param allData
375          *            The “AllData” message
376          */
377         private void fireReceivedAllData(AllData allData) {
378                 for (FcpListener fcpListener: fcpListeners) {
379                         fcpListener.receivedAllData(this, allData);
380                 }
381         }
382
383         /**
384          * Notifies all listeners that a “SimpleProgress” message was received.
385          * 
386          * @see FcpListener#receivedSimpleProgress(FcpConnection, SimpleProgress)
387          * @param simpleProgress
388          *            The “SimpleProgress” message
389          */
390         private void fireReceivedSimpleProgress(SimpleProgress simpleProgress) {
391                 for (FcpListener fcpListener: fcpListeners) {
392                         fcpListener.receivedSimpleProgress(this, simpleProgress);
393                 }
394         }
395
396         /**
397          * Notifies all listeners that a “StartedCompression” message was received.
398          * 
399          * @see FcpListener#receivedStartedCompression(FcpConnection,
400          *      StartedCompression)
401          * @param startedCompression
402          *            The “StartedCompression” message
403          */
404         private void fireReceivedStartedCompression(StartedCompression startedCompression) {
405                 for (FcpListener fcpListener: fcpListeners) {
406                         fcpListener.receivedStartedCompression(this, startedCompression);
407                 }
408         }
409
410         /**
411          * Notifies all listeners that a “FinishedCompression” message was received.
412          * 
413          * @see FcpListener#receviedFinishedCompression(FcpConnection,
414          *      FinishedCompression)
415          * @param finishedCompression
416          *            The “FinishedCompression” message
417          */
418         private void fireReceivedFinishedCompression(FinishedCompression finishedCompression) {
419                 for (FcpListener fcpListener: fcpListeners) {
420                         fcpListener.receviedFinishedCompression(this, finishedCompression);
421                 }
422         }
423
424         /**
425          * Notifies all listeners that an “UnknownPeerNoteType” message was
426          * received.
427          * 
428          * @see FcpListener#receivedUnknownPeerNoteType(FcpConnection,
429          *      UnknownPeerNoteType)
430          * @param unknownPeerNoteType
431          *            The “UnknownPeerNoteType” message
432          */
433         private void fireReceivedUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) {
434                 for (FcpListener fcpListener: fcpListeners) {
435                         fcpListener.receivedUnknownPeerNoteType(this, unknownPeerNoteType);
436                 }
437         }
438
439         /**
440          * Notifies all listeners that an “UnknownNodeIdentifier” message was
441          * received.
442          * 
443          * @see FcpListener#receivedUnknownNodeIdentifier(FcpConnection,
444          *      UnknownNodeIdentifier)
445          * @param unknownNodeIdentifier
446          *            The “UnknownNodeIdentifier” message
447          */
448         private void fireReceivedUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) {
449                 for (FcpListener fcpListener: fcpListeners) {
450                         fcpListener.receivedUnknownNodeIdentifier(this, unknownNodeIdentifier);
451                 }
452         }
453
454         /**
455          * Notifies all listeners that a “ConfigData” message was received.
456          * 
457          * @see FcpListener#receivedConfigData(FcpConnection, ConfigData)
458          * @param configData
459          *            The “ConfigData” message
460          */
461         private void fireReceivedConfigData(ConfigData configData) {
462                 for (FcpListener fcpListener: fcpListeners) {
463                         fcpListener.receivedConfigData(this, configData);
464                 }
465         }
466
467         /**
468          * Notifies all listeners that a “GetFailed” message was received.
469          * 
470          * @see FcpListener#receivedGetFailed(FcpConnection, GetFailed)
471          * @param getFailed
472          *            The “GetFailed” message
473          */
474         private void fireReceivedGetFailed(GetFailed getFailed) {
475                 for (FcpListener fcpListener: fcpListeners) {
476                         fcpListener.receivedGetFailed(this, getFailed);
477                 }
478         }
479
480         /**
481          * Notifies all listeners that a “PutFailed” message was received.
482          * 
483          * @see FcpListener#receivedPutFailed(FcpConnection, PutFailed)
484          * @param putFailed
485          *            The “PutFailed” message
486          */
487         private void fireReceivedPutFailed(PutFailed putFailed) {
488                 for (FcpListener fcpListener: fcpListeners) {
489                         fcpListener.receivedPutFailed(this, putFailed);
490                 }
491         }
492
493         /**
494          * Notifies all listeners that an “IdentifierCollision” message was
495          * received.
496          * 
497          * @see FcpListener#receivedIdentifierCollision(FcpConnection,
498          *      IdentifierCollision)
499          * @param identifierCollision
500          *            The “IdentifierCollision” message
501          */
502         private void fireReceivedIdentifierCollision(IdentifierCollision identifierCollision) {
503                 for (FcpListener fcpListener: fcpListeners) {
504                         fcpListener.receivedIdentifierCollision(this, identifierCollision);
505                 }
506         }
507
508         /**
509          * Notifies all listeners that an “PersistentPutDir” message was received.
510          * 
511          * @see FcpListener#receivedPersistentPutDir(FcpConnection,
512          *      PersistentPutDir)
513          * @param persistentPutDir
514          *            The “PersistentPutDir” message
515          */
516         private void fireReceivedPersistentPutDir(PersistentPutDir persistentPutDir) {
517                 for (FcpListener fcpListener: fcpListeners) {
518                         fcpListener.receivedPersistentPutDir(this, persistentPutDir);
519                 }
520         }
521
522         /**
523          * Notifies all listeners that a “PersistentRequestRemoved” message was
524          * received.
525          * 
526          * @see FcpListener#receivedPersistentRequestRemoved(FcpConnection,
527          *      PersistentRequestRemoved)
528          * @param persistentRequestRemoved
529          *            The “PersistentRequestRemoved” message
530          */
531         private void fireReceivedPersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) {
532                 for (FcpListener fcpListener: fcpListeners) {
533                         fcpListener.receivedPersistentRequestRemoved(this, persistentRequestRemoved);
534                 }
535         }
536
537         /**
538          * Notifies all listeners that a “SubscribedUSKUpdate” message was received.
539          * 
540          * @see FcpListener#receivedSubscribedUSKUpdate(FcpConnection,
541          *      SubscribedUSKUpdate)
542          * @param subscribedUSKUpdate
543          *            The “SubscribedUSKUpdate” message
544          */
545         private void fireReceivedSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) {
546                 for (FcpListener fcpListener: fcpListeners) {
547                         fcpListener.receivedSubscribedUSKUpdate(this, subscribedUSKUpdate);
548                 }
549         }
550
551         /**
552          * Notifies all listeners that a “PluginInfo” message was received.
553          * 
554          * @see FcpListener#receivedPluginInfo(FcpConnection, PluginInfo)
555          * @param pluginInfo
556          *            The “PluginInfo” message
557          */
558         private void fireReceivedPluginInfo(PluginInfo pluginInfo) {
559                 for (FcpListener fcpListener: fcpListeners) {
560                         fcpListener.receivedPluginInfo(this, pluginInfo);
561                 }
562         }
563
564         /**
565          * Notifies all listeners that an “FCPPluginReply” message was received.
566          * 
567          * @see FcpListener#receivedFCPPluginReply(FcpConnection, FCPPluginReply)
568          * @param fcpPluginReply
569          *            The “FCPPluginReply” message
570          */
571         private void fireReceivedFCPPluginReply(FCPPluginReply fcpPluginReply) {
572                 for (FcpListener fcpListener: fcpListeners) {
573                         fcpListener.receivedFCPPluginReply(this, fcpPluginReply);
574                 }
575         }
576
577         /**
578          * Notifies all listeners that a “PersistentRequestModified” message was
579          * received.
580          * 
581          * @see FcpListener#receivedPersistentRequestModified(FcpConnection,
582          *      PersistentRequestModified)
583          * @param persistentRequestModified
584          *            The “PersistentRequestModified” message
585          */
586         private void fireReceivedPersistentRequestModified(PersistentRequestModified persistentRequestModified) {
587                 for (FcpListener fcpListener: fcpListeners) {
588                         fcpListener.receivedPersistentRequestModified(this, persistentRequestModified);
589                 }
590         }
591
592         /**
593          * Notifies all listeners that a “PutSuccessful” message was received.
594          * 
595          * @see FcpListener#receivedPutSuccessful(FcpConnection, PutSuccessful)
596          * @param putSuccessful
597          *            The “PutSuccessful” message
598          */
599         private void fireReceivedPutSuccessful(PutSuccessful putSuccessful) {
600                 for (FcpListener fcpListener: fcpListeners) {
601                         fcpListener.receivedPutSuccessful(this, putSuccessful);
602                 }
603         }
604
605         /**
606          * Notifies all listeners that a “PutFetchable” message was received.
607          * 
608          * @see FcpListener#receivedPutFetchable(FcpConnection, PutFetchable)
609          * @param putFetchable
610          *            The “PutFetchable” message
611          */
612         private void fireReceivedPutFetchable(PutFetchable putFetchable) {
613                 for (FcpListener fcpListener: fcpListeners) {
614                         fcpListener.receivedPutFetchable(this, putFetchable);
615                 }
616         }
617
618         /**
619          * Notifies all listeners that a “ProtocolError” message was received.
620          * 
621          * @see FcpListener#receivedProtocolError(FcpConnection, ProtocolError)
622          * @param protocolError
623          *            The “ProtocolError” message
624          */
625         private void fireReceivedProtocolError(ProtocolError protocolError) {
626                 for (FcpListener fcpListener: fcpListeners) {
627                         fcpListener.receivedProtocolError(this, protocolError);
628                 }
629         }
630
631         /**
632          * Notifies all registered listeners that a message has been received.
633          * 
634          * @see FcpListener#receivedMessage(FcpConnection, FcpMessage)
635          * @param fcpMessage
636          *            The message that was received
637          */
638         private void fireMessageReceived(FcpMessage fcpMessage) {
639                 for (FcpListener fcpListener: fcpListeners) {
640                         fcpListener.receivedMessage(this, fcpMessage);
641                 }
642         }
643
644         /**
645          * Notifies all listeners that the connection to the node was closed.
646          * 
647          * @see FcpListener#connectionClosed(FcpConnection)
648          */
649         private void fireConnectionClosed() {
650                 for (FcpListener fcpListener: fcpListeners) {
651                         fcpListener.connectionClosed(this);
652                 }
653         }
654
655         //
656         // ACTIONS
657         //
658
659         /**
660          * Connects to the node.
661          * 
662          * @throws IOException
663          *             if an I/O error occurs
664          * @throws IllegalStateException
665          *             if there is already a connection to the node
666          */
667         public synchronized void connect() throws IOException, IllegalStateException {
668                 if (connectionHandler != null) {
669                         throw new IllegalStateException("already connected, disconnect first");
670                 }
671                 remoteSocket = new Socket(address, port);
672                 remoteInputStream = remoteSocket.getInputStream();
673                 remoteOutputStream = remoteSocket.getOutputStream();
674                 new Thread(connectionHandler = new FcpConnectionHandler(this, remoteInputStream)).start();
675         }
676
677         /**
678          * Disconnects from the node. If there is no connection to the node, this
679          * method does nothing.
680          */
681         public synchronized void disconnect() {
682                 if (connectionHandler == null) {
683                         return;
684                 }
685                 FcpUtils.close(remoteSocket);
686                 connectionHandler.stop();
687                 connectionHandler = null;
688         }
689
690         /**
691          * Sends the given FCP message.
692          * 
693          * @param fcpMessage
694          *            The FCP message to send
695          * @throws IOException
696          *             if an I/O error occurs
697          */
698         public synchronized void sendMessage(FcpMessage fcpMessage) throws IOException {
699                 System.out.println("sending message: " + fcpMessage.getName());
700                 fcpMessage.write(remoteOutputStream);
701         }
702
703         //
704         // PACKAGE-PRIVATE METHODS
705         //
706
707         /**
708          * Handles the given message, notifying listeners. This message should only
709          * be called by {@link FcpConnectionHandler}.
710          * 
711          * @param fcpMessage
712          *            The received message
713          */
714         void handleMessage(FcpMessage fcpMessage) {
715                 String messageName = fcpMessage.getName();
716                 countMessage(messageName);
717                 if ("SimpleProgress".equals(messageName)) {
718                         fireReceivedSimpleProgress(new SimpleProgress(fcpMessage));
719                 } else if ("ProtocolError".equals(messageName)) {
720                         fireReceivedProtocolError(new ProtocolError(fcpMessage));
721                 } else if ("PersistentGet".equals(messageName)) {
722                         fireReceivedPersistentGet(new PersistentGet(fcpMessage));
723                 } else if ("PersistentPut".equals(messageName)) {
724                         fireReceivedPersistentPut(new PersistentPut(fcpMessage));
725                 } else if ("PersistentPutDir".equals(messageName)) {
726                         fireReceivedPersistentPutDir(new PersistentPutDir(fcpMessage));
727                 } else if ("URIGenerated".equals(messageName)) {
728                         fireReceivedURIGenerated(new URIGenerated(fcpMessage));
729                 } else if ("EndListPersistentRequests".equals(messageName)) {
730                         fireReceivedEndListPersistentRequests(new EndListPersistentRequests(fcpMessage));
731                 } else if ("Peer".equals(messageName)) {
732                         fireReceivedPeer(new Peer(fcpMessage));
733                 } else if ("PeerNote".equals(messageName)) {
734                         fireReceivedPeerNote(new PeerNote(fcpMessage));
735                 } else if ("StartedCompression".equals(messageName)) {
736                         fireReceivedStartedCompression(new StartedCompression(fcpMessage));
737                 } else if ("FinishedCompression".equals(messageName)) {
738                         fireReceivedFinishedCompression(new FinishedCompression(fcpMessage));
739                 } else if ("GetFailed".equals(messageName)) {
740                         fireReceivedGetFailed(new GetFailed(fcpMessage));
741                 } else if ("PutFetchable".equals(messageName)) {
742                         fireReceivedPutFetchable(new PutFetchable(fcpMessage));
743                 } else if ("PutSuccessful".equals(messageName)) {
744                         fireReceivedPutSuccessful(new PutSuccessful(fcpMessage));
745                 } else if ("PutFailed".equals(messageName)) {
746                         fireReceivedPutFailed(new PutFailed(fcpMessage));
747                 } else if ("DataFound".equals(messageName)) {
748                         fireReceivedDataFound(new DataFound(fcpMessage));
749                 } else if ("SubscribedUSKUpdate".equals(messageName)) {
750                         fireReceivedSubscribedUSKUpdate(new SubscribedUSKUpdate(fcpMessage));
751                 } else if ("IdentifierCollision".equals(messageName)) {
752                         fireReceivedIdentifierCollision(new IdentifierCollision(fcpMessage));
753                 } else if ("AllData".equals(messageName)) {
754                         LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
755                         fireReceivedAllData(new AllData(fcpMessage, payloadInputStream));
756                         try {
757                                 payloadInputStream.consume();
758                         } catch (IOException ioe1) {
759                                 /* well, ignore. when the connection handler fails, all fails. */
760                         }
761                 } else if ("EndListPeerNotes".equals(messageName)) {
762                         fireReceivedEndListPeerNotes(new EndListPeerNotes(fcpMessage));
763                 } else if ("EndListPeers".equals(messageName)) {
764                         fireReceivedEndListPeers(new EndListPeers(fcpMessage));
765                 } else if ("SSKKeypair".equals(messageName)) {
766                         fireReceivedSSKKeypair(new SSKKeypair(fcpMessage));
767                 } else if ("PeerRemoved".equals(messageName)) {
768                         fireReceivedPeerRemoved(new PeerRemoved(fcpMessage));
769                 } else if ("PersistentRequestModified".equals(messageName)) {
770                         fireReceivedPersistentRequestModified(new PersistentRequestModified(fcpMessage));
771                 } else if ("PersistentRequestRemoved".equals(messageName)) {
772                         fireReceivedPersistentRequestRemoved(new PersistentRequestRemoved(fcpMessage));
773                 } else if ("UnknownPeerNoteType".equals(messageName)) {
774                         fireReceivedUnknownPeerNoteType(new UnknownPeerNoteType(fcpMessage));
775                 } else if ("UnknownNodeIdentifier".equals(messageName)) {
776                         fireReceivedUnknownNodeIdentifier(new UnknownNodeIdentifier(fcpMessage));
777                 } else if ("FCPPluginReply".equals(messageName)) {
778                         LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
779                         fireReceivedFCPPluginReply(new FCPPluginReply(fcpMessage, payloadInputStream));
780                         try {
781                                 payloadInputStream.consume();
782                         } catch (IOException ioe1) {
783                                 /* ignore. */
784                         }
785                 } else if ("PluginInfo".equals(messageName)) {
786                         fireReceivedPluginInfo(new PluginInfo(fcpMessage));
787                 } else if ("NodeData".equals(messageName)) {
788                         fireReceivedNodeData(new NodeData(fcpMessage));
789                 } else if ("TestDDAReply".equals(messageName)) {
790                         fireReceivedTestDDAReply(new TestDDAReply(fcpMessage));
791                 } else if ("TestDDAComplete".equals(messageName)) {
792                         fireReceivedTestDDAComplete(new TestDDAComplete(fcpMessage));
793                 } else if ("ConfigData".equals(messageName)) {
794                         fireReceivedConfigData(new ConfigData(fcpMessage));
795                 } else if ("NodeHello".equals(messageName)) {
796                         fireReceivedNodeHello(new NodeHello(fcpMessage));
797                 } else if ("CloseConnectionDuplicateClientName".equals(messageName)) {
798                         fireReceivedCloseConnectionDuplicateClientName(new CloseConnectionDuplicateClientName(fcpMessage));
799                 } else {
800                         fireMessageReceived(fcpMessage);
801                 }
802         }
803
804         /**
805          * Handles a disconnect from the node.
806          */
807         synchronized void handleDisconnect() {
808                 FcpUtils.close(remoteInputStream);
809                 FcpUtils.close(remoteOutputStream);
810                 FcpUtils.close(remoteSocket);
811                 connectionHandler = null;
812                 fireConnectionClosed();
813         }
814
815         //
816         // PRIVATE METHODS
817         //
818
819         /**
820          * Incremets the counter in {@link #incomingMessageStatistics} by <cod>1</code>
821          * for the given message name.
822          * 
823          * @param name
824          *            The name of the message to count
825          */
826         private void countMessage(String name) {
827                 int oldValue = 0;
828                 if (incomingMessageStatistics.containsKey(name)) {
829                         oldValue = incomingMessageStatistics.get(name);
830                 }
831                 incomingMessageStatistics.put(name, oldValue + 1);
832         }
833
834         /**
835          * Returns a limited input stream from the node’s input stream.
836          * 
837          * @param dataLength
838          *            The length of the stream
839          * @return The limited input stream
840          */
841         private LimitedInputStream getInputStream(long dataLength) {
842                 if (dataLength <= 0) {
843                         return new LimitedInputStream(null, 0);
844                 }
845                 return new LimitedInputStream(remoteInputStream, dataLength);
846         }
847
848         /**
849          * A wrapper around an {@link InputStream} that only supplies a limit number
850          * of bytes from the underlying input stream.
851          * 
852          * @author <a href="mailto:dr@ina-germany.de">David Roden</a>
853          * @version $Id$
854          */
855         private static class LimitedInputStream extends FilterInputStream {
856
857                 /** The remaining number of bytes that can be read. */
858                 private long remaining;
859
860                 /**
861                  * Creates a new LimitedInputStream that supplies at most
862                  * <code>length</code> bytes from the given input stream.
863                  * 
864                  * @param inputStream
865                  *            The input stream
866                  * @param length
867                  *            The number of bytes to read
868                  */
869                 public LimitedInputStream(InputStream inputStream, long length) {
870                         super(inputStream);
871                         remaining = length;
872                 }
873
874                 /**
875                  * @see java.io.FilterInputStream#available()
876                  */
877                 @Override
878                 public synchronized int available() throws IOException {
879                         if (remaining == 0) {
880                                 return 0;
881                         }
882                         return (int) Math.min(super.available(), Math.min(Integer.MAX_VALUE, remaining));
883                 }
884
885                 /**
886                  * @see java.io.FilterInputStream#read()
887                  */
888                 @Override
889                 public synchronized int read() throws IOException {
890                         int read = -1;
891                         if (remaining > 0) {
892                                 read = super.read();
893                                 remaining--;
894                         }
895                         return read;
896                 }
897
898                 /**
899                  * @see java.io.FilterInputStream#read(byte[], int, int)
900                  */
901                 @Override
902                 public synchronized int read(byte[] b, int off, int len) throws IOException {
903                         if (remaining == 0) {
904                                 return -1;
905                         }
906                         int toCopy = (int) Math.min(len, Math.min(remaining, Integer.MAX_VALUE));
907                         int read = super.read(b, off, toCopy);
908                         remaining -= read;
909                         return read;
910                 }
911
912                 /**
913                  * @see java.io.FilterInputStream#skip(long)
914                  */
915                 @Override
916                 public synchronized long skip(long n) throws IOException {
917                         if ((n < 0) || (remaining == 0)) {
918                                 return 0;
919                         }
920                         long skipped = super.skip(Math.min(n, remaining));
921                         remaining -= skipped;
922                         return skipped;
923                 }
924
925                 /**
926                  * {@inheritDoc} This method does nothing, as {@link #mark(int)} and
927                  * {@link #reset()} are not supported.
928                  * 
929                  * @see java.io.FilterInputStream#mark(int)
930                  */
931                 @Override
932                 public void mark(int readlimit) {
933                         /* do nothing. */
934                 }
935
936                 /**
937                  * {@inheritDoc}
938                  * 
939                  * @see java.io.FilterInputStream#markSupported()
940                  * @return <code>false</code>
941                  */
942                 @Override
943                 public boolean markSupported() {
944                         return false;
945                 }
946
947                 /**
948                  * {@inheritDoc} This method does nothing, as {@link #mark(int)} and
949                  * {@link #reset()} are not supported.
950                  * 
951                  * @see java.io.FilterInputStream#reset()
952                  */
953                 @Override
954                 public void reset() throws IOException {
955                         /* do nothing. */
956                 }
957
958                 /**
959                  * Consumes the input stream, i.e. read all bytes until the limit is
960                  * reached.
961                  * 
962                  * @throws IOException
963                  *             if an I/O error occurs
964                  */
965                 public void consume() throws IOException {
966                         while (remaining > 0) {
967                                 skip(remaining);
968                         }
969                 }
970
971         }
972
973 }