If DataLength can not be parsed, assume a length of 0
[jFCPlib.git] / src / main / java / net / pterodactylus / fcp / FcpConnection.java
index 84f7ce9..3331bdf 100644 (file)
@@ -1,9 +1,9 @@
 /*
- * jFCPlib - FpcConnection.java - Copyright © 2008 David Roden
+ * jFCPlib - FcpConnection.java - Copyright © 2008–2016 David Roden
  *
- * This program is free software; you can redistribute it and/or modify
+ * This program is free software: you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
+ * the Free Software Foundation, either version 3 of the License, or
  * (at your option) any later version.
  *
  * This program is distributed in the hope that it will be useful,
@@ -12,8 +12,7 @@
  * GNU General Public License for more details.
  *
  * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  */
 
 package net.pterodactylus.fcp;
@@ -31,7 +30,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.logging.Logger;
 
-import net.pterodactylus.util.logging.Logging;
+import net.pterodactylus.fcp.FcpUtils.TempInputStream;
 
 /**
  * An FCP connection to a Freenet node.
@@ -41,7 +40,7 @@ import net.pterodactylus.util.logging.Logging;
 public class FcpConnection implements Closeable {
 
        /** Logger. */
-       private static final Logger logger = Logging.getLogger(FcpConnection.class.getName());
+       private static final Logger logger = Logger.getLogger(FcpConnection.class.getName());
 
        /** The default port for FCP v2. */
        public static final int DEFAULT_PORT = 9481;
