fix email address
[jFCPlib.git] / src / net / pterodactylus / fcp / FcpConnection.java
1 /*
2  * jSite2 - FcpConnection.java -
3  * Copyright © 2008 David Roden
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18  */
19
20 package net.pterodactylus.fcp;
21
22 import java.io.Closeable;
23 import java.io.FilterInputStream;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.io.OutputStream;
27 import java.net.InetAddress;
28 import java.net.Socket;
29 import java.net.UnknownHostException;
30 import java.util.ArrayList;
31 import java.util.Collections;
32 import java.util.HashMap;
33 import java.util.List;
34 import java.util.Map;
35 import java.util.logging.Logger;
36
37 /**
38  * An FCP connection to a Freenet node.
39  * 
40  * @author David ‘Bombe’ Roden &lt;bombe@freenetproject.org&gt;
41  */
42 public class FcpConnection implements Closeable {
43
44         /** Logger. */
45         private static final Logger logger = Logger.getLogger(FcpConnection.class.getName());
46
47         /** The default port for FCP v2. */
48         public static final int DEFAULT_PORT = 9481;
49
50         /** The list of FCP listeners. */
51         private final List<FcpListener> fcpListeners = new ArrayList<FcpListener>();
52
53         /** The address of the node. */
54         private final InetAddress address;
55
56         /** The port number of the node’s FCP port. */
57         private final int port;
58
59         /** The remote socket. */
60         private Socket remoteSocket;
61
62         /** The input stream from the node. */
63         private InputStream remoteInputStream;
64
65         /** The output stream to the node. */
66         private OutputStream remoteOutputStream;
67
68         /** The connection handler. */
69         private FcpConnectionHandler connectionHandler;
70
71         /** Incoming message statistics. */
72         private Map<String, Integer> incomingMessageStatistics = Collections.synchronizedMap(new HashMap<String, Integer>());
73
74         /**
75          * Creates a new FCP connection to the freenet node running on localhost,
76          * using the default port.
77          * 
78          * @throws UnknownHostException
79          *             if the hostname can not be resolved
80          */
81         public FcpConnection() throws UnknownHostException {
82                 this(InetAddress.getLocalHost());
83         }
84
85         /**
86          * Creates a new FCP connection to the Freenet node running on the given
87          * host, listening on the default port.
88          * 
89          * @param host
90          *            The hostname of the Freenet node
91          * @throws UnknownHostException
92          *             if <code>host</code> can not be resolved
93          */
94         public FcpConnection(String host) throws UnknownHostException {
95                 this(host, DEFAULT_PORT);
96         }
97
98         /**
99          * Creates a new FCP connection to the Freenet node running on the given
100          * host, listening on the given port.
101          * 
102          * @param host
103          *            The hostname of the Freenet node
104          * @param port
105          *            The port number of the node’s FCP port
106          * @throws UnknownHostException
107          *             if <code>host</code> can not be resolved
108          */
109         public FcpConnection(String host, int port) throws UnknownHostException {
110                 this(InetAddress.getByName(host), port);
111         }
112
113         /**
114          * Creates a new FCP connection to the Freenet node running at the given
115          * address, listening on the default port.
116          * 
117          * @param address
118          *            The address of the Freenet node
119          */
120         public FcpConnection(InetAddress address) {
121                 this(address, DEFAULT_PORT);
122         }
123
124         /**
125          * Creates a new FCP connection to the Freenet node running at the given
126          * address, listening on the given port.
127          * 
128          * @param address
129          *            The address of the Freenet node
130          * @param port
131          *            The port number of the node’s FCP port
132          */
133         public FcpConnection(InetAddress address, int port) {
134                 this.address = address;
135                 this.port = port;
136         }
137
138         //
139         // LISTENER MANAGEMENT
140         //
141
142         /**
143          * Adds the given listener to the list of listeners.
144          * 
145          * @param fcpListener
146          *            The listener to add
147          */
148         public void addFcpListener(FcpListener fcpListener) {
149                 fcpListeners.add(fcpListener);
150         }
151
152         /**
153          * Removes the given listener from the list of listeners.
154          * 
155          * @param fcpListener
156          *            The listener to remove
157          */
158         public void removeFcpListener(FcpListener fcpListener) {
159                 fcpListeners.remove(fcpListener);
160         }
161
162         /**
163          * Notifies listeners that a “NodeHello” message was received.
164          * 
165          * @see FcpListener#receivedNodeHello(FcpConnection, NodeHello)
166          * @param nodeHello
167          *            The “NodeHello” message
168          */
169         private void fireReceivedNodeHello(NodeHello nodeHello) {
170                 for (FcpListener fcpListener: fcpListeners) {
171                         fcpListener.receivedNodeHello(this, nodeHello);
172                 }
173         }
174
175         /**
176          * Notifies listeners that a “CloseConnectionDuplicateClientName” message
177          * was received.
178          * 
179          * @see FcpListener#receivedCloseConnectionDuplicateClientName(FcpConnection,
180          *      CloseConnectionDuplicateClientName)
181          * @param closeConnectionDuplicateClientName
182          *            The “CloseConnectionDuplicateClientName” message
183          */
184         private void fireReceivedCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
185                 for (FcpListener fcpListener: fcpListeners) {
186                         fcpListener.receivedCloseConnectionDuplicateClientName(this, closeConnectionDuplicateClientName);
187                 }
188         }
189
190         /**
191          * Notifies listeners that a “SSKKeypair” message was received.
192          * 
193          * @see FcpListener#receivedSSKKeypair(FcpConnection, SSKKeypair)
194          * @param sskKeypair
195          *            The “SSKKeypair” message
196          */
197         private void fireReceivedSSKKeypair(SSKKeypair sskKeypair) {
198                 for (FcpListener fcpListener: fcpListeners) {
199                         fcpListener.receivedSSKKeypair(this, sskKeypair);
200                 }
201         }
202
203         /**
204          * Notifies listeners that a “Peer” message was received.
205          * 
206          * @see FcpListener#receivedPeer(FcpConnection, Peer)
207          * @param peer
208          *            The “Peer” message
209          */
210         private void fireReceivedPeer(Peer peer) {
211                 for (FcpListener fcpListener: fcpListeners) {
212                         fcpListener.receivedPeer(this, peer);
213                 }
214         }
215
216         /**
217          * Notifies all listeners that an “EndListPeers” message was received.
218          * 
219          * @see FcpListener#receivedEndListPeers(FcpConnection, EndListPeers)
220          * @param endListPeers
221          *            The “EndListPeers” message
222          */
223         private void fireReceivedEndListPeers(EndListPeers endListPeers) {
224                 for (FcpListener fcpListener: fcpListeners) {
225                         fcpListener.receivedEndListPeers(this, endListPeers);
226                 }
227         }
228
229         /**
230          * Notifies all listeners that a “PeerNote” message was received.
231          * 
232          * @see FcpListener#receivedPeerNote(FcpConnection, PeerNote)
233          * @param peerNote
234          */
235         private void fireReceivedPeerNote(PeerNote peerNote) {
236                 for (FcpListener fcpListener: fcpListeners) {
237                         fcpListener.receivedPeerNote(this, peerNote);
238                 }
239         }
240
241         /**
242          * Notifies all listeners that an “EndListPeerNotes” message was received.
243          * 
244          * @see FcpListener#receivedEndListPeerNotes(FcpConnection,
245          *      EndListPeerNotes)
246          * @param endListPeerNotes
247          *            The “EndListPeerNotes” message
248          */
249         private void fireReceivedEndListPeerNotes(EndListPeerNotes endListPeerNotes) {
250                 for (FcpListener fcpListener: fcpListeners) {
251                         fcpListener.receivedEndListPeerNotes(this, endListPeerNotes);
252                 }
253         }
254
255         /**
256          * Notifies all listeners that a “PeerRemoved” message was received.
257          * 
258          * @see FcpListener#receivedPeerRemoved(FcpConnection, PeerRemoved)
259          * @param peerRemoved
260          *            The “PeerRemoved” message
261          */
262         private void fireReceivedPeerRemoved(PeerRemoved peerRemoved) {
263                 for (FcpListener fcpListener: fcpListeners) {
264                         fcpListener.receivedPeerRemoved(this, peerRemoved);
265                 }
266         }
267
268         /**
269          * Notifies all listeners that a “NodeData” message was received.
270          * 
271          * @see FcpListener#receivedNodeData(FcpConnection, NodeData)
272          * @param nodeData
273          *            The “NodeData” message
274          */
275         private void fireReceivedNodeData(NodeData nodeData) {
276                 for (FcpListener fcpListener: fcpListeners) {
277                         fcpListener.receivedNodeData(this, nodeData);
278                 }
279         }
280
281         /**
282          * Notifies all listeners that a “TestDDAReply” message was received.
283          * 
284          * @see FcpListener#receivedTestDDAReply(FcpConnection, TestDDAReply)
285          * @param testDDAReply
286          *            The “TestDDAReply” message
287          */
288         private void fireReceivedTestDDAReply(TestDDAReply testDDAReply) {
289                 for (FcpListener fcpListener: fcpListeners) {
290                         fcpListener.receivedTestDDAReply(this, testDDAReply);
291                 }
292         }
293
294         /**
295          * Notifies all listeners that a “TestDDAComplete” message was received.
296          * 
297          * @see FcpListener#receivedTestDDAComplete(FcpConnection, TestDDAComplete)
298          * @param testDDAComplete
299          *            The “TestDDAComplete” message
300          */
301         private void fireReceivedTestDDAComplete(TestDDAComplete testDDAComplete) {
302                 for (FcpListener fcpListener: fcpListeners) {
303                         fcpListener.receivedTestDDAComplete(this, testDDAComplete);
304                 }
305         }
306
307         /**
308          * Notifies all listeners that a “PersistentGet” message was received.
309          * 
310          * @see FcpListener#receivedPersistentGet(FcpConnection, PersistentGet)
311          * @param persistentGet
312          *            The “PersistentGet” message
313          */
314         private void fireReceivedPersistentGet(PersistentGet persistentGet) {
315                 for (FcpListener fcpListener: fcpListeners) {
316                         fcpListener.receivedPersistentGet(this, persistentGet);
317                 }
318         }
319
320         /**
321          * Notifies all listeners that a “PersistentPut” message was received.
322          * 
323          * @see FcpListener#receivedPersistentPut(FcpConnection, PersistentPut)
324          * @param persistentPut
325          *            The “PersistentPut” message
326          */
327         private void fireReceivedPersistentPut(PersistentPut persistentPut) {
328                 for (FcpListener fcpListener: fcpListeners) {
329                         fcpListener.receivedPersistentPut(this, persistentPut);
330                 }
331         }
332
333         /**
334          * Notifies all listeners that a “EndListPersistentRequests” message was
335          * received.
336          * 
337          * @see FcpListener#receivedEndListPersistentRequests(FcpConnection,
338          *      EndListPersistentRequests)
339          * @param endListPersistentRequests
340          *            The “EndListPersistentRequests” message
341          */
342         private void fireReceivedEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) {
343                 for (FcpListener fcpListener: fcpListeners) {
344                         fcpListener.receivedEndListPersistentRequests(this, endListPersistentRequests);
345                 }
346         }
347
348         /**
349          * Notifies all listeners that a “URIGenerated” message was received.
350          * 
351          * @see FcpListener#receivedURIGenerated(FcpConnection, URIGenerated)
352          * @param uriGenerated
353          *            The “URIGenerated” message
354          */
355         private void fireReceivedURIGenerated(URIGenerated uriGenerated) {
356                 for (FcpListener fcpListener: fcpListeners) {
357                         fcpListener.receivedURIGenerated(this, uriGenerated);
358                 }
359         }
360
361         /**
362          * Notifies all listeners that a “DataFound” message was received.
363          * 
364          * @see FcpListener#receivedDataFound(FcpConnection, DataFound)
365          * @param dataFound
366          *            The “DataFound” message
367          */
368         private void fireReceivedDataFound(DataFound dataFound) {
369                 for (FcpListener fcpListener: fcpListeners) {
370                         fcpListener.receivedDataFound(this, dataFound);
371                 }
372         }
373
374         /**
375          * Notifies all listeners that an “AllData” message was received.
376          * 
377          * @see FcpListener#receivedAllData(FcpConnection, AllData)
378          * @param allData
379          *            The “AllData” message
380          */
381         private void fireReceivedAllData(AllData allData) {
382                 for (FcpListener fcpListener: fcpListeners) {
383                         fcpListener.receivedAllData(this, allData);
384                 }
385         }
386
387         /**
388          * Notifies all listeners that a “SimpleProgress” message was received.
389          * 
390          * @see FcpListener#receivedSimpleProgress(FcpConnection, SimpleProgress)
391          * @param simpleProgress
392          *            The “SimpleProgress” message
393          */
394         private void fireReceivedSimpleProgress(SimpleProgress simpleProgress) {
395                 for (FcpListener fcpListener: fcpListeners) {
396                         fcpListener.receivedSimpleProgress(this, simpleProgress);
397                 }
398         }
399
400         /**
401          * Notifies all listeners that a “StartedCompression” message was received.
402          * 
403          * @see FcpListener#receivedStartedCompression(FcpConnection,
404          *      StartedCompression)
405          * @param startedCompression
406          *            The “StartedCompression” message
407          */
408         private void fireReceivedStartedCompression(StartedCompression startedCompression) {
409                 for (FcpListener fcpListener: fcpListeners) {
410                         fcpListener.receivedStartedCompression(this, startedCompression);
411                 }
412         }
413
414         /**
415          * Notifies all listeners that a “FinishedCompression” message was received.
416          * 
417          * @see FcpListener#receviedFinishedCompression(FcpConnection,
418          *      FinishedCompression)
419          * @param finishedCompression
420          *            The “FinishedCompression” message
421          */
422         private void fireReceivedFinishedCompression(FinishedCompression finishedCompression) {
423                 for (FcpListener fcpListener: fcpListeners) {
424                         fcpListener.receviedFinishedCompression(this, finishedCompression);
425                 }
426         }
427
428         /**
429          * Notifies all listeners that an “UnknownPeerNoteType” message was
430          * received.
431          * 
432          * @see FcpListener#receivedUnknownPeerNoteType(FcpConnection,
433          *      UnknownPeerNoteType)
434          * @param unknownPeerNoteType
435          *            The “UnknownPeerNoteType” message
436          */
437         private void fireReceivedUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) {
438                 for (FcpListener fcpListener: fcpListeners) {
439                         fcpListener.receivedUnknownPeerNoteType(this, unknownPeerNoteType);
440                 }
441         }
442
443         /**
444          * Notifies all listeners that an “UnknownNodeIdentifier” message was
445          * received.
446          * 
447          * @see FcpListener#receivedUnknownNodeIdentifier(FcpConnection,
448          *      UnknownNodeIdentifier)
449          * @param unknownNodeIdentifier
450          *            The “UnknownNodeIdentifier” message
451          */
452         private void fireReceivedUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) {
453                 for (FcpListener fcpListener: fcpListeners) {
454                         fcpListener.receivedUnknownNodeIdentifier(this, unknownNodeIdentifier);
455                 }
456         }
457
458         /**
459          * Notifies all listeners that a “ConfigData” message was received.
460          * 
461          * @see FcpListener#receivedConfigData(FcpConnection, ConfigData)
462          * @param configData
463          *            The “ConfigData” message
464          */
465         private void fireReceivedConfigData(ConfigData configData) {
466                 for (FcpListener fcpListener: fcpListeners) {
467                         fcpListener.receivedConfigData(this, configData);
468                 }
469         }
470
471         /**
472          * Notifies all listeners that a “GetFailed” message was received.
473          * 
474          * @see FcpListener#receivedGetFailed(FcpConnection, GetFailed)
475          * @param getFailed
476          *            The “GetFailed” message
477          */
478         private void fireReceivedGetFailed(GetFailed getFailed) {
479                 for (FcpListener fcpListener: fcpListeners) {
480                         fcpListener.receivedGetFailed(this, getFailed);
481                 }
482         }
483
484         /**
485          * Notifies all listeners that a “PutFailed” message was received.
486          * 
487          * @see FcpListener#receivedPutFailed(FcpConnection, PutFailed)
488          * @param putFailed
489          *            The “PutFailed” message
490          */
491         private void fireReceivedPutFailed(PutFailed putFailed) {
492                 for (FcpListener fcpListener: fcpListeners) {
493                         fcpListener.receivedPutFailed(this, putFailed);
494                 }
495         }
496
497         /**
498          * Notifies all listeners that an “IdentifierCollision” message was
499          * received.
500          * 
501          * @see FcpListener#receivedIdentifierCollision(FcpConnection,
502          *      IdentifierCollision)
503          * @param identifierCollision
504          *            The “IdentifierCollision” message
505          */
506         private void fireReceivedIdentifierCollision(IdentifierCollision identifierCollision) {
507                 for (FcpListener fcpListener: fcpListeners) {
508                         fcpListener.receivedIdentifierCollision(this, identifierCollision);
509                 }
510         }
511
512         /**
513          * Notifies all listeners that an “PersistentPutDir” message was received.
514          * 
515          * @see FcpListener#receivedPersistentPutDir(FcpConnection,
516          *      PersistentPutDir)
517          * @param persistentPutDir
518          *            The “PersistentPutDir” message
519          */
520         private void fireReceivedPersistentPutDir(PersistentPutDir persistentPutDir) {
521                 for (FcpListener fcpListener: fcpListeners) {
522                         fcpListener.receivedPersistentPutDir(this, persistentPutDir);
523                 }
524         }
525
526         /**
527          * Notifies all listeners that a “PersistentRequestRemoved” message was
528          * received.
529          * 
530          * @see FcpListener#receivedPersistentRequestRemoved(FcpConnection,
531          *      PersistentRequestRemoved)
532          * @param persistentRequestRemoved
533          *            The “PersistentRequestRemoved” message
534          */
535         private void fireReceivedPersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) {
536                 for (FcpListener fcpListener: fcpListeners) {
537                         fcpListener.receivedPersistentRequestRemoved(this, persistentRequestRemoved);
538                 }
539         }
540
541         /**
542          * Notifies all listeners that a “SubscribedUSKUpdate” message was received.
543          * 
544          * @see FcpListener#receivedSubscribedUSKUpdate(FcpConnection,
545          *      SubscribedUSKUpdate)
546          * @param subscribedUSKUpdate
547          *            The “SubscribedUSKUpdate” message
548          */
549         private void fireReceivedSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) {
550                 for (FcpListener fcpListener: fcpListeners) {
551                         fcpListener.receivedSubscribedUSKUpdate(this, subscribedUSKUpdate);
552                 }
553         }
554
555         /**
556          * Notifies all listeners that a “PluginInfo” message was received.
557          * 
558          * @see FcpListener#receivedPluginInfo(FcpConnection, PluginInfo)
559          * @param pluginInfo
560          *            The “PluginInfo” message
561          */
562         private void fireReceivedPluginInfo(PluginInfo pluginInfo) {
563                 for (FcpListener fcpListener: fcpListeners) {
564                         fcpListener.receivedPluginInfo(this, pluginInfo);
565                 }
566         }
567
568         /**
569          * Notifies all listeners that an “FCPPluginReply” message was received.
570          * 
571          * @see FcpListener#receivedFCPPluginReply(FcpConnection, FCPPluginReply)
572          * @param fcpPluginReply
573          *            The “FCPPluginReply” message
574          */
575         private void fireReceivedFCPPluginReply(FCPPluginReply fcpPluginReply) {
576                 for (FcpListener fcpListener: fcpListeners) {
577                         fcpListener.receivedFCPPluginReply(this, fcpPluginReply);
578                 }
579         }
580
581         /**
582          * Notifies all listeners that a “PersistentRequestModified” message was
583          * received.
584          * 
585          * @see FcpListener#receivedPersistentRequestModified(FcpConnection,
586          *      PersistentRequestModified)
587          * @param persistentRequestModified
588          *            The “PersistentRequestModified” message
589          */
590         private void fireReceivedPersistentRequestModified(PersistentRequestModified persistentRequestModified) {
591                 for (FcpListener fcpListener: fcpListeners) {
592                         fcpListener.receivedPersistentRequestModified(this, persistentRequestModified);
593                 }
594         }
595
596         /**
597          * Notifies all listeners that a “PutSuccessful” message was received.
598          * 
599          * @see FcpListener#receivedPutSuccessful(FcpConnection, PutSuccessful)
600          * @param putSuccessful
601          *            The “PutSuccessful” message
602          */
603         private void fireReceivedPutSuccessful(PutSuccessful putSuccessful) {
604                 for (FcpListener fcpListener: fcpListeners) {
605                         fcpListener.receivedPutSuccessful(this, putSuccessful);
606                 }
607         }
608
609         /**
610          * Notifies all listeners that a “PutFetchable” message was received.
611          * 
612          * @see FcpListener#receivedPutFetchable(FcpConnection, PutFetchable)
613          * @param putFetchable
614          *            The “PutFetchable” message
615          */
616         private void fireReceivedPutFetchable(PutFetchable putFetchable) {
617                 for (FcpListener fcpListener: fcpListeners) {
618                         fcpListener.receivedPutFetchable(this, putFetchable);
619                 }
620         }
621
622         /**
623          * Notifies all listeners that a “ProtocolError” message was received.
624          * 
625          * @see FcpListener#receivedProtocolError(FcpConnection, ProtocolError)
626          * @param protocolError
627          *            The “ProtocolError” message
628          */
629         private void fireReceivedProtocolError(ProtocolError protocolError) {
630                 for (FcpListener fcpListener: fcpListeners) {
631                         fcpListener.receivedProtocolError(this, protocolError);
632                 }
633         }
634
635         /**
636          * Notifies all registered listeners that a message has been received.
637          * 
638          * @see FcpListener#receivedMessage(FcpConnection, FcpMessage)
639          * @param fcpMessage
640          *            The message that was received
641          */
642         private void fireMessageReceived(FcpMessage fcpMessage) {
643                 for (FcpListener fcpListener: fcpListeners) {
644                         fcpListener.receivedMessage(this, fcpMessage);
645                 }
646         }
647
648         /**
649          * Notifies all listeners that the connection to the node was closed.
650          * 
651          * @param throwable
652          *            The exception that caused the disconnect, or <code>null</code>
653          *            if there was no exception
654          * @see FcpListener#connectionClosed(FcpConnection, Throwable)
655          */
656         private void fireConnectionClosed(Throwable throwable) {
657                 for (FcpListener fcpListener: fcpListeners) {
658                         fcpListener.connectionClosed(this, throwable);
659                 }
660         }
661
662         //
663         // ACTIONS
664         //
665
666         /**
667          * Connects to the node.
668          * 
669          * @throws IOException
670          *             if an I/O error occurs
671          * @throws IllegalStateException
672          *             if there is already a connection to the node
673          */
674         public synchronized void connect() throws IOException, IllegalStateException {
675                 if (connectionHandler != null) {
676                         throw new IllegalStateException("already connected, disconnect first");
677                 }
678                 logger.info("connecting to " + address + ":" + port + "…");
679                 remoteSocket = new Socket(address, port);
680                 remoteInputStream = remoteSocket.getInputStream();
681                 remoteOutputStream = remoteSocket.getOutputStream();
682                 new Thread(connectionHandler = new FcpConnectionHandler(this, remoteInputStream)).start();
683         }
684
685         /**
686          * Disconnects from the node. If there is no connection to the node, this
687          * method does nothing.
688          * 
689          * @deprecated Use {@link #close()} instead
690          */
691         @Deprecated
692         public synchronized void disconnect() {
693                 close();
694         }
695
696         /**
697          * Closes the connection. If there is no connection to the node, this method
698          * does nothing.
699          */
700         public void close() {
701                 handleDisconnect(null);
702         }
703
704         /**
705          * Sends the given FCP message.
706          * 
707          * @param fcpMessage
708          *            The FCP message to send
709          * @throws IOException
710          *             if an I/O error occurs
711          */
712         public synchronized void sendMessage(FcpMessage fcpMessage) throws IOException {
713                 logger.fine("sending message: " + fcpMessage.getName());
714                 fcpMessage.write(remoteOutputStream);
715         }
716
717         //
718         // PACKAGE-PRIVATE METHODS
719         //
720
721         /**
722          * Handles the given message, notifying listeners. This method should only
723          * be called by {@link FcpConnectionHandler}.
724          * 
725          * @param fcpMessage
726          *            The received message
727          */
728         void handleMessage(FcpMessage fcpMessage) {
729                 logger.fine("received message: " + fcpMessage.getName());
730                 String messageName = fcpMessage.getName();
731                 countMessage(messageName);
732                 if ("SimpleProgress".equals(messageName)) {
733                         fireReceivedSimpleProgress(new SimpleProgress(fcpMessage));
734                 } else if ("ProtocolError".equals(messageName)) {
735                         fireReceivedProtocolError(new ProtocolError(fcpMessage));
736                 } else if ("PersistentGet".equals(messageName)) {
737                         fireReceivedPersistentGet(new PersistentGet(fcpMessage));
738                 } else if ("PersistentPut".equals(messageName)) {
739                         fireReceivedPersistentPut(new PersistentPut(fcpMessage));
740                 } else if ("PersistentPutDir".equals(messageName)) {
741                         fireReceivedPersistentPutDir(new PersistentPutDir(fcpMessage));
742                 } else if ("URIGenerated".equals(messageName)) {
743                         fireReceivedURIGenerated(new URIGenerated(fcpMessage));
744                 } else if ("EndListPersistentRequests".equals(messageName)) {
745                         fireReceivedEndListPersistentRequests(new EndListPersistentRequests(fcpMessage));
746                 } else if ("Peer".equals(messageName)) {
747                         fireReceivedPeer(new Peer(fcpMessage));
748                 } else if ("PeerNote".equals(messageName)) {
749                         fireReceivedPeerNote(new PeerNote(fcpMessage));
750                 } else if ("StartedCompression".equals(messageName)) {
751                         fireReceivedStartedCompression(new StartedCompression(fcpMessage));
752                 } else if ("FinishedCompression".equals(messageName)) {
753                         fireReceivedFinishedCompression(new FinishedCompression(fcpMessage));
754                 } else if ("GetFailed".equals(messageName)) {
755                         fireReceivedGetFailed(new GetFailed(fcpMessage));
756                 } else if ("PutFetchable".equals(messageName)) {
757                         fireReceivedPutFetchable(new PutFetchable(fcpMessage));
758                 } else if ("PutSuccessful".equals(messageName)) {
759                         fireReceivedPutSuccessful(new PutSuccessful(fcpMessage));
760                 } else if ("PutFailed".equals(messageName)) {
761                         fireReceivedPutFailed(new PutFailed(fcpMessage));
762                 } else if ("DataFound".equals(messageName)) {
763                         fireReceivedDataFound(new DataFound(fcpMessage));
764                 } else if ("SubscribedUSKUpdate".equals(messageName)) {
765                         fireReceivedSubscribedUSKUpdate(new SubscribedUSKUpdate(fcpMessage));
766                 } else if ("IdentifierCollision".equals(messageName)) {
767                         fireReceivedIdentifierCollision(new IdentifierCollision(fcpMessage));
768                 } else if ("AllData".equals(messageName)) {
769                         LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
770                         fireReceivedAllData(new AllData(fcpMessage, payloadInputStream));
771                         try {
772                                 payloadInputStream.consume();
773                         } catch (IOException ioe1) {
774                                 /* well, ignore. when the connection handler fails, all fails. */
775                         }
776                 } else if ("EndListPeerNotes".equals(messageName)) {
777                         fireReceivedEndListPeerNotes(new EndListPeerNotes(fcpMessage));
778                 } else if ("EndListPeers".equals(messageName)) {
779                         fireReceivedEndListPeers(new EndListPeers(fcpMessage));
780                 } else if ("SSKKeypair".equals(messageName)) {
781                         fireReceivedSSKKeypair(new SSKKeypair(fcpMessage));
782                 } else if ("PeerRemoved".equals(messageName)) {
783                         fireReceivedPeerRemoved(new PeerRemoved(fcpMessage));
784                 } else if ("PersistentRequestModified".equals(messageName)) {
785                         fireReceivedPersistentRequestModified(new PersistentRequestModified(fcpMessage));
786                 } else if ("PersistentRequestRemoved".equals(messageName)) {
787                         fireReceivedPersistentRequestRemoved(new PersistentRequestRemoved(fcpMessage));
788                 } else if ("UnknownPeerNoteType".equals(messageName)) {
789                         fireReceivedUnknownPeerNoteType(new UnknownPeerNoteType(fcpMessage));
790                 } else if ("UnknownNodeIdentifier".equals(messageName)) {
791                         fireReceivedUnknownNodeIdentifier(new UnknownNodeIdentifier(fcpMessage));
792                 } else if ("FCPPluginReply".equals(messageName)) {
793                         LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
794                         fireReceivedFCPPluginReply(new FCPPluginReply(fcpMessage, payloadInputStream));
795                         try {
796                                 payloadInputStream.consume();
797                         } catch (IOException ioe1) {
798                                 /* ignore. */
799                         }
800                 } else if ("PluginInfo".equals(messageName)) {
801                         fireReceivedPluginInfo(new PluginInfo(fcpMessage));
802                 } else if ("NodeData".equals(messageName)) {
803                         fireReceivedNodeData(new NodeData(fcpMessage));
804                 } else if ("TestDDAReply".equals(messageName)) {
805                         fireReceivedTestDDAReply(new TestDDAReply(fcpMessage));
806                 } else if ("TestDDAComplete".equals(messageName)) {
807                         fireReceivedTestDDAComplete(new TestDDAComplete(fcpMessage));
808                 } else if ("ConfigData".equals(messageName)) {
809                         fireReceivedConfigData(new ConfigData(fcpMessage));
810                 } else if ("NodeHello".equals(messageName)) {
811                         fireReceivedNodeHello(new NodeHello(fcpMessage));
812                 } else if ("CloseConnectionDuplicateClientName".equals(messageName)) {
813                         fireReceivedCloseConnectionDuplicateClientName(new CloseConnectionDuplicateClientName(fcpMessage));
814                 } else {
815                         fireMessageReceived(fcpMessage);
816                 }
817         }
818
819         /**
820          * Handles a disconnect from the node.
821          * 
822          * @param throwable
823          *            The exception that caused the disconnect, or <code>null</code>
824          *            if there was no exception
825          */
826         synchronized void handleDisconnect(Throwable throwable) {
827                 FcpUtils.close(remoteInputStream);
828                 FcpUtils.close(remoteOutputStream);
829                 FcpUtils.close(remoteSocket);
830                 if (connectionHandler != null) {
831                         connectionHandler.stop();
832                         connectionHandler = null;
833                         fireConnectionClosed(throwable);
834                 }
835         }
836
837         //
838         // PRIVATE METHODS
839         //
840
841         /**
842          * Incremets the counter in {@link #incomingMessageStatistics} by <cod>1</code>
843          * for the given message name.
844          * 
845          * @param name
846          *            The name of the message to count
847          */
848         private void countMessage(String name) {
849                 int oldValue = 0;
850                 if (incomingMessageStatistics.containsKey(name)) {
851                         oldValue = incomingMessageStatistics.get(name);
852                 }
853                 incomingMessageStatistics.put(name, oldValue + 1);
854                 logger.finest("count for " + name + ": " + (oldValue + 1));
855         }
856
857         /**
858          * Returns a limited input stream from the node’s input stream.
859          * 
860          * @param dataLength
861          *            The length of the stream
862          * @return The limited input stream
863          */
864         private LimitedInputStream getInputStream(long dataLength) {
865                 if (dataLength <= 0) {
866                         return new LimitedInputStream(null, 0);
867                 }
868                 return new LimitedInputStream(remoteInputStream, dataLength);
869         }
870
871         /**
872          * A wrapper around an {@link InputStream} that only supplies a limit number
873          * of bytes from the underlying input stream.
874          * 
875          * @author David ‘Bombe’ Roden &lt;bombe@freenetproject.org&gt;
876          */
877         private static class LimitedInputStream extends FilterInputStream {
878
879                 /** The remaining number of bytes that can be read. */
880                 private long remaining;
881
882                 /**
883                  * Creates a new LimitedInputStream that supplies at most
884                  * <code>length</code> bytes from the given input stream.
885                  * 
886                  * @param inputStream
887                  *            The input stream
888                  * @param length
889                  *            The number of bytes to read
890                  */
891                 public LimitedInputStream(InputStream inputStream, long length) {
892                         super(inputStream);
893                         remaining = length;
894                 }
895
896                 /**
897                  * @see java.io.FilterInputStream#available()
898                  */
899                 @Override
900                 public synchronized int available() throws IOException {
901                         if (remaining == 0) {
902                                 return 0;
903                         }
904                         return (int) Math.min(super.available(), Math.min(Integer.MAX_VALUE, remaining));
905                 }
906
907                 /**
908                  * @see java.io.FilterInputStream#read()
909                  */
910                 @Override
911                 public synchronized int read() throws IOException {
912                         int read = -1;
913                         if (remaining > 0) {
914                                 read = super.read();
915                                 remaining--;
916                         }
917                         return read;
918                 }
919
920                 /**
921                  * @see java.io.FilterInputStream#read(byte[], int, int)
922                  */
923                 @Override
924                 public synchronized int read(byte[] b, int off, int len) throws IOException {
925                         if (remaining == 0) {
926                                 return -1;
927                         }
928                         int toCopy = (int) Math.min(len, Math.min(remaining, Integer.MAX_VALUE));
929                         int read = super.read(b, off, toCopy);
930                         remaining -= read;
931                         return read;
932                 }
933
934                 /**
935                  * @see java.io.FilterInputStream#skip(long)
936                  */
937                 @Override
938                 public synchronized long skip(long n) throws IOException {
939                         if ((n < 0) || (remaining == 0)) {
940                                 return 0;
941                         }
942                         long skipped = super.skip(Math.min(n, remaining));
943                         remaining -= skipped;
944                         return skipped;
945                 }
946
947                 /**
948                  * {@inheritDoc} This method does nothing, as {@link #mark(int)} and
949                  * {@link #reset()} are not supported.
950                  * 
951                  * @see java.io.FilterInputStream#mark(int)
952                  */
953                 @Override
954                 public void mark(int readlimit) {
955                         /* do nothing. */
956                 }
957
958                 /**
959                  * {@inheritDoc}
960                  * 
961                  * @see java.io.FilterInputStream#markSupported()
962                  * @return <code>false</code>
963                  */
964                 @Override
965                 public boolean markSupported() {
966                         return false;
967                 }
968
969                 /**
970                  * {@inheritDoc} This method does nothing, as {@link #mark(int)} and
971                  * {@link #reset()} are not supported.
972                  * 
973                  * @see java.io.FilterInputStream#reset()
974                  */
975                 @Override
976                 public void reset() throws IOException {
977                         /* do nothing. */
978                 }
979
980                 /**
981                  * Consumes the input stream, i.e. read all bytes until the limit is
982                  * reached.
983                  * 
984                  * @throws IOException
985                  *             if an I/O error occurs
986                  */
987                 public void consume() throws IOException {
988                         while (remaining > 0) {
989                                 skip(remaining);
990                         }
991                 }
992
993         }
994
995 }