add some logging
[jFCPlib.git] / src / net / pterodactylus / fcp / FcpConnection.java
1 /*
2  * jSite2 - FpcConnection.java -
3  * Copyright © 2008 David Roden
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18  */
19
20 package net.pterodactylus.fcp;
21
22 import java.io.FilterInputStream;
23 import java.io.IOException;
24 import java.io.InputStream;
25 import java.io.OutputStream;
26 import java.net.InetAddress;
27 import java.net.Socket;
28 import java.net.UnknownHostException;
29 import java.util.ArrayList;
30 import java.util.Collections;
31 import java.util.HashMap;
32 import java.util.List;
33 import java.util.Map;
34 import java.util.logging.Logger;
35
36 /**
37  * An FCP connection to a Freenet node.
38  * 
39  * @author David ‘Bombe’ Roden <bombe@freenetproject.org>
40  * @version $Id$
41  */
42 public class FcpConnection {
43
44         /** Logger. */
45         private static final Logger logger = Logger.getLogger(FcpConnection.class.getName());
46
47         /** The default port for FCP v2. */
48         public static final int DEFAULT_PORT = 9481;
49
50         /** The list of FCP listeners. */
51         private final List<FcpListener> fcpListeners = new ArrayList<FcpListener>();
52
53         /** The address of the node. */
54         private final InetAddress address;
55
56         /** The port number of the node’s FCP port. */
57         private final int port;
58
59         /** The remote socket. */
60         private Socket remoteSocket;
61
62         /** The input stream from the node. */
63         private InputStream remoteInputStream;
64
65         /** The output stream to the node. */
66         private OutputStream remoteOutputStream;
67
68         /** The connection handler. */
69         private FcpConnectionHandler connectionHandler;
70
71         /** Incoming message statistics. */
72         private Map<String, Integer> incomingMessageStatistics = Collections.synchronizedMap(new HashMap<String, Integer>());
73
74         /**
75          * Creates a new FCP connection to the freenet node running on localhost,
76          * using the default port.
77          * 
78          * @throws UnknownHostException
79          *             if the hostname can not be resolved
80          */
81         public FcpConnection() throws UnknownHostException {
82                 this(InetAddress.getLocalHost());
83         }
84
85         /**
86          * Creates a new FCP connection to the Freenet node running on the given
87          * host, listening on the default port.
88          * 
89          * @param host
90          *            The hostname of the Freenet node
91          * @throws UnknownHostException
92          *             if <code>host</code> can not be resolved
93          */
94         public FcpConnection(String host) throws UnknownHostException {
95                 this(host, DEFAULT_PORT);
96         }
97
98         /**
99          * Creates a new FCP connection to the Freenet node running on the given
100          * host, listening on the given port.
101          * 
102          * @param host
103          *            The hostname of the Freenet node
104          * @param port
105          *            The port number of the node’s FCP port
106          * @throws UnknownHostException
107          *             if <code>host</code> can not be resolved
108          */
109         public FcpConnection(String host, int port) throws UnknownHostException {
110                 this(InetAddress.getByName(host), port);
111         }
112
113         /**
114          * Creates a new FCP connection to the Freenet node running at the given
115          * address, listening on the default port.
116          * 
117          * @param address
118          *            The address of the Freenet node
119          */
120         public FcpConnection(InetAddress address) {
121                 this(address, DEFAULT_PORT);
122         }
123
124         /**
125          * Creates a new FCP connection to the Freenet node running at the given
126          * address, listening on the given port.
127          * 
128          * @param address
129          *            The address of the Freenet node
130          * @param port
131          *            The port number of the node’s FCP port
132          */
133         public FcpConnection(InetAddress address, int port) {
134                 this.address = address;
135                 this.port = port;
136         }
137
138         //
139         // LISTENER MANAGEMENT
140         //
141
142         /**
143          * Adds the given listener to the list of listeners.
144          * 
145          * @param fcpListener
146          *            The listener to add
147          */
148         public void addFcpListener(FcpListener fcpListener) {
149                 fcpListeners.add(fcpListener);
150         }
151
152         /**
153          * Removes the given listener from the list of listeners.
154          * 
155          * @param fcpListener
156          *            The listener to remove
157          */
158         public void removeFcpListener(FcpListener fcpListener) {
159                 fcpListeners.remove(fcpListener);
160         }
161
162         /**
163          * Notifies listeners that a “NodeHello” message was received.
164          * 
165          * @see FcpListener#receivedNodeHello(FcpConnection, NodeHello)
166          * @param nodeHello
167          *            The “NodeHello” message
168          */
169         private void fireReceivedNodeHello(NodeHello nodeHello) {
170                 for (FcpListener fcpListener: fcpListeners) {
171                         fcpListener.receivedNodeHello(this, nodeHello);
172                 }
173         }
174
175         /**
176          * Notifies listeners that a “CloseConnectionDuplicateClientName” message
177          * was received.
178          * 
179          * @see FcpListener#receivedCloseConnectionDuplicateClientName(FcpConnection,
180          *      CloseConnectionDuplicateClientName)
181          * @param closeConnectionDuplicateClientName
182          *            The “CloseConnectionDuplicateClientName” message
183          */
184         private void fireReceivedCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
185                 for (FcpListener fcpListener: fcpListeners) {
186                         fcpListener.receivedCloseConnectionDuplicateClientName(this, closeConnectionDuplicateClientName);
187                 }
188         }
189
190         /**
191          * Notifies listeners that a “SSKKeypair” message was received.
192          * 
193          * @see FcpListener#receivedSSKKeypair(FcpConnection, SSKKeypair)
194          * @param sskKeypair
195          *            The “SSKKeypair” message
196          */
197         private void fireReceivedSSKKeypair(SSKKeypair sskKeypair) {
198                 for (FcpListener fcpListener: fcpListeners) {
199                         fcpListener.receivedSSKKeypair(this, sskKeypair);
200                 }
201         }
202
203         /**
204          * Notifies listeners that a “Peer” message was received.
205          * 
206          * @see FcpListener#receivedPeer(FcpConnection, Peer)
207          * @param peer
208          *            The “Peer” message
209          */
210         private void fireReceivedPeer(Peer peer) {
211                 for (FcpListener fcpListener: fcpListeners) {
212                         fcpListener.receivedPeer(this, peer);
213                 }
214         }
215
216         /**
217          * Notifies all listeners that an “EndListPeers” message was received.
218          * 
219          * @see FcpListener#receivedEndListPeers(FcpConnection, EndListPeers)
220          * @param endListPeers
221          *            The “EndListPeers” message
222          */
223         private void fireReceivedEndListPeers(EndListPeers endListPeers) {
224                 for (FcpListener fcpListener: fcpListeners) {
225                         fcpListener.receivedEndListPeers(this, endListPeers);
226                 }
227         }
228
229         /**
230          * Notifies all listeners that a “PeerNote” message was received.
231          * 
232          * @see FcpListener#receivedPeerNote(FcpConnection, PeerNote)
233          * @param peerNote
234          */
235         private void fireReceivedPeerNote(PeerNote peerNote) {
236                 for (FcpListener fcpListener: fcpListeners) {
237                         fcpListener.receivedPeerNote(this, peerNote);
238                 }
239         }
240
241         /**
242          * Notifies all listeners that an “EndListPeerNotes” message was received.
243          * 
244          * @see FcpListener#receivedEndListPeerNotes(FcpConnection,
245          *      EndListPeerNotes)
246          * @param endListPeerNotes
247          *            The “EndListPeerNotes” message
248          */
249         private void fireReceivedEndListPeerNotes(EndListPeerNotes endListPeerNotes) {
250                 for (FcpListener fcpListener: fcpListeners) {
251                         fcpListener.receivedEndListPeerNotes(this, endListPeerNotes);
252                 }
253         }
254
255         /**
256          * Notifies all listeners that a “PeerRemoved” message was received.
257          * 
258          * @see FcpListener#receivedPeerRemoved(FcpConnection, PeerRemoved)
259          * @param peerRemoved
260          *            The “PeerRemoved” message
261          */
262         private void fireReceivedPeerRemoved(PeerRemoved peerRemoved) {
263                 for (FcpListener fcpListener: fcpListeners) {
264                         fcpListener.receivedPeerRemoved(this, peerRemoved);
265                 }
266         }
267
268         /**
269          * Notifies all listeners that a “NodeData” message was received.
270          * 
271          * @see FcpListener#receivedNodeData(FcpConnection, NodeData)
272          * @param nodeData
273          *            The “NodeData” message
274          */
275         private void fireReceivedNodeData(NodeData nodeData) {
276                 for (FcpListener fcpListener: fcpListeners) {
277                         fcpListener.receivedNodeData(this, nodeData);
278                 }
279         }
280
281         /**
282          * Notifies all listeners that a “TestDDAReply” message was received.
283          * 
284          * @see FcpListener#receivedTestDDAReply(FcpConnection, TestDDAReply)
285          * @param testDDAReply
286          *            The “TestDDAReply” message
287          */
288         private void fireReceivedTestDDAReply(TestDDAReply testDDAReply) {
289                 for (FcpListener fcpListener: fcpListeners) {
290                         fcpListener.receivedTestDDAReply(this, testDDAReply);
291                 }
292         }
293
294         /**
295          * Notifies all listeners that a “TestDDAComplete” message was received.
296          * 
297          * @see FcpListener#receivedTestDDAComplete(FcpConnection, TestDDAComplete)
298          * @param testDDAComplete
299          *            The “TestDDAComplete” message
300          */
301         private void fireReceivedTestDDAComplete(TestDDAComplete testDDAComplete) {
302                 for (FcpListener fcpListener: fcpListeners) {
303                         fcpListener.receivedTestDDAComplete(this, testDDAComplete);
304                 }
305         }
306
307         /**
308          * Notifies all listeners that a “PersistentGet” message was received.
309          * 
310          * @see FcpListener#receivedPersistentGet(FcpConnection, PersistentGet)
311          * @param persistentGet
312          *            The “PersistentGet” message
313          */
314         private void fireReceivedPersistentGet(PersistentGet persistentGet) {
315                 for (FcpListener fcpListener: fcpListeners) {
316                         fcpListener.receivedPersistentGet(this, persistentGet);
317                 }
318         }
319
320         /**
321          * Notifies all listeners that a “PersistentPut” message was received.
322          * 
323          * @see FcpListener#receivedPersistentPut(FcpConnection, PersistentPut)
324          * @param persistentPut
325          *            The “PersistentPut” message
326          */
327         private void fireReceivedPersistentPut(PersistentPut persistentPut) {
328                 for (FcpListener fcpListener: fcpListeners) {
329                         fcpListener.receivedPersistentPut(this, persistentPut);
330                 }
331         }
332
333         /**
334          * Notifies all listeners that a “EndListPersistentRequests” message was
335          * received.
336          * 
337          * @see FcpListener#receivedEndListPersistentRequests(FcpConnection,
338          *      EndListPersistentRequests)
339          * @param endListPersistentRequests
340          *            The “EndListPersistentRequests” message
341          */
342         private void fireReceivedEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) {
343                 for (FcpListener fcpListener: fcpListeners) {
344                         fcpListener.receivedEndListPersistentRequests(this, endListPersistentRequests);
345                 }
346         }
347
348         /**
349          * Notifies all listeners that a “URIGenerated” message was received.
350          * 
351          * @see FcpListener#receivedURIGenerated(FcpConnection, URIGenerated)
352          * @param uriGenerated
353          *            The “URIGenerated” message
354          */
355         private void fireReceivedURIGenerated(URIGenerated uriGenerated) {
356                 for (FcpListener fcpListener: fcpListeners) {
357                         fcpListener.receivedURIGenerated(this, uriGenerated);
358                 }
359         }
360
361         /**
362          * Notifies all listeners that a “DataFound” message was received.
363          * 
364          * @see FcpListener#receivedDataFound(FcpConnection, DataFound)
365          * @param dataFound
366          *            The “DataFound” message
367          */
368         private void fireReceivedDataFound(DataFound dataFound) {
369                 for (FcpListener fcpListener: fcpListeners) {
370                         fcpListener.receivedDataFound(this, dataFound);
371                 }
372         }
373
374         /**
375          * Notifies all listeners that an “AllData” message was received.
376          * 
377          * @see FcpListener#receivedAllData(FcpConnection, AllData)
378          * @param allData
379          *            The “AllData” message
380          */
381         private void fireReceivedAllData(AllData allData) {
382                 for (FcpListener fcpListener: fcpListeners) {
383                         fcpListener.receivedAllData(this, allData);
384                 }
385         }
386
387         /**
388          * Notifies all listeners that a “SimpleProgress” message was received.
389          * 
390          * @see FcpListener#receivedSimpleProgress(FcpConnection, SimpleProgress)
391          * @param simpleProgress
392          *            The “SimpleProgress” message
393          */
394         private void fireReceivedSimpleProgress(SimpleProgress simpleProgress) {
395                 for (FcpListener fcpListener: fcpListeners) {
396                         fcpListener.receivedSimpleProgress(this, simpleProgress);
397                 }
398         }
399
400         /**
401          * Notifies all listeners that a “StartedCompression” message was received.
402          * 
403          * @see FcpListener#receivedStartedCompression(FcpConnection,
404          *      StartedCompression)
405          * @param startedCompression
406          *            The “StartedCompression” message
407          */
408         private void fireReceivedStartedCompression(StartedCompression startedCompression) {
409                 for (FcpListener fcpListener: fcpListeners) {
410                         fcpListener.receivedStartedCompression(this, startedCompression);
411                 }
412         }
413
414         /**
415          * Notifies all listeners that a “FinishedCompression” message was received.
416          * 
417          * @see FcpListener#receviedFinishedCompression(FcpConnection,
418          *      FinishedCompression)
419          * @param finishedCompression
420          *            The “FinishedCompression” message
421          */
422         private void fireReceivedFinishedCompression(FinishedCompression finishedCompression) {
423                 for (FcpListener fcpListener: fcpListeners) {
424                         fcpListener.receviedFinishedCompression(this, finishedCompression);
425                 }
426         }
427
428         /**
429          * Notifies all listeners that an “UnknownPeerNoteType” message was
430          * received.
431          * 
432          * @see FcpListener#receivedUnknownPeerNoteType(FcpConnection,
433          *      UnknownPeerNoteType)
434          * @param unknownPeerNoteType
435          *            The “UnknownPeerNoteType” message
436          */
437         private void fireReceivedUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) {
438                 for (FcpListener fcpListener: fcpListeners) {
439                         fcpListener.receivedUnknownPeerNoteType(this, unknownPeerNoteType);
440                 }
441         }
442
443         /**
444          * Notifies all listeners that an “UnknownNodeIdentifier” message was
445          * received.
446          * 
447          * @see FcpListener#receivedUnknownNodeIdentifier(FcpConnection,
448          *      UnknownNodeIdentifier)
449          * @param unknownNodeIdentifier
450          *            The “UnknownNodeIdentifier” message
451          */
452         private void fireReceivedUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) {
453                 for (FcpListener fcpListener: fcpListeners) {
454                         fcpListener.receivedUnknownNodeIdentifier(this, unknownNodeIdentifier);
455                 }
456         }
457
458         /**
459          * Notifies all listeners that a “ConfigData” message was received.
460          * 
461          * @see FcpListener#receivedConfigData(FcpConnection, ConfigData)
462          * @param configData
463          *            The “ConfigData” message
464          */
465         private void fireReceivedConfigData(ConfigData configData) {
466                 for (FcpListener fcpListener: fcpListeners) {
467                         fcpListener.receivedConfigData(this, configData);
468                 }
469         }
470
471         /**
472          * Notifies all listeners that a “GetFailed” message was received.
473          * 
474          * @see FcpListener#receivedGetFailed(FcpConnection, GetFailed)
475          * @param getFailed
476          *            The “GetFailed” message
477          */
478         private void fireReceivedGetFailed(GetFailed getFailed) {
479                 for (FcpListener fcpListener: fcpListeners) {
480                         fcpListener.receivedGetFailed(this, getFailed);
481                 }
482         }
483
484         /**
485          * Notifies all listeners that a “PutFailed” message was received.
486          * 
487          * @see FcpListener#receivedPutFailed(FcpConnection, PutFailed)
488          * @param putFailed
489          *            The “PutFailed” message
490          */
491         private void fireReceivedPutFailed(PutFailed putFailed) {
492                 for (FcpListener fcpListener: fcpListeners) {
493                         fcpListener.receivedPutFailed(this, putFailed);
494                 }
495         }
496
497         /**
498          * Notifies all listeners that an “IdentifierCollision” message was
499          * received.
500          * 
501          * @see FcpListener#receivedIdentifierCollision(FcpConnection,
502          *      IdentifierCollision)
503          * @param identifierCollision
504          *            The “IdentifierCollision” message
505          */
506         private void fireReceivedIdentifierCollision(IdentifierCollision identifierCollision) {
507                 for (FcpListener fcpListener: fcpListeners) {
508                         fcpListener.receivedIdentifierCollision(this, identifierCollision);
509                 }
510         }
511
512         /**
513          * Notifies all listeners that an “PersistentPutDir” message was received.
514          * 
515          * @see FcpListener#receivedPersistentPutDir(FcpConnection,
516          *      PersistentPutDir)
517          * @param persistentPutDir
518          *            The “PersistentPutDir” message
519          */
520         private void fireReceivedPersistentPutDir(PersistentPutDir persistentPutDir) {
521                 for (FcpListener fcpListener: fcpListeners) {
522                         fcpListener.receivedPersistentPutDir(this, persistentPutDir);
523                 }
524         }
525
526         /**
527          * Notifies all listeners that a “PersistentRequestRemoved” message was
528          * received.
529          * 
530          * @see FcpListener#receivedPersistentRequestRemoved(FcpConnection,
531          *      PersistentRequestRemoved)
532          * @param persistentRequestRemoved
533          *            The “PersistentRequestRemoved” message
534          */
535         private void fireReceivedPersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) {
536                 for (FcpListener fcpListener: fcpListeners) {
537                         fcpListener.receivedPersistentRequestRemoved(this, persistentRequestRemoved);
538                 }
539         }
540
541         /**
542          * Notifies all listeners that a “SubscribedUSKUpdate” message was received.
543          * 
544          * @see FcpListener#receivedSubscribedUSKUpdate(FcpConnection,
545          *      SubscribedUSKUpdate)
546          * @param subscribedUSKUpdate
547          *            The “SubscribedUSKUpdate” message
548          */
549         private void fireReceivedSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) {
550                 for (FcpListener fcpListener: fcpListeners) {
551                         fcpListener.receivedSubscribedUSKUpdate(this, subscribedUSKUpdate);
552                 }
553         }
554
555         /**
556          * Notifies all listeners that a “PluginInfo” message was received.
557          * 
558          * @see FcpListener#receivedPluginInfo(FcpConnection, PluginInfo)
559          * @param pluginInfo
560          *            The “PluginInfo” message
561          */
562         private void fireReceivedPluginInfo(PluginInfo pluginInfo) {
563                 for (FcpListener fcpListener: fcpListeners) {
564                         fcpListener.receivedPluginInfo(this, pluginInfo);
565                 }
566         }
567
568         /**
569          * Notifies all listeners that an “FCPPluginReply” message was received.
570          * 
571          * @see FcpListener#receivedFCPPluginReply(FcpConnection, FCPPluginReply)
572          * @param fcpPluginReply
573          *            The “FCPPluginReply” message
574          */
575         private void fireReceivedFCPPluginReply(FCPPluginReply fcpPluginReply) {
576                 for (FcpListener fcpListener: fcpListeners) {
577                         fcpListener.receivedFCPPluginReply(this, fcpPluginReply);
578                 }
579         }
580
581         /**
582          * Notifies all listeners that a “PersistentRequestModified” message was
583          * received.
584          * 
585          * @see FcpListener#receivedPersistentRequestModified(FcpConnection,
586          *      PersistentRequestModified)
587          * @param persistentRequestModified
588          *            The “PersistentRequestModified” message
589          */
590         private void fireReceivedPersistentRequestModified(PersistentRequestModified persistentRequestModified) {
591                 for (FcpListener fcpListener: fcpListeners) {
592                         fcpListener.receivedPersistentRequestModified(this, persistentRequestModified);
593                 }
594         }
595
596         /**
597          * Notifies all listeners that a “PutSuccessful” message was received.
598          * 
599          * @see FcpListener#receivedPutSuccessful(FcpConnection, PutSuccessful)
600          * @param putSuccessful
601          *            The “PutSuccessful” message
602          */
603         private void fireReceivedPutSuccessful(PutSuccessful putSuccessful) {
604                 for (FcpListener fcpListener: fcpListeners) {
605                         fcpListener.receivedPutSuccessful(this, putSuccessful);
606                 }
607         }
608
609         /**
610          * Notifies all listeners that a “PutFetchable” message was received.
611          * 
612          * @see FcpListener#receivedPutFetchable(FcpConnection, PutFetchable)
613          * @param putFetchable
614          *            The “PutFetchable” message
615          */
616         private void fireReceivedPutFetchable(PutFetchable putFetchable) {
617                 for (FcpListener fcpListener: fcpListeners) {
618                         fcpListener.receivedPutFetchable(this, putFetchable);
619                 }
620         }
621
622         /**
623          * Notifies all listeners that a “ProtocolError” message was received.
624          * 
625          * @see FcpListener#receivedProtocolError(FcpConnection, ProtocolError)
626          * @param protocolError
627          *            The “ProtocolError” message
628          */
629         private void fireReceivedProtocolError(ProtocolError protocolError) {
630                 for (FcpListener fcpListener: fcpListeners) {
631                         fcpListener.receivedProtocolError(this, protocolError);
632                 }
633         }
634
635         /**
636          * Notifies all registered listeners that a message has been received.
637          * 
638          * @see FcpListener#receivedMessage(FcpConnection, FcpMessage)
639          * @param fcpMessage
640          *            The message that was received
641          */
642         private void fireMessageReceived(FcpMessage fcpMessage) {
643                 for (FcpListener fcpListener: fcpListeners) {
644                         fcpListener.receivedMessage(this, fcpMessage);
645                 }
646         }
647
648         /**
649          * Notifies all listeners that the connection to the node was closed.
650          * 
651          * @see FcpListener#connectionClosed(FcpConnection)
652          */
653         private void fireConnectionClosed() {
654                 for (FcpListener fcpListener: fcpListeners) {
655                         fcpListener.connectionClosed(this);
656                 }
657         }
658
659         //
660         // ACTIONS
661         //
662
663         /**
664          * Connects to the node.
665          * 
666          * @throws IOException
667          *             if an I/O error occurs
668          * @throws IllegalStateException
669          *             if there is already a connection to the node
670          */
671         public synchronized void connect() throws IOException, IllegalStateException {
672                 if (connectionHandler != null) {
673                         throw new IllegalStateException("already connected, disconnect first");
674                 }
675                 logger.info("connecting to " + address + ":" + port + "…");
676                 remoteSocket = new Socket(address, port);
677                 remoteInputStream = remoteSocket.getInputStream();
678                 remoteOutputStream = remoteSocket.getOutputStream();
679                 new Thread(connectionHandler = new FcpConnectionHandler(this, remoteInputStream)).start();
680         }
681
682         /**
683          * Disconnects from the node. If there is no connection to the node, this
684          * method does nothing.
685          */
686         public synchronized void disconnect() {
687                 if (connectionHandler == null) {
688                         return;
689                 }
690                 logger.info("disconnecting…");
691                 FcpUtils.close(remoteSocket);
692                 connectionHandler.stop();
693                 connectionHandler = null;
694                 fireConnectionClosed();
695         }
696
697         /**
698          * Sends the given FCP message.
699          * 
700          * @param fcpMessage
701          *            The FCP message to send
702          * @throws IOException
703          *             if an I/O error occurs
704          */
705         public synchronized void sendMessage(FcpMessage fcpMessage) throws IOException {
706                 logger.fine("sending message: " + fcpMessage.getName());
707                 fcpMessage.write(remoteOutputStream);
708         }
709
710         //
711         // PACKAGE-PRIVATE METHODS
712         //
713
714         /**
715          * Handles the given message, notifying listeners. This message should only
716          * be called by {@link FcpConnectionHandler}.
717          * 
718          * @param fcpMessage
719          *            The received message
720          */
721         void handleMessage(FcpMessage fcpMessage) {
722                 logger.fine("received message: " + fcpMessage.getName());
723                 String messageName = fcpMessage.getName();
724                 countMessage(messageName);
725                 if ("SimpleProgress".equals(messageName)) {
726                         fireReceivedSimpleProgress(new SimpleProgress(fcpMessage));
727                 } else if ("ProtocolError".equals(messageName)) {
728                         fireReceivedProtocolError(new ProtocolError(fcpMessage));
729                 } else if ("PersistentGet".equals(messageName)) {
730                         fireReceivedPersistentGet(new PersistentGet(fcpMessage));
731                 } else if ("PersistentPut".equals(messageName)) {
732                         fireReceivedPersistentPut(new PersistentPut(fcpMessage));
733                 } else if ("PersistentPutDir".equals(messageName)) {
734                         fireReceivedPersistentPutDir(new PersistentPutDir(fcpMessage));
735                 } else if ("URIGenerated".equals(messageName)) {
736                         fireReceivedURIGenerated(new URIGenerated(fcpMessage));
737                 } else if ("EndListPersistentRequests".equals(messageName)) {
738                         fireReceivedEndListPersistentRequests(new EndListPersistentRequests(fcpMessage));
739                 } else if ("Peer".equals(messageName)) {
740                         fireReceivedPeer(new Peer(fcpMessage));
741                 } else if ("PeerNote".equals(messageName)) {
742                         fireReceivedPeerNote(new PeerNote(fcpMessage));
743                 } else if ("StartedCompression".equals(messageName)) {
744                         fireReceivedStartedCompression(new StartedCompression(fcpMessage));
745                 } else if ("FinishedCompression".equals(messageName)) {
746                         fireReceivedFinishedCompression(new FinishedCompression(fcpMessage));
747                 } else if ("GetFailed".equals(messageName)) {
748                         fireReceivedGetFailed(new GetFailed(fcpMessage));
749                 } else if ("PutFetchable".equals(messageName)) {
750                         fireReceivedPutFetchable(new PutFetchable(fcpMessage));
751                 } else if ("PutSuccessful".equals(messageName)) {
752                         fireReceivedPutSuccessful(new PutSuccessful(fcpMessage));
753                 } else if ("PutFailed".equals(messageName)) {
754                         fireReceivedPutFailed(new PutFailed(fcpMessage));
755                 } else if ("DataFound".equals(messageName)) {
756                         fireReceivedDataFound(new DataFound(fcpMessage));
757                 } else if ("SubscribedUSKUpdate".equals(messageName)) {
758                         fireReceivedSubscribedUSKUpdate(new SubscribedUSKUpdate(fcpMessage));
759                 } else if ("IdentifierCollision".equals(messageName)) {
760                         fireReceivedIdentifierCollision(new IdentifierCollision(fcpMessage));
761                 } else if ("AllData".equals(messageName)) {
762                         LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
763                         fireReceivedAllData(new AllData(fcpMessage, payloadInputStream));
764                         try {
765                                 payloadInputStream.consume();
766                         } catch (IOException ioe1) {
767                                 /* well, ignore. when the connection handler fails, all fails. */
768                         }
769                 } else if ("EndListPeerNotes".equals(messageName)) {
770                         fireReceivedEndListPeerNotes(new EndListPeerNotes(fcpMessage));
771                 } else if ("EndListPeers".equals(messageName)) {
772                         fireReceivedEndListPeers(new EndListPeers(fcpMessage));
773                 } else if ("SSKKeypair".equals(messageName)) {
774                         fireReceivedSSKKeypair(new SSKKeypair(fcpMessage));
775                 } else if ("PeerRemoved".equals(messageName)) {
776                         fireReceivedPeerRemoved(new PeerRemoved(fcpMessage));
777                 } else if ("PersistentRequestModified".equals(messageName)) {
778                         fireReceivedPersistentRequestModified(new PersistentRequestModified(fcpMessage));
779                 } else if ("PersistentRequestRemoved".equals(messageName)) {
780                         fireReceivedPersistentRequestRemoved(new PersistentRequestRemoved(fcpMessage));
781                 } else if ("UnknownPeerNoteType".equals(messageName)) {
782                         fireReceivedUnknownPeerNoteType(new UnknownPeerNoteType(fcpMessage));
783                 } else if ("UnknownNodeIdentifier".equals(messageName)) {
784                         fireReceivedUnknownNodeIdentifier(new UnknownNodeIdentifier(fcpMessage));
785                 } else if ("FCPPluginReply".equals(messageName)) {
786                         LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
787                         fireReceivedFCPPluginReply(new FCPPluginReply(fcpMessage, payloadInputStream));
788                         try {
789                                 payloadInputStream.consume();
790                         } catch (IOException ioe1) {
791                                 /* ignore. */
792                         }
793                 } else if ("PluginInfo".equals(messageName)) {
794                         fireReceivedPluginInfo(new PluginInfo(fcpMessage));
795                 } else if ("NodeData".equals(messageName)) {
796                         fireReceivedNodeData(new NodeData(fcpMessage));
797                 } else if ("TestDDAReply".equals(messageName)) {
798                         fireReceivedTestDDAReply(new TestDDAReply(fcpMessage));
799                 } else if ("TestDDAComplete".equals(messageName)) {
800                         fireReceivedTestDDAComplete(new TestDDAComplete(fcpMessage));
801                 } else if ("ConfigData".equals(messageName)) {
802                         fireReceivedConfigData(new ConfigData(fcpMessage));
803                 } else if ("NodeHello".equals(messageName)) {
804                         fireReceivedNodeHello(new NodeHello(fcpMessage));
805                 } else if ("CloseConnectionDuplicateClientName".equals(messageName)) {
806                         fireReceivedCloseConnectionDuplicateClientName(new CloseConnectionDuplicateClientName(fcpMessage));
807                 } else {
808                         fireMessageReceived(fcpMessage);
809                 }
810         }
811
812         /**
813          * Handles a disconnect from the node.
814          */
815         synchronized void handleDisconnect() {
816                 FcpUtils.close(remoteInputStream);
817                 FcpUtils.close(remoteOutputStream);
818                 FcpUtils.close(remoteSocket);
819                 connectionHandler = null;
820                 fireConnectionClosed();
821         }
822
823         //
824         // PRIVATE METHODS
825         //
826
827         /**
828          * Incremets the counter in {@link #incomingMessageStatistics} by <cod>1</code>
829          * for the given message name.
830          * 
831          * @param name
832          *            The name of the message to count
833          */
834         private void countMessage(String name) {
835                 int oldValue = 0;
836                 if (incomingMessageStatistics.containsKey(name)) {
837                         oldValue = incomingMessageStatistics.get(name);
838                 }
839                 incomingMessageStatistics.put(name, oldValue + 1);
840                 logger.finest("count for " + name + ": " + (oldValue + 1));
841         }
842
843         /**
844          * Returns a limited input stream from the node’s input stream.
845          * 
846          * @param dataLength
847          *            The length of the stream
848          * @return The limited input stream
849          */
850         private LimitedInputStream getInputStream(long dataLength) {
851                 if (dataLength <= 0) {
852                         return new LimitedInputStream(null, 0);
853                 }
854                 return new LimitedInputStream(remoteInputStream, dataLength);
855         }
856
857         /**
858          * A wrapper around an {@link InputStream} that only supplies a limit number
859          * of bytes from the underlying input stream.
860          * 
861          * @author <a href="mailto:dr@ina-germany.de">David Roden</a>
862          * @version $Id$
863          */
864         private static class LimitedInputStream extends FilterInputStream {
865
866                 /** The remaining number of bytes that can be read. */
867                 private long remaining;
868
869                 /**
870                  * Creates a new LimitedInputStream that supplies at most
871                  * <code>length</code> bytes from the given input stream.
872                  * 
873                  * @param inputStream
874                  *            The input stream
875                  * @param length
876                  *            The number of bytes to read
877                  */
878                 public LimitedInputStream(InputStream inputStream, long length) {
879                         super(inputStream);
880                         remaining = length;
881                 }
882
883                 /**
884                  * @see java.io.FilterInputStream#available()
885                  */
886                 @Override
887                 public synchronized int available() throws IOException {
888                         if (remaining == 0) {
889                                 return 0;
890                         }
891                         return (int) Math.min(super.available(), Math.min(Integer.MAX_VALUE, remaining));
892                 }
893
894                 /**
895                  * @see java.io.FilterInputStream#read()
896                  */
897                 @Override
898                 public synchronized int read() throws IOException {
899                         int read = -1;
900                         if (remaining > 0) {
901                                 read = super.read();
902                                 remaining--;
903                         }
904                         return read;
905                 }
906
907                 /**
908                  * @see java.io.FilterInputStream#read(byte[], int, int)
909                  */
910                 @Override
911                 public synchronized int read(byte[] b, int off, int len) throws IOException {
912                         if (remaining == 0) {
913                                 return -1;
914                         }
915                         int toCopy = (int) Math.min(len, Math.min(remaining, Integer.MAX_VALUE));
916                         int read = super.read(b, off, toCopy);
917                         remaining -= read;
918                         return read;
919                 }
920
921                 /**
922                  * @see java.io.FilterInputStream#skip(long)
923                  */
924                 @Override
925                 public synchronized long skip(long n) throws IOException {
926                         if ((n < 0) || (remaining == 0)) {
927                                 return 0;
928                         }
929                         long skipped = super.skip(Math.min(n, remaining));
930                         remaining -= skipped;
931                         return skipped;
932                 }
933
934                 /**
935                  * {@inheritDoc} This method does nothing, as {@link #mark(int)} and
936                  * {@link #reset()} are not supported.
937                  * 
938                  * @see java.io.FilterInputStream#mark(int)
939                  */
940                 @Override
941                 public void mark(int readlimit) {
942                         /* do nothing. */
943                 }
944
945                 /**
946                  * {@inheritDoc}
947                  * 
948                  * @see java.io.FilterInputStream#markSupported()
949                  * @return <code>false</code>
950                  */
951                 @Override
952                 public boolean markSupported() {
953                         return false;
954                 }
955
956                 /**
957                  * {@inheritDoc} This method does nothing, as {@link #mark(int)} and
958                  * {@link #reset()} are not supported.
959                  * 
960                  * @see java.io.FilterInputStream#reset()
961                  */
962                 @Override
963                 public void reset() throws IOException {
964                         /* do nothing. */
965                 }
966
967                 /**
968                  * Consumes the input stream, i.e. read all bytes until the limit is
969                  * reached.
970                  * 
971                  * @throws IOException
972                  *             if an I/O error occurs
973                  */
974                 public void consume() throws IOException {
975                         while (remaining > 0) {
976                                 skip(remaining);
977                         }
978                 }
979
980         }
981
982 }