Copy payload in AllData/FCPPluginReply handlers
authorDavid ‘Bombe’ Roden <bombe@freenetproject.org>
Fri, 10 Jun 2016 04:50:49 +0000 (06:50 +0200)
committerDavid ‘Bombe’ Roden <bombe@freenetproject.org>
Fri, 10 Jun 2016 04:50:49 +0000 (06:50 +0200)
src/main/java/net/pterodactylus/fcp/FcpConnection.java

index 34f47fd..b7b99bf 100644 (file)
@@ -31,6 +31,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.logging.Logger;
 
+import net.pterodactylus.fcp.FcpUtils.TempInputStream;
+
 /**
  * An FCP connection to a Freenet node.
  *
@@ -227,7 +229,7 @@ public class FcpConnection implements Closeable {
         * @param fcpMessage
         *            The received message
         */
-       void handleMessage(FcpMessage fcpMessage) {
+       void handleMessage(FcpMessage fcpMessage) throws IOException{
                logger.fine("received message: " + fcpMessage.getName());
                String messageName = fcpMessage.getName();
                countMessage(messageName);
@@ -270,13 +272,8 @@ public class FcpConnection implements Closeable {
                } else if ("IdentifierCollision".equals(messageName)) {
                        fcpListenerManager.fireReceivedIdentifierCollision(new IdentifierCollision(fcpMessage));
                } else if ("AllData".equals(messageName)) {
-                       LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
+                       InputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
                        fcpListenerManager.fireReceivedAllData(new AllData(fcpMessage, payloadInputStream));
-                       try {
-                               payloadInputStream.consume();
-                       } catch (IOException ioe1) {
-                               /* well, ignore. when the connection handler fails, all fails. */
-                       }
                } else if ("EndListPeerNotes".equals(messageName)) {
                        fcpListenerManager.fireReceivedEndListPeerNotes(new EndListPeerNotes(fcpMessage));
                } else if ("EndListPeers".equals(messageName)) {
@@ -294,13 +291,8 @@ public class FcpConnection implements Closeable {
                } else if ("UnknownNodeIdentifier".equals(messageName)) {
                        fcpListenerManager.fireReceivedUnknownNodeIdentifier(new UnknownNodeIdentifier(fcpMessage));
                } else if ("FCPPluginReply".equals(messageName)) {
-                       LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
+                       InputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
                        fcpListenerManager.fireReceivedFCPPluginReply(new FCPPluginReply(fcpMessage, payloadInputStream));
-                       try {
-                               payloadInputStream.consume();
-                       } catch (IOException ioe1) {
-                               /* ignore. */
-                       }
                } else if ("PluginInfo".equals(messageName)) {
                        fcpListenerManager.fireReceivedPluginInfo(new PluginInfo(fcpMessage));
                } else if ("PluginRemoved".equals(messageName)) {
@@ -364,142 +356,8 @@ public class FcpConnection implements Closeable {
                logger.finest("count for " + name + ": " + (oldValue + 1));
        }
 
-       /**
-        * Returns a limited input stream from the node’s input stream.
-        *
-        * @param dataLength
-        *            The length of the stream
-        * @return The limited input stream
-        */
-       private synchronized LimitedInputStream getInputStream(long dataLength) {
-               if (dataLength <= 0) {
-                       return new LimitedInputStream(null, 0);
-               }
-               return new LimitedInputStream(remoteInputStream, dataLength);
-       }
-
-       /**
-        * A wrapper around an {@link InputStream} that only supplies a limit
-        * number of bytes from the underlying input stream.
-        *
-        * @author David ‘Bombe’ Roden &lt;bombe@freenetproject.org&gt;
-        */
-       private static class LimitedInputStream extends FilterInputStream {
-
-               /** The remaining number of bytes that can be read. */
-               private long remaining;
-
-               /**
-                * Creates a new LimitedInputStream that supplies at most
-                * <code>length</code> bytes from the given input stream.
-                *
-                * @param inputStream
-                *            The input stream
-                * @param length
-                *            The number of bytes to read
-                */
-               public LimitedInputStream(InputStream inputStream, long length) {
-                       super(inputStream);
-                       remaining = length;
-               }
-
-               /**
-                * @see java.io.FilterInputStream#available()
-                */
-               @Override
-               public synchronized int available() throws IOException {
-                       if (remaining == 0) {
-                               return 0;
-                       }
-                       return (int) Math.min(super.available(), Math.min(Integer.MAX_VALUE, remaining));
-               }
-
-               /**
-                * @see java.io.FilterInputStream#read()
-                */
-               @Override
-               public synchronized int read() throws IOException {
-                       int read = -1;
-                       if (remaining > 0) {
-                               read = super.read();
-                               remaining--;
-                       }
-                       return read;
-               }
-
-               /**
-                * @see java.io.FilterInputStream#read(byte[], int, int)
-                */
-               @Override
-               public synchronized int read(byte[] b, int off, int len) throws IOException {
-                       if (remaining == 0) {
-                               return -1;
-                       }
-                       int toCopy = (int) Math.min(len, Math.min(remaining, Integer.MAX_VALUE));
-                       int read = super.read(b, off, toCopy);
-                       remaining -= read;
-                       return read;
-               }
-
-               /**
-                * @see java.io.FilterInputStream#skip(long)
-                */
-               @Override
-               public synchronized long skip(long n) throws IOException {
-                       if ((n < 0) || (remaining == 0)) {
-                               return 0;
-                       }
-                       long skipped = super.skip(Math.min(n, remaining));
-                       remaining -= skipped;
-                       return skipped;
-               }
-
-               /**
-                * {@inheritDoc} This method does nothing, as {@link #mark(int)} and
-                * {@link #reset()} are not supported.
-                *
-                * @see java.io.FilterInputStream#mark(int)
-                */
-               @Override
-               public synchronized void mark(int readlimit) {
-                       /* do nothing. */
-               }
-
-               /**
-                * {@inheritDoc}
-                *
-                * @see java.io.FilterInputStream#markSupported()
-                * @return <code>false</code>
-                */
-               @Override
-               public boolean markSupported() {
-                       return false;
-               }
-
-               /**
-                * {@inheritDoc} This method does nothing, as {@link #mark(int)} and
-                * {@link #reset()} are not supported.
-                *
-                * @see java.io.FilterInputStream#reset()
-                */
-               @Override
-               public synchronized void reset() throws IOException {
-                       /* do nothing. */
-               }
-
-               /**
-                * Consumes the input stream, i.e. read all bytes until the limit is
-                * reached.
-                *
-                * @throws IOException
-                *             if an I/O error occurs
-                */
-               public synchronized void consume() throws IOException {
-                       while (remaining > 0) {
-                               skip(remaining);
-                       }
-               }
-
+       private synchronized InputStream getInputStream(long dataLength) throws IOException {
+               return new TempInputStream(remoteInputStream, dataLength);
        }
 
 }