@@ -68,7 +67,7 @@ public class FcpConnection implements Closeable {
        private FcpConnectionHandler connectionHandler;
 
        /** Incoming message statistics. */
-       private Map<String, Integer> incomingMessageStatistics = Collections.synchronizedMap(new HashMap<String, Integer>());
+       private static final Map<String, Integer> incomingMessageStatistics = Collections.synchronizedMap(new HashMap<String, Integer>());
 
        /**
         * Creates a new FCP connection to the freenet node running on localhost,
@@ -158,6 +157,10 @@ public class FcpConnection implements Closeable {
                fcpListenerManager.removeListener(fcpListener);
        }
 
+       public synchronized boolean isClosed() {
+               return connectionHandler == null;
+       }
+
        //
        // ACTIONS
        //
@@ -193,9 +196,10 @@ public class FcpConnection implements Closeable {
        }
 
        /**
-        * Closes the connection. If there is no connection to the node, this method
-        * does nothing.
+        * Closes the connection. If there is no connection to the node, this
+        * method does nothing.
         */
+       @Override
        public void close() {
                handleDisconnect(null);
        }
@@ -224,7 +228,7 @@ public class FcpConnection implements Closeable {
         * @param fcpMessage
         *            The received message
         */
-       void handleMessage(FcpMessage fcpMessage) {
+       void handleMessage(FcpMessage fcpMessage) throws IOException{
                logger.fine("received message: " + fcpMessage.getName());
                String messageName = fcpMessage.getName();
                countMessage(messageName);
@@ -262,16 +266,13 @@ public class FcpConnection implements Closeable {
                        fcpListenerManager.fireReceivedDataFound(new DataFound(fcpMessage));
                } else if ("SubscribedUSKUpdate".equals(messageName)) {
                        fcpListenerManager.fireReceivedSubscribedUSKUpdate(new SubscribedUSKUpdate(fcpMessage));
+               } else if ("SubscribedUSK".equals(messageName)) {
+                       fcpListenerManager.fireReceivedSubscribedUSK(new SubscribedUSK(fcpMessage));
                } else if ("IdentifierCollision".equals(messageName)) {
                        fcpListenerManager.fireReceivedIdentifierCollision(new IdentifierCollision(fcpMessage));
                } else if ("AllData".equals(messageName)) {
-                       LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
+                       InputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
                        fcpListenerManager.fireReceivedAllData(new AllData(fcpMessage, payloadInputStream));
-                       try {
-                               payloadInputStream.consume();
-                       } catch (IOException ioe1) {
-                               /* well, ignore. when the connection handler fails, all fails. */
-                       }
                } else if ("EndListPeerNotes".equals(messageName)) {
                        fcpListenerManager.fireReceivedEndListPeerNotes(new EndListPeerNotes(fcpMessage));
                } else if ("EndListPeers".equals(messageName)) {
@@ -289,15 +290,12 @@ public class FcpConnection implements Closeable {
                } else if ("UnknownNodeIdentifier".equals(messageName)) {
                        fcpListenerManager.fireReceivedUnknownNodeIdentifier(new UnknownNodeIdentifier(fcpMessage));
                } else if ("FCPPluginReply".equals(messageName)) {
-                       LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
+                       InputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength"), 0));
                        fcpListenerManager.fireReceivedFCPPluginReply(new FCPPluginReply(fcpMessage, payloadInputStream));
-                       try {
-                               payloadInputStream.consume();
-                       } catch (IOException ioe1) {
-                               /* ignore. */
-                       }
                } else if ("PluginInfo".equals(messageName)) {
                        fcpListenerManager.fireReceivedPluginInfo(new PluginInfo(fcpMessage));
+               } else if ("PluginRemoved".equals(messageName)) {
+                       fcpListenerManager.fireReceivedPluginRemoved(new PluginRemoved(fcpMessage));
                } else if ("NodeData".equals(messageName)) {
                        fcpListenerManager.fireReceivedNodeData(new NodeData(fcpMessage));
                } else if ("TestDDAReply".equals(messageName)) {
@@ -323,8 +321,8 @@ public class FcpConnection implements Closeable {
         * Handles a disconnect from the node.
         *
         * @param throwable
-        *            The exception that caused the disconnect, or <code>null</code>
-        *            if there was no exception
+        *            The exception that caused the disconnect, or
+        *            <code>null</code> if there was no exception
         */
        synchronized void handleDisconnect(Throwable throwable) {
                FcpUtils.close(remoteInputStream);
@@ -357,142 +355,8 @@ public class FcpConnection implements Closeable {
                logger.finest("count for " + name + ": " + (oldValue + 1));
        }
 
-       /**
-        * Returns a limited input stream from the node’s input stream.
-        *
-        * @param dataLength
-        *            The length of the stream
-        * @return The limited input stream
-        */
-       private synchronized LimitedInputStream getInputStream(long dataLength) {
-               if (dataLength <= 0) {
-                       return new LimitedInputStream(null, 0);
-               }
-               return new LimitedInputStream(remoteInputStream, dataLength);
-       }
-
-       /**
-        * A wrapper around an {@link InputStream} that only supplies a limit number
-        * of bytes from the underlying input stream.
-        *
-        * @author David ‘Bombe’ Roden &lt;bombe@freenetproject.org&gt;
-        */
-       private static class LimitedInputStream extends FilterInputStream {
-
-               /** The remaining number of bytes that can be read. */
-               private long remaining;
-
-               /**
-                * Creates a new LimitedInputStream that supplies at most
-                * <code>length</code> bytes from the given input stream.
-                *
-                * @param inputStream
-                *            The input stream
-                * @param length
-                *            The number of bytes to read
-                */
-               public LimitedInputStream(InputStream inputStream, long length) {
-                       super(inputStream);
-                       remaining = length;
-               }
-
-               /**
-                * @see java.io.FilterInputStream#available()
-                */
-               @Override
-               public synchronized int available() throws IOException {
-                       if (remaining == 0) {
-                               return 0;
-                       }
-                       return (int) Math.min(super.available(), Math.min(Integer.MAX_VALUE, remaining));
-               }
-
-               /**
-                * @see java.io.FilterInputStream#read()
-                */
-               @Override
-               public synchronized int read() throws IOException {
-                       int read = -1;
-                       if (remaining > 0) {
-                               read = super.read();
-                               remaining--;
-                       }
-                       return read;
-               }
-
-               /**
-                * @see java.io.FilterInputStream#read(byte[], int, int)
-                */
-               @Override
-               public synchronized int read(byte[] b, int off, int len) throws IOException {
-                       if (remaining == 0) {
-                               return -1;
-                       }
-                       int toCopy = (int) Math.min(len, Math.min(remaining, Integer.MAX_VALUE));
-                       int read = super.read(b, off, toCopy);
-                       remaining -= read;
-                       return read;
-               }
-
-               /**
-                * @see java.io.FilterInputStream#skip(long)
-                */
-               @Override
-               public synchronized long skip(long n) throws IOException {
-                       if ((n < 0) || (remaining == 0)) {
-                               return 0;
-                       }
-                       long skipped = super.skip(Math.min(n, remaining));
-                       remaining -= skipped;
-                       return skipped;
-               }
-
-               /**
-                * {@inheritDoc} This method does nothing, as {@link #mark(int)} and
-                * {@link #reset()} are not supported.
-                *
-                * @see java.io.FilterInputStream#mark(int)
-                */
-               @Override
-               public void mark(int readlimit) {
-                       /* do nothing. */
-               }
-
-               /**
-                * {@inheritDoc}
-                *
-                * @see java.io.FilterInputStream#markSupported()
-                * @return <code>false</code>
-                */
-               @Override
-               public boolean markSupported() {
-                       return false;
-               }
-
-               /**
-                * {@inheritDoc} This method does nothing, as {@link #mark(int)} and
-                * {@link #reset()} are not supported.
-                *
-                * @see java.io.FilterInputStream#reset()
-                */
-               @Override
-               public void reset() throws IOException {
-                       /* do nothing. */
-               }
-
-               /**
-                * Consumes the input stream, i.e. read all bytes until the limit is
-                * reached.
-                *
-                * @throws IOException
-                *             if an I/O error occurs
-                */
-               public synchronized void consume() throws IOException {
-                       while (remaining > 0) {
-                               skip(remaining);
-                       }
-               }
-
+       private synchronized InputStream getInputStream(long dataLength) throws IOException {
+               return new TempInputStream(remoteInputStream, dataLength);
        }
 
 }