implement closeable interface
[jFCPlib.git] / src / net / pterodactylus / fcp / FcpConnection.java
1 /*
2  * jSite2 - FcpConnection.java -
3  * Copyright © 2008 David Roden
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18  */
19
20 package net.pterodactylus.fcp;
21
import java.io.Closeable;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.logging.Logger;
36
37 /**
38  * An FCP connection to a Freenet node.
39  *
40  * @author David ‘Bombe’ Roden <bombe@freenetproject.org>
41  * @version $Id$
42  */
43 public class FcpConnection implements Closeable {
44
45         /** Logger. */
46         private static final Logger logger = Logger.getLogger(FcpConnection.class.getName());
47
48         /** The default port for FCP v2. */
49         public static final int DEFAULT_PORT = 9481;
50
51         /** The list of FCP listeners. */
52         private final List<FcpListener> fcpListeners = new ArrayList<FcpListener>();
53
54         /** The address of the node. */
55         private final InetAddress address;
56
57         /** The port number of the node’s FCP port. */
58         private final int port;
59
60         /** The remote socket. */
61         private Socket remoteSocket;
62
63         /** The input stream from the node. */
64         private InputStream remoteInputStream;
65
66         /** The output stream to the node. */
67         private OutputStream remoteOutputStream;
68
69         /** The connection handler. */
70         private FcpConnectionHandler connectionHandler;
71
72         /** Incoming message statistics. */
73         private Map<String, Integer> incomingMessageStatistics = Collections.synchronizedMap(new HashMap<String, Integer>());
74
75         /**
76          * Creates a new FCP connection to the freenet node running on localhost,
77          * using the default port.
78          *
79          * @throws UnknownHostException
80          *             if the hostname can not be resolved
81          */
82         public FcpConnection() throws UnknownHostException {
83                 this(InetAddress.getLocalHost());
84         }
85
86         /**
87          * Creates a new FCP connection to the Freenet node running on the given
88          * host, listening on the default port.
89          *
90          * @param host
91          *            The hostname of the Freenet node
92          * @throws UnknownHostException
93          *             if <code>host</code> can not be resolved
94          */
95         public FcpConnection(String host) throws UnknownHostException {
96                 this(host, DEFAULT_PORT);
97         }
98
99         /**
100          * Creates a new FCP connection to the Freenet node running on the given
101          * host, listening on the given port.
102          *
103          * @param host
104          *            The hostname of the Freenet node
105          * @param port
106          *            The port number of the node’s FCP port
107          * @throws UnknownHostException
108          *             if <code>host</code> can not be resolved
109          */
110         public FcpConnection(String host, int port) throws UnknownHostException {
111                 this(InetAddress.getByName(host), port);
112         }
113
114         /**
115          * Creates a new FCP connection to the Freenet node running at the given
116          * address, listening on the default port.
117          *
118          * @param address
119          *            The address of the Freenet node
120          */
121         public FcpConnection(InetAddress address) {
122                 this(address, DEFAULT_PORT);
123         }
124
125         /**
126          * Creates a new FCP connection to the Freenet node running at the given
127          * address, listening on the given port.
128          *
129          * @param address
130          *            The address of the Freenet node
131          * @param port
132          *            The port number of the node’s FCP port
133          */
134         public FcpConnection(InetAddress address, int port) {
135                 this.address = address;
136                 this.port = port;
137         }
138
139         //
140         // LISTENER MANAGEMENT
141         //
142
143         /**
144          * Adds the given listener to the list of listeners.
145          *
146          * @param fcpListener
147          *            The listener to add
148          */
149         public void addFcpListener(FcpListener fcpListener) {
150                 fcpListeners.add(fcpListener);
151         }
152
153         /**
154          * Removes the given listener from the list of listeners.
155          *
156          * @param fcpListener
157          *            The listener to remove
158          */
159         public void removeFcpListener(FcpListener fcpListener) {
160                 fcpListeners.remove(fcpListener);
161         }
162
163         /**
164          * Notifies listeners that a “NodeHello” message was received.
165          *
166          * @see FcpListener#receivedNodeHello(FcpConnection, NodeHello)
167          * @param nodeHello
168          *            The “NodeHello” message
169          */
170         private void fireReceivedNodeHello(NodeHello nodeHello) {
171                 for (FcpListener fcpListener: fcpListeners) {
172                         fcpListener.receivedNodeHello(this, nodeHello);
173                 }
174         }
175
176         /**
177          * Notifies listeners that a “CloseConnectionDuplicateClientName” message
178          * was received.
179          *
180          * @see FcpListener#receivedCloseConnectionDuplicateClientName(FcpConnection,
181          *      CloseConnectionDuplicateClientName)
182          * @param closeConnectionDuplicateClientName
183          *            The “CloseConnectionDuplicateClientName” message
184          */
185         private void fireReceivedCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
186                 for (FcpListener fcpListener: fcpListeners) {
187                         fcpListener.receivedCloseConnectionDuplicateClientName(this, closeConnectionDuplicateClientName);
188                 }
189         }
190
191         /**
192          * Notifies listeners that a “SSKKeypair” message was received.
193          *
194          * @see FcpListener#receivedSSKKeypair(FcpConnection, SSKKeypair)
195          * @param sskKeypair
196          *            The “SSKKeypair” message
197          */
198         private void fireReceivedSSKKeypair(SSKKeypair sskKeypair) {
199                 for (FcpListener fcpListener: fcpListeners) {
200                         fcpListener.receivedSSKKeypair(this, sskKeypair);
201                 }
202         }
203
204         /**
205          * Notifies listeners that a “Peer” message was received.
206          *
207          * @see FcpListener#receivedPeer(FcpConnection, Peer)
208          * @param peer
209          *            The “Peer” message
210          */
211         private void fireReceivedPeer(Peer peer) {
212                 for (FcpListener fcpListener: fcpListeners) {
213                         fcpListener.receivedPeer(this, peer);
214                 }
215         }
216
217         /**
218          * Notifies all listeners that an “EndListPeers” message was received.
219          *
220          * @see FcpListener#receivedEndListPeers(FcpConnection, EndListPeers)
221          * @param endListPeers
222          *            The “EndListPeers” message
223          */
224         private void fireReceivedEndListPeers(EndListPeers endListPeers) {
225                 for (FcpListener fcpListener: fcpListeners) {
226                         fcpListener.receivedEndListPeers(this, endListPeers);
227                 }
228         }
229
230         /**
231          * Notifies all listeners that a “PeerNote” message was received.
232          *
233          * @see FcpListener#receivedPeerNote(FcpConnection, PeerNote)
234          * @param peerNote
235          */
236         private void fireReceivedPeerNote(PeerNote peerNote) {
237                 for (FcpListener fcpListener: fcpListeners) {
238                         fcpListener.receivedPeerNote(this, peerNote);
239                 }
240         }
241
242         /**
243          * Notifies all listeners that an “EndListPeerNotes” message was received.
244          *
245          * @see FcpListener#receivedEndListPeerNotes(FcpConnection,
246          *      EndListPeerNotes)
247          * @param endListPeerNotes
248          *            The “EndListPeerNotes” message
249          */
250         private void fireReceivedEndListPeerNotes(EndListPeerNotes endListPeerNotes) {
251                 for (FcpListener fcpListener: fcpListeners) {
252                         fcpListener.receivedEndListPeerNotes(this, endListPeerNotes);
253                 }
254         }
255
256         /**
257          * Notifies all listeners that a “PeerRemoved” message was received.
258          *
259          * @see FcpListener#receivedPeerRemoved(FcpConnection, PeerRemoved)
260          * @param peerRemoved
261          *            The “PeerRemoved” message
262          */
263         private void fireReceivedPeerRemoved(PeerRemoved peerRemoved) {
264                 for (FcpListener fcpListener: fcpListeners) {
265                         fcpListener.receivedPeerRemoved(this, peerRemoved);
266                 }
267         }
268
269         /**
270          * Notifies all listeners that a “NodeData” message was received.
271          *
272          * @see FcpListener#receivedNodeData(FcpConnection, NodeData)
273          * @param nodeData
274          *            The “NodeData” message
275          */
276         private void fireReceivedNodeData(NodeData nodeData) {
277                 for (FcpListener fcpListener: fcpListeners) {
278                         fcpListener.receivedNodeData(this, nodeData);
279                 }
280         }
281
282         /**
283          * Notifies all listeners that a “TestDDAReply” message was received.
284          *
285          * @see FcpListener#receivedTestDDAReply(FcpConnection, TestDDAReply)
286          * @param testDDAReply
287          *            The “TestDDAReply” message
288          */
289         private void fireReceivedTestDDAReply(TestDDAReply testDDAReply) {
290                 for (FcpListener fcpListener: fcpListeners) {
291                         fcpListener.receivedTestDDAReply(this, testDDAReply);
292                 }
293         }
294
295         /**
296          * Notifies all listeners that a “TestDDAComplete” message was received.
297          *
298          * @see FcpListener#receivedTestDDAComplete(FcpConnection, TestDDAComplete)
299          * @param testDDAComplete
300          *            The “TestDDAComplete” message
301          */
302         private void fireReceivedTestDDAComplete(TestDDAComplete testDDAComplete) {
303                 for (FcpListener fcpListener: fcpListeners) {
304                         fcpListener.receivedTestDDAComplete(this, testDDAComplete);
305                 }
306         }
307
308         /**
309          * Notifies all listeners that a “PersistentGet” message was received.
310          *
311          * @see FcpListener#receivedPersistentGet(FcpConnection, PersistentGet)
312          * @param persistentGet
313          *            The “PersistentGet” message
314          */
315         private void fireReceivedPersistentGet(PersistentGet persistentGet) {
316                 for (FcpListener fcpListener: fcpListeners) {
317                         fcpListener.receivedPersistentGet(this, persistentGet);
318                 }
319         }
320
321         /**
322          * Notifies all listeners that a “PersistentPut” message was received.
323          *
324          * @see FcpListener#receivedPersistentPut(FcpConnection, PersistentPut)
325          * @param persistentPut
326          *            The “PersistentPut” message
327          */
328         private void fireReceivedPersistentPut(PersistentPut persistentPut) {
329                 for (FcpListener fcpListener: fcpListeners) {
330                         fcpListener.receivedPersistentPut(this, persistentPut);
331                 }
332         }
333
334         /**
335          * Notifies all listeners that a “EndListPersistentRequests” message was
336          * received.
337          *
338          * @see FcpListener#receivedEndListPersistentRequests(FcpConnection,
339          *      EndListPersistentRequests)
340          * @param endListPersistentRequests
341          *            The “EndListPersistentRequests” message
342          */
343         private void fireReceivedEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) {
344                 for (FcpListener fcpListener: fcpListeners) {
345                         fcpListener.receivedEndListPersistentRequests(this, endListPersistentRequests);
346                 }
347         }
348
349         /**
350          * Notifies all listeners that a “URIGenerated” message was received.
351          *
352          * @see FcpListener#receivedURIGenerated(FcpConnection, URIGenerated)
353          * @param uriGenerated
354          *            The “URIGenerated” message
355          */
356         private void fireReceivedURIGenerated(URIGenerated uriGenerated) {
357                 for (FcpListener fcpListener: fcpListeners) {
358                         fcpListener.receivedURIGenerated(this, uriGenerated);
359                 }
360         }
361
362         /**
363          * Notifies all listeners that a “DataFound” message was received.
364          *
365          * @see FcpListener#receivedDataFound(FcpConnection, DataFound)
366          * @param dataFound
367          *            The “DataFound” message
368          */
369         private void fireReceivedDataFound(DataFound dataFound) {
370                 for (FcpListener fcpListener: fcpListeners) {
371                         fcpListener.receivedDataFound(this, dataFound);
372                 }
373         }
374
375         /**
376          * Notifies all listeners that an “AllData” message was received.
377          *
378          * @see FcpListener#receivedAllData(FcpConnection, AllData)
379          * @param allData
380          *            The “AllData” message
381          */
382         private void fireReceivedAllData(AllData allData) {
383                 for (FcpListener fcpListener: fcpListeners) {
384                         fcpListener.receivedAllData(this, allData);
385                 }
386         }
387
388         /**
389          * Notifies all listeners that a “SimpleProgress” message was received.
390          *
391          * @see FcpListener#receivedSimpleProgress(FcpConnection, SimpleProgress)
392          * @param simpleProgress
393          *            The “SimpleProgress” message
394          */
395         private void fireReceivedSimpleProgress(SimpleProgress simpleProgress) {
396                 for (FcpListener fcpListener: fcpListeners) {
397                         fcpListener.receivedSimpleProgress(this, simpleProgress);
398                 }
399         }
400
401         /**
402          * Notifies all listeners that a “StartedCompression” message was received.
403          *
404          * @see FcpListener#receivedStartedCompression(FcpConnection,
405          *      StartedCompression)
406          * @param startedCompression
407          *            The “StartedCompression” message
408          */
409         private void fireReceivedStartedCompression(StartedCompression startedCompression) {
410                 for (FcpListener fcpListener: fcpListeners) {
411                         fcpListener.receivedStartedCompression(this, startedCompression);
412                 }
413         }
414
415         /**
416          * Notifies all listeners that a “FinishedCompression” message was received.
417          *
418          * @see FcpListener#receviedFinishedCompression(FcpConnection,
419          *      FinishedCompression)
420          * @param finishedCompression
421          *            The “FinishedCompression” message
422          */
423         private void fireReceivedFinishedCompression(FinishedCompression finishedCompression) {
424                 for (FcpListener fcpListener: fcpListeners) {
425                         fcpListener.receviedFinishedCompression(this, finishedCompression);
426                 }
427         }
428
429         /**
430          * Notifies all listeners that an “UnknownPeerNoteType” message was
431          * received.
432          *
433          * @see FcpListener#receivedUnknownPeerNoteType(FcpConnection,
434          *      UnknownPeerNoteType)
435          * @param unknownPeerNoteType
436          *            The “UnknownPeerNoteType” message
437          */
438         private void fireReceivedUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) {
439                 for (FcpListener fcpListener: fcpListeners) {
440                         fcpListener.receivedUnknownPeerNoteType(this, unknownPeerNoteType);
441                 }
442         }
443
444         /**
445          * Notifies all listeners that an “UnknownNodeIdentifier” message was
446          * received.
447          *
448          * @see FcpListener#receivedUnknownNodeIdentifier(FcpConnection,
449          *      UnknownNodeIdentifier)
450          * @param unknownNodeIdentifier
451          *            The “UnknownNodeIdentifier” message
452          */
453         private void fireReceivedUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) {
454                 for (FcpListener fcpListener: fcpListeners) {
455                         fcpListener.receivedUnknownNodeIdentifier(this, unknownNodeIdentifier);
456                 }
457         }
458
459         /**
460          * Notifies all listeners that a “ConfigData” message was received.
461          *
462          * @see FcpListener#receivedConfigData(FcpConnection, ConfigData)
463          * @param configData
464          *            The “ConfigData” message
465          */
466         private void fireReceivedConfigData(ConfigData configData) {
467                 for (FcpListener fcpListener: fcpListeners) {
468                         fcpListener.receivedConfigData(this, configData);
469                 }
470         }
471
472         /**
473          * Notifies all listeners that a “GetFailed” message was received.
474          *
475          * @see FcpListener#receivedGetFailed(FcpConnection, GetFailed)
476          * @param getFailed
477          *            The “GetFailed” message
478          */
479         private void fireReceivedGetFailed(GetFailed getFailed) {
480                 for (FcpListener fcpListener: fcpListeners) {
481                         fcpListener.receivedGetFailed(this, getFailed);
482                 }
483         }
484
485         /**
486          * Notifies all listeners that a “PutFailed” message was received.
487          *
488          * @see FcpListener#receivedPutFailed(FcpConnection, PutFailed)
489          * @param putFailed
490          *            The “PutFailed” message
491          */
492         private void fireReceivedPutFailed(PutFailed putFailed) {
493                 for (FcpListener fcpListener: fcpListeners) {
494                         fcpListener.receivedPutFailed(this, putFailed);
495                 }
496         }
497
498         /**
499          * Notifies all listeners that an “IdentifierCollision” message was
500          * received.
501          *
502          * @see FcpListener#receivedIdentifierCollision(FcpConnection,
503          *      IdentifierCollision)
504          * @param identifierCollision
505          *            The “IdentifierCollision” message
506          */
507         private void fireReceivedIdentifierCollision(IdentifierCollision identifierCollision) {
508                 for (FcpListener fcpListener: fcpListeners) {
509                         fcpListener.receivedIdentifierCollision(this, identifierCollision);
510                 }
511         }
512
513         /**
514          * Notifies all listeners that an “PersistentPutDir” message was received.
515          *
516          * @see FcpListener#receivedPersistentPutDir(FcpConnection,
517          *      PersistentPutDir)
518          * @param persistentPutDir
519          *            The “PersistentPutDir” message
520          */
521         private void fireReceivedPersistentPutDir(PersistentPutDir persistentPutDir) {
522                 for (FcpListener fcpListener: fcpListeners) {
523                         fcpListener.receivedPersistentPutDir(this, persistentPutDir);
524                 }
525         }
526
527         /**
528          * Notifies all listeners that a “PersistentRequestRemoved” message was
529          * received.
530          *
531          * @see FcpListener#receivedPersistentRequestRemoved(FcpConnection,
532          *      PersistentRequestRemoved)
533          * @param persistentRequestRemoved
534          *            The “PersistentRequestRemoved” message
535          */
536         private void fireReceivedPersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) {
537                 for (FcpListener fcpListener: fcpListeners) {
538                         fcpListener.receivedPersistentRequestRemoved(this, persistentRequestRemoved);
539                 }
540         }
541
542         /**
543          * Notifies all listeners that a “SubscribedUSKUpdate” message was received.
544          *
545          * @see FcpListener#receivedSubscribedUSKUpdate(FcpConnection,
546          *      SubscribedUSKUpdate)
547          * @param subscribedUSKUpdate
548          *            The “SubscribedUSKUpdate” message
549          */
550         private void fireReceivedSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) {
551                 for (FcpListener fcpListener: fcpListeners) {
552                         fcpListener.receivedSubscribedUSKUpdate(this, subscribedUSKUpdate);
553                 }
554         }
555
556         /**
557          * Notifies all listeners that a “PluginInfo” message was received.
558          *
559          * @see FcpListener#receivedPluginInfo(FcpConnection, PluginInfo)
560          * @param pluginInfo
561          *            The “PluginInfo” message
562          */
563         private void fireReceivedPluginInfo(PluginInfo pluginInfo) {
564                 for (FcpListener fcpListener: fcpListeners) {
565                         fcpListener.receivedPluginInfo(this, pluginInfo);
566                 }
567         }
568
569         /**
570          * Notifies all listeners that an “FCPPluginReply” message was received.
571          *
572          * @see FcpListener#receivedFCPPluginReply(FcpConnection, FCPPluginReply)
573          * @param fcpPluginReply
574          *            The “FCPPluginReply” message
575          */
576         private void fireReceivedFCPPluginReply(FCPPluginReply fcpPluginReply) {
577                 for (FcpListener fcpListener: fcpListeners) {
578                         fcpListener.receivedFCPPluginReply(this, fcpPluginReply);
579                 }
580         }
581
582         /**
583          * Notifies all listeners that a “PersistentRequestModified” message was
584          * received.
585          *
586          * @see FcpListener#receivedPersistentRequestModified(FcpConnection,
587          *      PersistentRequestModified)
588          * @param persistentRequestModified
589          *            The “PersistentRequestModified” message
590          */
591         private void fireReceivedPersistentRequestModified(PersistentRequestModified persistentRequestModified) {
592                 for (FcpListener fcpListener: fcpListeners) {
593                         fcpListener.receivedPersistentRequestModified(this, persistentRequestModified);
594                 }
595         }
596
597         /**
598          * Notifies all listeners that a “PutSuccessful” message was received.
599          *
600          * @see FcpListener#receivedPutSuccessful(FcpConnection, PutSuccessful)
601          * @param putSuccessful
602          *            The “PutSuccessful” message
603          */
604         private void fireReceivedPutSuccessful(PutSuccessful putSuccessful) {
605                 for (FcpListener fcpListener: fcpListeners) {
606                         fcpListener.receivedPutSuccessful(this, putSuccessful);
607                 }
608         }
609
610         /**
611          * Notifies all listeners that a “PutFetchable” message was received.
612          *
613          * @see FcpListener#receivedPutFetchable(FcpConnection, PutFetchable)
614          * @param putFetchable
615          *            The “PutFetchable” message
616          */
617         private void fireReceivedPutFetchable(PutFetchable putFetchable) {
618                 for (FcpListener fcpListener: fcpListeners) {
619                         fcpListener.receivedPutFetchable(this, putFetchable);
620                 }
621         }
622
623         /**
624          * Notifies all listeners that a “ProtocolError” message was received.
625          *
626          * @see FcpListener#receivedProtocolError(FcpConnection, ProtocolError)
627          * @param protocolError
628          *            The “ProtocolError” message
629          */
630         private void fireReceivedProtocolError(ProtocolError protocolError) {
631                 for (FcpListener fcpListener: fcpListeners) {
632                         fcpListener.receivedProtocolError(this, protocolError);
633                 }
634         }
635
636         /**
637          * Notifies all registered listeners that a message has been received.
638          *
639          * @see FcpListener#receivedMessage(FcpConnection, FcpMessage)
640          * @param fcpMessage
641          *            The message that was received
642          */
643         private void fireMessageReceived(FcpMessage fcpMessage) {
644                 for (FcpListener fcpListener: fcpListeners) {
645                         fcpListener.receivedMessage(this, fcpMessage);
646                 }
647         }
648
649         /**
650          * Notifies all listeners that the connection to the node was closed.
651          *
652          * @see FcpListener#connectionClosed(FcpConnection)
653          */
654         private void fireConnectionClosed() {
655                 for (FcpListener fcpListener: fcpListeners) {
656                         fcpListener.connectionClosed(this);
657                 }
658         }
659
660         //
661         // ACTIONS
662         //
663
664         /**
665          * Connects to the node.
666          *
667          * @throws IOException
668          *             if an I/O error occurs
669          * @throws IllegalStateException
670          *             if there is already a connection to the node
671          */
672         public synchronized void connect() throws IOException, IllegalStateException {
673                 if (connectionHandler != null) {
674                         throw new IllegalStateException("already connected, disconnect first");
675                 }
676                 logger.info("connecting to " + address + ":" + port + "…");
677                 remoteSocket = new Socket(address, port);
678                 remoteInputStream = remoteSocket.getInputStream();
679                 remoteOutputStream = remoteSocket.getOutputStream();
680                 new Thread(connectionHandler = new FcpConnectionHandler(this, remoteInputStream)).start();
681         }
682
683         /**
684          * Disconnects from the node. If there is no connection to the node, this
685          * method does nothing.
686          *
687          * @deprecated Use {@link #close()} instead
688          */
689         @Deprecated
690         public synchronized void disconnect() {
691                 close();
692         }
693
694         /**
695          * Closes the connection. If there is no connection to the node, this method
696          * does nothing.
697          */
698         public void close() {
699                 if (connectionHandler == null) {
700                         return;
701                 }
702                 logger.info("disconnecting…");
703                 FcpUtils.close(remoteSocket);
704                 connectionHandler.stop();
705                 connectionHandler = null;
706                 fireConnectionClosed();
707         }
708
709         /**
710          * Sends the given FCP message.
711          *
712          * @param fcpMessage
713          *            The FCP message to send
714          * @throws IOException
715          *             if an I/O error occurs
716          */
717         public synchronized void sendMessage(FcpMessage fcpMessage) throws IOException {
718                 logger.fine("sending message: " + fcpMessage.getName());
719                 fcpMessage.write(remoteOutputStream);
720         }
721
722         //
723         // PACKAGE-PRIVATE METHODS
724         //
725
726         /**
727          * Handles the given message, notifying listeners. This message should only
728          * be called by {@link FcpConnectionHandler}.
729          *
730          * @param fcpMessage
731          *            The received message
732          */
733         void handleMessage(FcpMessage fcpMessage) {
734                 logger.fine("received message: " + fcpMessage.getName());
735                 String messageName = fcpMessage.getName();
736                 countMessage(messageName);
737                 if ("SimpleProgress".equals(messageName)) {
738                         fireReceivedSimpleProgress(new SimpleProgress(fcpMessage));
739                 } else if ("ProtocolError".equals(messageName)) {
740                         fireReceivedProtocolError(new ProtocolError(fcpMessage));
741                 } else if ("PersistentGet".equals(messageName)) {
742                         fireReceivedPersistentGet(new PersistentGet(fcpMessage));
743                 } else if ("PersistentPut".equals(messageName)) {
744                         fireReceivedPersistentPut(new PersistentPut(fcpMessage));
745                 } else if ("PersistentPutDir".equals(messageName)) {
746                         fireReceivedPersistentPutDir(new PersistentPutDir(fcpMessage));
747                 } else if ("URIGenerated".equals(messageName)) {
748                         fireReceivedURIGenerated(new URIGenerated(fcpMessage));
749                 } else if ("EndListPersistentRequests".equals(messageName)) {
750                         fireReceivedEndListPersistentRequests(new EndListPersistentRequests(fcpMessage));
751                 } else if ("Peer".equals(messageName)) {
752                         fireReceivedPeer(new Peer(fcpMessage));
753                 } else if ("PeerNote".equals(messageName)) {
754                         fireReceivedPeerNote(new PeerNote(fcpMessage));
755                 } else if ("StartedCompression".equals(messageName)) {
756                         fireReceivedStartedCompression(new StartedCompression(fcpMessage));
757                 } else if ("FinishedCompression".equals(messageName)) {
758                         fireReceivedFinishedCompression(new FinishedCompression(fcpMessage));
759                 } else if ("GetFailed".equals(messageName)) {
760                         fireReceivedGetFailed(new GetFailed(fcpMessage));
761                 } else if ("PutFetchable".equals(messageName)) {
762                         fireReceivedPutFetchable(new PutFetchable(fcpMessage));
763                 } else if ("PutSuccessful".equals(messageName)) {
764                         fireReceivedPutSuccessful(new PutSuccessful(fcpMessage));
765                 } else if ("PutFailed".equals(messageName)) {
766                         fireReceivedPutFailed(new PutFailed(fcpMessage));
767                 } else if ("DataFound".equals(messageName)) {
768                         fireReceivedDataFound(new DataFound(fcpMessage));
769                 } else if ("SubscribedUSKUpdate".equals(messageName)) {
770                         fireReceivedSubscribedUSKUpdate(new SubscribedUSKUpdate(fcpMessage));
771                 } else if ("IdentifierCollision".equals(messageName)) {
772                         fireReceivedIdentifierCollision(new IdentifierCollision(fcpMessage));
773                 } else if ("AllData".equals(messageName)) {
774                         LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
775                         fireReceivedAllData(new AllData(fcpMessage, payloadInputStream));
776                         try {
777                                 payloadInputStream.consume();
778                         } catch (IOException ioe1) {
779                                 /* well, ignore. when the connection handler fails, all fails. */
780                         }
781                 } else if ("EndListPeerNotes".equals(messageName)) {
782                         fireReceivedEndListPeerNotes(new EndListPeerNotes(fcpMessage));
783                 } else if ("EndListPeers".equals(messageName)) {
784                         fireReceivedEndListPeers(new EndListPeers(fcpMessage));
785                 } else if ("SSKKeypair".equals(messageName)) {
786                         fireReceivedSSKKeypair(new SSKKeypair(fcpMessage));
787                 } else if ("PeerRemoved".equals(messageName)) {
788                         fireReceivedPeerRemoved(new PeerRemoved(fcpMessage));
789                 } else if ("PersistentRequestModified".equals(messageName)) {
790                         fireReceivedPersistentRequestModified(new PersistentRequestModified(fcpMessage));
791                 } else if ("PersistentRequestRemoved".equals(messageName)) {
792                         fireReceivedPersistentRequestRemoved(new PersistentRequestRemoved(fcpMessage));
793                 } else if ("UnknownPeerNoteType".equals(messageName)) {
794                         fireReceivedUnknownPeerNoteType(new UnknownPeerNoteType(fcpMessage));
795                 } else if ("UnknownNodeIdentifier".equals(messageName)) {
796                         fireReceivedUnknownNodeIdentifier(new UnknownNodeIdentifier(fcpMessage));
797                 } else if ("FCPPluginReply".equals(messageName)) {
798                         LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
799                         fireReceivedFCPPluginReply(new FCPPluginReply(fcpMessage, payloadInputStream));
800                         try {
801                                 payloadInputStream.consume();
802                         } catch (IOException ioe1) {
803                                 /* ignore. */
804                         }
805                 } else if ("PluginInfo".equals(messageName)) {
806                         fireReceivedPluginInfo(new PluginInfo(fcpMessage));
807                 } else if ("NodeData".equals(messageName)) {
808                         fireReceivedNodeData(new NodeData(fcpMessage));
809                 } else if ("TestDDAReply".equals(messageName)) {
810                         fireReceivedTestDDAReply(new TestDDAReply(fcpMessage));
811                 } else if ("TestDDAComplete".equals(messageName)) {
812                         fireReceivedTestDDAComplete(new TestDDAComplete(fcpMessage));
813                 } else if ("ConfigData".equals(messageName)) {
814                         fireReceivedConfigData(new ConfigData(fcpMessage));
815                 } else if ("NodeHello".equals(messageName)) {
816                         fireReceivedNodeHello(new NodeHello(fcpMessage));
817                 } else if ("CloseConnectionDuplicateClientName".equals(messageName)) {
818                         fireReceivedCloseConnectionDuplicateClientName(new CloseConnectionDuplicateClientName(fcpMessage));
819                 } else {
820                         fireMessageReceived(fcpMessage);
821                 }
822         }
823
        /**
         * Handles a disconnect from the node.
         * <p>
         * The remote input stream, the remote output stream and the remote socket
         * are closed (in that order) using {@link FcpUtils}, the reference to the
         * connection handler is cleared, and the connection-closed event is
         * fired. The stream and socket fields themselves are not reset here.
         * <p>
         * The method is <code>synchronized</code> on this connection, as is
         * {@link #sendMessage(FcpMessage)}.
         */
        synchronized void handleDisconnect() {
                FcpUtils.close(remoteInputStream);
                FcpUtils.close(remoteOutputStream);
                FcpUtils.close(remoteSocket);
                /* the handler reference is dropped before the event is fired. */
                connectionHandler = null;
                fireConnectionClosed();
        }
