2 * jSite2 - FcpConnection.java -
3 * Copyright © 2008 David Roden
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
15 * You should have received a copy of the GNU General Public License
16 * along with this program; if not, write to the Free Software
17 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
20 package net.pterodactylus.fcp;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
36 * An FCP connection to a Freenet node.
38 * @author David ‘Bombe’ Roden <bombe@freenetproject.org>
41 public class FcpConnection {
43 /** The default port for FCP v2. */
44 public static final int DEFAULT_PORT = 9481;
46 /** The list of FCP listeners. */
47 private final List<FcpListener> fcpListeners = new ArrayList<FcpListener>();
49 /** The address of the node. */
50 private final InetAddress address;
52 /** The port number of the node’s FCP port. */
53 private final int port;
55 /** The remote socket. */
56 private Socket remoteSocket;
58 /** The input stream from the node. */
59 private InputStream remoteInputStream;
61 /** The output stream to the node. */
62 private OutputStream remoteOutputStream;
64 /** The connection handler. */
65 private FcpConnectionHandler connectionHandler;
67 /** Incoming message statistics. */
68 private Map<String, Integer> incomingMessageStatistics = Collections.synchronizedMap(new HashMap<String, Integer>());
/**
 * Creates a new FCP connection to the Freenet node running on localhost,
 * using the default port.
 *
 * @throws UnknownHostException
 *             if the address of the local host can not be resolved
 */
public FcpConnection() throws UnknownHostException {
    this(InetAddress.getLocalHost(), DEFAULT_PORT);
}
/**
 * Creates a new FCP connection to the Freenet node running on the given
 * host, listening on the default port.
 *
 * @param host
 *            The hostname of the Freenet node
 * @throws UnknownHostException
 *             if <code>host</code> can not be resolved
 */
public FcpConnection(String host) throws UnknownHostException {
    this(InetAddress.getByName(host), DEFAULT_PORT);
}
/**
 * Creates a new FCP connection to the Freenet node running on the given
 * host, listening on the given port. The hostname is resolved
 * immediately.
 *
 * @param host
 *            The hostname of the Freenet node
 * @param port
 *            The port number of the node’s FCP port
 * @throws UnknownHostException
 *             if <code>host</code> can not be resolved
 */
public FcpConnection(String host, int port) throws UnknownHostException {
    this(InetAddress.getByName(host), port);
}
/**
 * Creates a new FCP connection to the Freenet node running at the given
 * address, listening on the default port ({@link #DEFAULT_PORT}).
 *
 * @param address
 *            The address of the Freenet node
 */
public FcpConnection(InetAddress address) {
    this(address, DEFAULT_PORT);
}
/**
 * Creates a new FCP connection to the Freenet node running at the given
 * address, listening on the given port. No network connection is opened
 * here; call {@link #connect()} for that.
 *
 * @param address
 *            The address of the Freenet node
 * @param port
 *            The port number of the node’s FCP port
 */
public FcpConnection(InetAddress address, int port) {
    this.address = address;
    /* both final fields must be initialised; connect() reads the port. */
    this.port = port;
}
135 // LISTENER MANAGEMENT
139 * Adds the given listener to the list of listeners.
142 * The listener to add
144 public void addFcpListener(FcpListener fcpListener) {
145 fcpListeners.add(fcpListener);
149 * Removes the given listener from the list of listeners.
152 * The listener to remove
154 public void removeFcpListener(FcpListener fcpListener) {
155 fcpListeners.remove(fcpListener);
159 * Notifies listeners that a “NodeHello” message was received.
161 * @see FcpListener#receivedNodeHello(FcpConnection, NodeHello)
163 * The “NodeHello” message
165 private void fireReceivedNodeHello(NodeHello nodeHello) {
166 for (FcpListener fcpListener: fcpListeners) {
167 fcpListener.receivedNodeHello(this, nodeHello);
172 * Notifies listeners that a “CloseConnectionDuplicateClientName” message
175 * @see FcpListener#receivedCloseConnectionDuplicateClientName(FcpConnection,
176 * CloseConnectionDuplicateClientName)
177 * @param closeConnectionDuplicateClientName
178 * The “CloseConnectionDuplicateClientName” message
180 private void fireReceivedCloseConnectionDuplicateClientName(CloseConnectionDuplicateClientName closeConnectionDuplicateClientName) {
181 for (FcpListener fcpListener: fcpListeners) {
182 fcpListener.receivedCloseConnectionDuplicateClientName(this, closeConnectionDuplicateClientName);
187 * Notifies listeners that a “SSKKeypair” message was received.
189 * @see FcpListener#receivedSSKKeypair(FcpConnection, SSKKeypair)
191 * The “SSKKeypair” message
193 private void fireReceivedSSKKeypair(SSKKeypair sskKeypair) {
194 for (FcpListener fcpListener: fcpListeners) {
195 fcpListener.receivedSSKKeypair(this, sskKeypair);
200 * Notifies listeners that a “Peer” message was received.
202 * @see FcpListener#receivedPeer(FcpConnection, Peer)
206 private void fireReceivedPeer(Peer peer) {
207 for (FcpListener fcpListener: fcpListeners) {
208 fcpListener.receivedPeer(this, peer);
213 * Notifies all listeners that an “EndListPeers” message was received.
215 * @see FcpListener#receivedEndListPeers(FcpConnection, EndListPeers)
216 * @param endListPeers
217 * The “EndListPeers” message
219 private void fireReceivedEndListPeers(EndListPeers endListPeers) {
220 for (FcpListener fcpListener: fcpListeners) {
221 fcpListener.receivedEndListPeers(this, endListPeers);
226 * Notifies all listeners that a “PeerNote” message was received.
228 * @see FcpListener#receivedPeerNote(FcpConnection, PeerNote)
231 private void fireReceivedPeerNote(PeerNote peerNote) {
232 for (FcpListener fcpListener: fcpListeners) {
233 fcpListener.receivedPeerNote(this, peerNote);
238 * Notifies all listeners that an “EndListPeerNotes” message was received.
240 * @see FcpListener#receivedEndListPeerNotes(FcpConnection,
242 * @param endListPeerNotes
243 * The “EndListPeerNotes” message
245 private void fireReceivedEndListPeerNotes(EndListPeerNotes endListPeerNotes) {
246 for (FcpListener fcpListener: fcpListeners) {
247 fcpListener.receivedEndListPeerNotes(this, endListPeerNotes);
252 * Notifies all listeners that a “PeerRemoved” message was received.
254 * @see FcpListener#receivedPeerRemoved(FcpConnection, PeerRemoved)
256 * The “PeerRemoved” message
258 private void fireReceivedPeerRemoved(PeerRemoved peerRemoved) {
259 for (FcpListener fcpListener: fcpListeners) {
260 fcpListener.receivedPeerRemoved(this, peerRemoved);
265 * Notifies all listeners that a “NodeData” message was received.
267 * @see FcpListener#receivedNodeData(FcpConnection, NodeData)
269 * The “NodeData” message
271 private void fireReceivedNodeData(NodeData nodeData) {
272 for (FcpListener fcpListener: fcpListeners) {
273 fcpListener.receivedNodeData(this, nodeData);
278 * Notifies all listeners that a “TestDDAReply” message was received.
280 * @see FcpListener#receivedTestDDAReply(FcpConnection, TestDDAReply)
281 * @param testDDAReply
282 * The “TestDDAReply” message
284 private void fireReceivedTestDDAReply(TestDDAReply testDDAReply) {
285 for (FcpListener fcpListener: fcpListeners) {
286 fcpListener.receivedTestDDAReply(this, testDDAReply);
291 * Notifies all listeners that a “TestDDAComplete” message was received.
293 * @see FcpListener#receivedTestDDAComplete(FcpConnection, TestDDAComplete)
294 * @param testDDAComplete
295 * The “TestDDAComplete” message
297 private void fireReceivedTestDDAComplete(TestDDAComplete testDDAComplete) {
298 for (FcpListener fcpListener: fcpListeners) {
299 fcpListener.receivedTestDDAComplete(this, testDDAComplete);
304 * Notifies all listeners that a “PersistentGet” message was received.
306 * @see FcpListener#receivedPersistentGet(FcpConnection, PersistentGet)
307 * @param persistentGet
308 * The “PersistentGet” message
310 private void fireReceivedPersistentGet(PersistentGet persistentGet) {
311 for (FcpListener fcpListener: fcpListeners) {
312 fcpListener.receivedPersistentGet(this, persistentGet);
317 * Notifies all listeners that a “PersistentPut” message was received.
319 * @see FcpListener#receivedPersistentPut(FcpConnection, PersistentPut)
320 * @param persistentPut
321 * The “PersistentPut” message
323 private void fireReceivedPersistentPut(PersistentPut persistentPut) {
324 for (FcpListener fcpListener: fcpListeners) {
325 fcpListener.receivedPersistentPut(this, persistentPut);
330 * Notifies all listeners that a “EndListPersistentRequests” message was
333 * @see FcpListener#receivedEndListPersistentRequests(FcpConnection,
334 * EndListPersistentRequests)
335 * @param endListPersistentRequests
336 * The “EndListPersistentRequests” message
338 private void fireReceivedEndListPersistentRequests(EndListPersistentRequests endListPersistentRequests) {
339 for (FcpListener fcpListener: fcpListeners) {
340 fcpListener.receivedEndListPersistentRequests(this, endListPersistentRequests);
345 * Notifies all listeners that a “URIGenerated” message was received.
347 * @see FcpListener#receivedURIGenerated(FcpConnection, URIGenerated)
348 * @param uriGenerated
349 * The “URIGenerated” message
351 private void fireReceivedURIGenerated(URIGenerated uriGenerated) {
352 for (FcpListener fcpListener: fcpListeners) {
353 fcpListener.receivedURIGenerated(this, uriGenerated);
358 * Notifies all listeners that a “DataFound” message was received.
360 * @see FcpListener#receivedDataFound(FcpConnection, DataFound)
362 * The “DataFound” message
364 private void fireReceivedDataFound(DataFound dataFound) {
365 for (FcpListener fcpListener: fcpListeners) {
366 fcpListener.receivedDataFound(this, dataFound);
371 * Notifies all listeners that an “AllData” message was received.
373 * @see FcpListener#receivedAllData(FcpConnection, AllData)
375 * The “AllData” message
377 private void fireReceivedAllData(AllData allData) {
378 for (FcpListener fcpListener: fcpListeners) {
379 fcpListener.receivedAllData(this, allData);
384 * Notifies all listeners that a “SimpleProgress” message was received.
386 * @see FcpListener#receivedSimpleProgress(FcpConnection, SimpleProgress)
387 * @param simpleProgress
388 * The “SimpleProgress” message
390 private void fireReceivedSimpleProgress(SimpleProgress simpleProgress) {
391 for (FcpListener fcpListener: fcpListeners) {
392 fcpListener.receivedSimpleProgress(this, simpleProgress);
397 * Notifies all listeners that a “StartedCompression” message was received.
399 * @see FcpListener#receivedStartedCompression(FcpConnection,
400 * StartedCompression)
401 * @param startedCompression
402 * The “StartedCompression” message
404 private void fireReceivedStartedCompression(StartedCompression startedCompression) {
405 for (FcpListener fcpListener: fcpListeners) {
406 fcpListener.receivedStartedCompression(this, startedCompression);
411 * Notifies all listeners that a “FinishedCompression” message was received.
413 * @see FcpListener#receviedFinishedCompression(FcpConnection,
414 * FinishedCompression)
415 * @param finishedCompression
416 * The “FinishedCompression” message
418 private void fireReceivedFinishedCompression(FinishedCompression finishedCompression) {
419 for (FcpListener fcpListener: fcpListeners) {
420 fcpListener.receviedFinishedCompression(this, finishedCompression);
425 * Notifies all listeners that an “UnknownPeerNoteType” message was
428 * @see FcpListener#receivedUnknownPeerNoteType(FcpConnection,
429 * UnknownPeerNoteType)
430 * @param unknownPeerNoteType
431 * The “UnknownPeerNoteType” message
433 private void fireReceivedUnknownPeerNoteType(UnknownPeerNoteType unknownPeerNoteType) {
434 for (FcpListener fcpListener: fcpListeners) {
435 fcpListener.receivedUnknownPeerNoteType(this, unknownPeerNoteType);
440 * Notifies all listeners that an “UnknownNodeIdentifier” message was
443 * @see FcpListener#receivedUnknownNodeIdentifier(FcpConnection,
444 * UnknownNodeIdentifier)
445 * @param unknownNodeIdentifier
446 * The “UnknownNodeIdentifier” message
448 private void fireReceivedUnknownNodeIdentifier(UnknownNodeIdentifier unknownNodeIdentifier) {
449 for (FcpListener fcpListener: fcpListeners) {
450 fcpListener.receivedUnknownNodeIdentifier(this, unknownNodeIdentifier);
455 * Notifies all listeners that a “ConfigData” message was received.
457 * @see FcpListener#receivedConfigData(FcpConnection, ConfigData)
459 * The “ConfigData” message
461 private void fireReceivedConfigData(ConfigData configData) {
462 for (FcpListener fcpListener: fcpListeners) {
463 fcpListener.receivedConfigData(this, configData);
468 * Notifies all listeners that a “GetFailed” message was received.
470 * @see FcpListener#receivedGetFailed(FcpConnection, GetFailed)
472 * The “GetFailed” message
474 private void fireReceivedGetFailed(GetFailed getFailed) {
475 for (FcpListener fcpListener: fcpListeners) {
476 fcpListener.receivedGetFailed(this, getFailed);
481 * Notifies all listeners that a “PutFailed” message was received.
483 * @see FcpListener#receivedPutFailed(FcpConnection, PutFailed)
485 * The “PutFailed” message
487 private void fireReceivedPutFailed(PutFailed putFailed) {
488 for (FcpListener fcpListener: fcpListeners) {
489 fcpListener.receivedPutFailed(this, putFailed);
494 * Notifies all listeners that an “IdentifierCollision” message was
497 * @see FcpListener#receivedIdentifierCollision(FcpConnection,
498 * IdentifierCollision)
499 * @param identifierCollision
500 * The “IdentifierCollision” message
502 private void fireReceivedIdentifierCollision(IdentifierCollision identifierCollision) {
503 for (FcpListener fcpListener: fcpListeners) {
504 fcpListener.receivedIdentifierCollision(this, identifierCollision);
509 * Notifies all listeners that an “PersistentPutDir” message was received.
511 * @see FcpListener#receivedPersistentPutDir(FcpConnection,
513 * @param persistentPutDir
514 * The “PersistentPutDir” message
516 private void fireReceivedPersistentPutDir(PersistentPutDir persistentPutDir) {
517 for (FcpListener fcpListener: fcpListeners) {
518 fcpListener.receivedPersistentPutDir(this, persistentPutDir);
523 * Notifies all listeners that a “PersistentRequestRemoved” message was
526 * @see FcpListener#receivedPersistentRequestRemoved(FcpConnection,
527 * PersistentRequestRemoved)
528 * @param persistentRequestRemoved
529 * The “PersistentRequestRemoved” message
531 private void fireReceivedPersistentRequestRemoved(PersistentRequestRemoved persistentRequestRemoved) {
532 for (FcpListener fcpListener: fcpListeners) {
533 fcpListener.receivedPersistentRequestRemoved(this, persistentRequestRemoved);
538 * Notifies all listeners that a “SubscribedUSKUpdate” message was received.
540 * @see FcpListener#receivedSubscribedUSKUpdate(FcpConnection,
541 * SubscribedUSKUpdate)
542 * @param subscribedUSKUpdate
543 * The “SubscribedUSKUpdate” message
545 private void fireReceivedSubscribedUSKUpdate(SubscribedUSKUpdate subscribedUSKUpdate) {
546 for (FcpListener fcpListener: fcpListeners) {
547 fcpListener.receivedSubscribedUSKUpdate(this, subscribedUSKUpdate);
552 * Notifies all listeners that a “PluginInfo” message was received.
554 * @see FcpListener#receivedPluginInfo(FcpConnection, PluginInfo)
556 * The “PluginInfo” message
558 private void fireReceivedPluginInfo(PluginInfo pluginInfo) {
559 for (FcpListener fcpListener: fcpListeners) {
560 fcpListener.receivedPluginInfo(this, pluginInfo);
565 * Notifies all listeners that an “FCPPluginReply” message was received.
567 * @see FcpListener#receivedFCPPluginReply(FcpConnection, FCPPluginReply)
568 * @param fcpPluginReply
569 * The “FCPPluginReply” message
571 private void fireReceivedFCPPluginReply(FCPPluginReply fcpPluginReply) {
572 for (FcpListener fcpListener: fcpListeners) {
573 fcpListener.receivedFCPPluginReply(this, fcpPluginReply);
578 * Notifies all listeners that a “PersistentRequestModified” message was
581 * @see FcpListener#receivedPersistentRequestModified(FcpConnection,
582 * PersistentRequestModified)
583 * @param persistentRequestModified
584 * The “PersistentRequestModified” message
586 private void fireReceivedPersistentRequestModified(PersistentRequestModified persistentRequestModified) {
587 for (FcpListener fcpListener: fcpListeners) {
588 fcpListener.receivedPersistentRequestModified(this, persistentRequestModified);
593 * Notifies all listeners that a “PutSuccessful” message was received.
595 * @see FcpListener#receivedPutSuccessful(FcpConnection, PutSuccessful)
596 * @param putSuccessful
597 * The “PutSuccessful” message
599 private void fireReceivedPutSuccessful(PutSuccessful putSuccessful) {
600 for (FcpListener fcpListener: fcpListeners) {
601 fcpListener.receivedPutSuccessful(this, putSuccessful);
606 * Notifies all listeners that a “PutFetchable” message was received.
608 * @see FcpListener#receivedPutFetchable(FcpConnection, PutFetchable)
609 * @param putFetchable
610 * The “PutFetchable” message
612 private void fireReceivedPutFetchable(PutFetchable putFetchable) {
613 for (FcpListener fcpListener: fcpListeners) {
614 fcpListener.receivedPutFetchable(this, putFetchable);
619 * Notifies all listeners that a “ProtocolError” message was received.
621 * @see FcpListener#receivedProtocolError(FcpConnection, ProtocolError)
622 * @param protocolError
623 * The “ProtocolError” message
625 private void fireReceivedProtocolError(ProtocolError protocolError) {
626 for (FcpListener fcpListener: fcpListeners) {
627 fcpListener.receivedProtocolError(this, protocolError);
632 * Notifies all registered listeners that a message has been received.
634 * @see FcpListener#receivedMessage(FcpConnection, FcpMessage)
636 * The message that was received
638 private void fireMessageReceived(FcpMessage fcpMessage) {
639 for (FcpListener fcpListener: fcpListeners) {
640 fcpListener.receivedMessage(this, fcpMessage);
645 * Notifies all listeners that the connection to the node was closed.
647 * @see FcpListener#connectionClosed(FcpConnection)
649 private void fireConnectionClosed() {
650 for (FcpListener fcpListener: fcpListeners) {
651 fcpListener.connectionClosed(this);
660 * Connects to the node.
662 * @throws IOException
663 * if an I/O error occurs
664 * @throws IllegalStateException
665 * if there is already a connection to the node
667 public synchronized void connect() throws IOException, IllegalStateException {
668 if (connectionHandler != null) {
669 throw new IllegalStateException("already connected, disconnect first");
671 remoteSocket = new Socket(address, port);
672 remoteInputStream = remoteSocket.getInputStream();
673 remoteOutputStream = remoteSocket.getOutputStream();
674 new Thread(connectionHandler = new FcpConnectionHandler(this, remoteInputStream)).start();
678 * Disconnects from the node. If there is no connection to the node, this
679 * method does nothing.
681 public synchronized void disconnect() {
682 if (connectionHandler == null) {
685 FcpUtils.close(remoteSocket);
686 connectionHandler.stop();
687 connectionHandler = null;
688 fireConnectionClosed();
692 * Sends the given FCP message.
695 * The FCP message to send
696 * @throws IOException
697 * if an I/O error occurs
699 public synchronized void sendMessage(FcpMessage fcpMessage) throws IOException {
700 System.out.println("sending message: " + fcpMessage.getName());
701 fcpMessage.write(remoteOutputStream);
705 // PACKAGE-PRIVATE METHODS
709 * Handles the given message, notifying listeners. This message should only
710 * be called by {@link FcpConnectionHandler}.
713 * The received message
715 void handleMessage(FcpMessage fcpMessage) {
716 String messageName = fcpMessage.getName();
717 countMessage(messageName);
718 if ("SimpleProgress".equals(messageName)) {
719 fireReceivedSimpleProgress(new SimpleProgress(fcpMessage));
720 } else if ("ProtocolError".equals(messageName)) {
721 fireReceivedProtocolError(new ProtocolError(fcpMessage));
722 } else if ("PersistentGet".equals(messageName)) {
723 fireReceivedPersistentGet(new PersistentGet(fcpMessage));
724 } else if ("PersistentPut".equals(messageName)) {
725 fireReceivedPersistentPut(new PersistentPut(fcpMessage));
726 } else if ("PersistentPutDir".equals(messageName)) {
727 fireReceivedPersistentPutDir(new PersistentPutDir(fcpMessage));
728 } else if ("URIGenerated".equals(messageName)) {
729 fireReceivedURIGenerated(new URIGenerated(fcpMessage));
730 } else if ("EndListPersistentRequests".equals(messageName)) {
731 fireReceivedEndListPersistentRequests(new EndListPersistentRequests(fcpMessage));
732 } else if ("Peer".equals(messageName)) {
733 fireReceivedPeer(new Peer(fcpMessage));
734 } else if ("PeerNote".equals(messageName)) {
735 fireReceivedPeerNote(new PeerNote(fcpMessage));
736 } else if ("StartedCompression".equals(messageName)) {
737 fireReceivedStartedCompression(new StartedCompression(fcpMessage));
738 } else if ("FinishedCompression".equals(messageName)) {
739 fireReceivedFinishedCompression(new FinishedCompression(fcpMessage));
740 } else if ("GetFailed".equals(messageName)) {
741 fireReceivedGetFailed(new GetFailed(fcpMessage));
742 } else if ("PutFetchable".equals(messageName)) {
743 fireReceivedPutFetchable(new PutFetchable(fcpMessage));
744 } else if ("PutSuccessful".equals(messageName)) {
745 fireReceivedPutSuccessful(new PutSuccessful(fcpMessage));
746 } else if ("PutFailed".equals(messageName)) {
747 fireReceivedPutFailed(new PutFailed(fcpMessage));
748 } else if ("DataFound".equals(messageName)) {
749 fireReceivedDataFound(new DataFound(fcpMessage));
750 } else if ("SubscribedUSKUpdate".equals(messageName)) {
751 fireReceivedSubscribedUSKUpdate(new SubscribedUSKUpdate(fcpMessage));
752 } else if ("IdentifierCollision".equals(messageName)) {
753 fireReceivedIdentifierCollision(new IdentifierCollision(fcpMessage));
754 } else if ("AllData".equals(messageName)) {
755 LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
756 fireReceivedAllData(new AllData(fcpMessage, payloadInputStream));
758 payloadInputStream.consume();
759 } catch (IOException ioe1) {
760 /* well, ignore. when the connection handler fails, all fails. */
762 } else if ("EndListPeerNotes".equals(messageName)) {
763 fireReceivedEndListPeerNotes(new EndListPeerNotes(fcpMessage));
764 } else if ("EndListPeers".equals(messageName)) {
765 fireReceivedEndListPeers(new EndListPeers(fcpMessage));
766 } else if ("SSKKeypair".equals(messageName)) {
767 fireReceivedSSKKeypair(new SSKKeypair(fcpMessage));
768 } else if ("PeerRemoved".equals(messageName)) {
769 fireReceivedPeerRemoved(new PeerRemoved(fcpMessage));
770 } else if ("PersistentRequestModified".equals(messageName)) {
771 fireReceivedPersistentRequestModified(new PersistentRequestModified(fcpMessage));
772 } else if ("PersistentRequestRemoved".equals(messageName)) {
773 fireReceivedPersistentRequestRemoved(new PersistentRequestRemoved(fcpMessage));
774 } else if ("UnknownPeerNoteType".equals(messageName)) {
775 fireReceivedUnknownPeerNoteType(new UnknownPeerNoteType(fcpMessage));
776 } else if ("UnknownNodeIdentifier".equals(messageName)) {
777 fireReceivedUnknownNodeIdentifier(new UnknownNodeIdentifier(fcpMessage));
778 } else if ("FCPPluginReply".equals(messageName)) {
779 LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
780 fireReceivedFCPPluginReply(new FCPPluginReply(fcpMessage, payloadInputStream));
782 payloadInputStream.consume();
783 } catch (IOException ioe1) {
786 } else if ("PluginInfo".equals(messageName)) {
787 fireReceivedPluginInfo(new PluginInfo(fcpMessage));
788 } else if ("NodeData".equals(messageName)) {
789 fireReceivedNodeData(new NodeData(fcpMessage));
790 } else if ("TestDDAReply".equals(messageName)) {
791 fireReceivedTestDDAReply(new TestDDAReply(fcpMessage));
792 } else if ("TestDDAComplete".equals(messageName)) {
793 fireReceivedTestDDAComplete(new TestDDAComplete(fcpMessage));
794 } else if ("ConfigData".equals(messageName)) {
795 fireReceivedConfigData(new ConfigData(fcpMessage));
796 } else if ("NodeHello".equals(messageName)) {
797 fireReceivedNodeHello(new NodeHello(fcpMessage));
798 } else if ("CloseConnectionDuplicateClientName".equals(messageName)) {
799 fireReceivedCloseConnectionDuplicateClientName(new CloseConnectionDuplicateClientName(fcpMessage));
801 fireMessageReceived(fcpMessage);
806 * Handles a disconnect from the node.
808 synchronized void handleDisconnect() {
809 FcpUtils.close(remoteInputStream);
810 FcpUtils.close(remoteOutputStream);
811 FcpUtils.close(remoteSocket);
812 connectionHandler = null;
813 fireConnectionClosed();
821 * Incremets the counter in {@link #incomingMessageStatistics} by <cod>1</code>
822 * for the given message name.
825 * The name of the message to count
827 private void countMessage(String name) {
829 if (incomingMessageStatistics.containsKey(name)) {
830 oldValue = incomingMessageStatistics.get(name);
832 incomingMessageStatistics.put(name, oldValue + 1);
836 * Returns a limited input stream from the node’s input stream.
839 * The length of the stream
840 * @return The limited input stream
842 private LimitedInputStream getInputStream(long dataLength) {
843 if (dataLength <= 0) {
844 return new LimitedInputStream(null, 0);
846 return new LimitedInputStream(remoteInputStream, dataLength);
850 * A wrapper around an {@link InputStream} that only supplies a limit number
851 * of bytes from the underlying input stream.
853 * @author <a href="mailto:dr@ina-germany.de">David Roden</a>
856 private static class LimitedInputStream extends FilterInputStream {
858 /** The remaining number of bytes that can be read. */
859 private long remaining;
862 * Creates a new LimitedInputStream that supplies at most
863 * <code>length</code> bytes from the given input stream.
868 * The number of bytes to read
870 public LimitedInputStream(InputStream inputStream, long length) {
876 * @see java.io.FilterInputStream#available()
879 public synchronized int available() throws IOException {
880 if (remaining == 0) {
883 return (int) Math.min(super.available(), Math.min(Integer.MAX_VALUE, remaining));
887 * @see java.io.FilterInputStream#read()
890 public synchronized int read() throws IOException {
900 * @see java.io.FilterInputStream#read(byte[], int, int)
903 public synchronized int read(byte[] b, int off, int len) throws IOException {
904 if (remaining == 0) {
907 int toCopy = (int) Math.min(len, Math.min(remaining, Integer.MAX_VALUE));
908 int read = super.read(b, off, toCopy);
914 * @see java.io.FilterInputStream#skip(long)
917 public synchronized long skip(long n) throws IOException {
918 if ((n < 0) || (remaining == 0)) {
921 long skipped = super.skip(Math.min(n, remaining));
922 remaining -= skipped;
927 * {@inheritDoc} This method does nothing, as {@link #mark(int)} and
928 * {@link #reset()} are not supported.
930 * @see java.io.FilterInputStream#mark(int)
933 public void mark(int readlimit) {
940 * @see java.io.FilterInputStream#markSupported()
941 * @return <code>false</code>
944 public boolean markSupported() {
949 * {@inheritDoc} This method does nothing, as {@link #mark(int)} and
950 * {@link #reset()} are not supported.
952 * @see java.io.FilterInputStream#reset()
955 public void reset() throws IOException {
960 * Consumes the input stream, i.e. read all bytes until the limit is
963 * @throws IOException
964 * if an I/O error occurs
966 public void consume() throws IOException {
967 while (remaining > 0) {