834
835         //
836         // PRIVATE METHODS
837         //
838
839         /**
840          * Incremets the counter in {@link #incomingMessageStatistics} by <cod>1</code>
841          * for the given message name.
842          *
843          * @param name
844          *            The name of the message to count
845          */
846         private void countMessage(String name) {
847                 int oldValue = 0;
848                 if (incomingMessageStatistics.containsKey(name)) {
849                         oldValue = incomingMessageStatistics.get(name);
850                 }
851                 incomingMessageStatistics.put(name, oldValue + 1);
852                 logger.finest("count for " + name + ": " + (oldValue + 1));
853         }
854
855         /**
856          * Returns a limited input stream from the node’s input stream.
857          *
858          * @param dataLength
859          *            The length of the stream
860          * @return The limited input stream
861          */
862         private LimitedInputStream getInputStream(long dataLength) {
863                 if (dataLength <= 0) {
864                         return new LimitedInputStream(null, 0);
865                 }
866                 return new LimitedInputStream(remoteInputStream, dataLength);
867         }
868
869         /**
870          * A wrapper around an {@link InputStream} that only supplies a limit number
871          * of bytes from the underlying input stream.
872          *
873          * @author <a href="mailto:dr@ina-germany.de">David Roden</a>
874          * @version $Id$
875          */
876         private static class LimitedInputStream extends FilterInputStream {
877
878                 /** The remaining number of bytes that can be read. */
879                 private long remaining;
880
881                 /**
882                  * Creates a new LimitedInputStream that supplies at most
883                  * <code>length</code> bytes from the given input stream.
884                  *
885                  * @param inputStream
886                  *            The input stream
887                  * @param length
888                  *            The number of bytes to read
889                  */
890                 public LimitedInputStream(InputStream inputStream, long length) {
891                         super(inputStream);
892                         remaining = length;
893                 }
894
895                 /**
896                  * @see java.io.FilterInputStream#available()
897                  */
898                 @Override
899                 public synchronized int available() throws IOException {
900                         if (remaining == 0) {
901                                 return 0;
902                         }
903                         return (int) Math.min(super.available(), Math.min(Integer.MAX_VALUE, remaining));
904                 }
905
906                 /**
907                  * @see java.io.FilterInputStream#read()
908                  */
909                 @Override
910                 public synchronized int read() throws IOException {
911                         int read = -1;
912                         if (remaining > 0) {
913                                 read = super.read();
914                                 remaining--;
915                         }
916                         return read;
917                 }
918
919                 /**
920                  * @see java.io.FilterInputStream#read(byte[], int, int)
921                  */
922                 @Override
923                 public synchronized int read(byte[] b, int off, int len) throws IOException {
924                         if (remaining == 0) {
925                                 return -1;
926                         }
927                         int toCopy = (int) Math.min(len, Math.min(remaining, Integer.MAX_VALUE));
928                         int read = super.read(b, off, toCopy);
929                         remaining -= read;
930                         return read;
931                 }
932
933                 /**
934                  * @see java.io.FilterInputStream#skip(long)
935                  */
936                 @Override
937                 public synchronized long skip(long n) throws IOException {
938                         if ((n < 0) || (remaining == 0)) {
939                                 return 0;
940                         }
941                         long skipped = super.skip(Math.min(n, remaining));
942                         remaining -= skipped;
943                         return skipped;
944                 }
945
946                 /**
947                  * {@inheritDoc} This method does nothing, as {@link #mark(int)} and
948                  * {@link #reset()} are not supported.
949                  *
950                  * @see java.io.FilterInputStream#mark(int)
951                  */
952                 @Override
953                 public void mark(int readlimit) {
954                         /* do nothing. */
955                 }
956
957                 /**
958                  * {@inheritDoc}
959                  *
960                  * @see java.io.FilterInputStream#markSupported()
961                  * @return <code>false</code>
962                  */
963                 @Override
964                 public boolean markSupported() {
965                         return false;
966                 }
967
968                 /**
969                  * {@inheritDoc} This method does nothing, as {@link #mark(int)} and
970                  * {@link #reset()} are not supported.
971                  *
972                  * @see java.io.FilterInputStream#reset()
973                  */
974                 @Override
975                 public void reset() throws IOException {
976                         /* do nothing. */
977                 }
978
979                 /**
980                  * Consumes the input stream, i.e. read all bytes until the limit is
981                  * reached.
982                  *
983                  * @throws IOException
984                  *             if an I/O error occurs
985                  */
986                 public void consume() throws IOException {
987                         while (remaining > 0) {
988                                 skip(remaining);
989                         }
990                 }
991
992         }
993
994 }