From: David ‘Bombe’ Roden Date: Fri, 10 Jun 2016 04:50:49 +0000 (+0200) Subject: Copy payload in AllData/FCPPluginReply handlers X-Git-Tag: v0.1.4^2~1 X-Git-Url: https://git.pterodactylus.net/?a=commitdiff_plain;h=db657e470022206c978717b53cffb0c9f3c7569b;p=jFCPlib.git Copy payload in AllData/FCPPluginReply handlers --- diff --git a/src/main/java/net/pterodactylus/fcp/FcpConnection.java b/src/main/java/net/pterodactylus/fcp/FcpConnection.java index 34f47fd..b7b99bf 100644 --- a/src/main/java/net/pterodactylus/fcp/FcpConnection.java +++ b/src/main/java/net/pterodactylus/fcp/FcpConnection.java @@ -31,6 +31,8 @@ import java.util.HashMap; import java.util.Map; import java.util.logging.Logger; +import net.pterodactylus.fcp.FcpUtils.TempInputStream; + /** * An FCP connection to a Freenet node. * @@ -227,7 +229,7 @@ public class FcpConnection implements Closeable { * @param fcpMessage * The received message */ - void handleMessage(FcpMessage fcpMessage) { + void handleMessage(FcpMessage fcpMessage) throws IOException{ logger.fine("received message: " + fcpMessage.getName()); String messageName = fcpMessage.getName(); countMessage(messageName); @@ -270,13 +272,8 @@ public class FcpConnection implements Closeable { } else if ("IdentifierCollision".equals(messageName)) { fcpListenerManager.fireReceivedIdentifierCollision(new IdentifierCollision(fcpMessage)); } else if ("AllData".equals(messageName)) { - LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength"))); + InputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength"))); fcpListenerManager.fireReceivedAllData(new AllData(fcpMessage, payloadInputStream)); - try { - payloadInputStream.consume(); - } catch (IOException ioe1) { - /* well, ignore. when the connection handler fails, all fails. */ - } } else if ("EndListPeerNotes".equals(messageName)) { fcpListenerManager.fireReceivedEndListPeerNotes(new EndListPeerNotes(fcpMessage)); } else if ("EndListPeers".equals(messageName)) { @@ -294,13 +291,8 @@ public class FcpConnection implements Closeable { } else if ("UnknownNodeIdentifier".equals(messageName)) { fcpListenerManager.fireReceivedUnknownNodeIdentifier(new UnknownNodeIdentifier(fcpMessage)); } else if ("FCPPluginReply".equals(messageName)) { - LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength"))); + InputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength"))); fcpListenerManager.fireReceivedFCPPluginReply(new FCPPluginReply(fcpMessage, payloadInputStream)); - try { - payloadInputStream.consume(); - } catch (IOException ioe1) { - /* ignore. */ - } } else if ("PluginInfo".equals(messageName)) { fcpListenerManager.fireReceivedPluginInfo(new PluginInfo(fcpMessage)); } else if ("PluginRemoved".equals(messageName)) { @@ -364,142 +356,8 @@ public class FcpConnection implements Closeable { logger.finest("count for " + name + ": " + (oldValue + 1)); } - /** - * Returns a limited input stream from the node’s input stream. - * - * @param dataLength - * The length of the stream - * @return The limited input stream - */ - private synchronized LimitedInputStream getInputStream(long dataLength) { - if (dataLength <= 0) { - return new LimitedInputStream(null, 0); - } - return new LimitedInputStream(remoteInputStream, dataLength); - } - - /** - * A wrapper around an {@link InputStream} that only supplies a limit - * number of bytes from the underlying input stream. - * - * @author David ‘Bombe’ Roden <bombe@freenetproject.org> - */ - private static class LimitedInputStream extends FilterInputStream { - - /** The remaining number of bytes that can be read. */ - private long remaining; - - /** - * Creates a new LimitedInputStream that supplies at most - * length bytes from the given input stream. - * - * @param inputStream - * The input stream - * @param length - * The number of bytes to read - */ - public LimitedInputStream(InputStream inputStream, long length) { - super(inputStream); - remaining = length; - } - - /** - * @see java.io.FilterInputStream#available() - */ - @Override - public synchronized int available() throws IOException { - if (remaining == 0) { - return 0; - } - return (int) Math.min(super.available(), Math.min(Integer.MAX_VALUE, remaining)); - } - - /** - * @see java.io.FilterInputStream#read() - */ - @Override - public synchronized int read() throws IOException { - int read = -1; - if (remaining > 0) { - read = super.read(); - remaining--; - } - return read; - } - - /** - * @see java.io.FilterInputStream#read(byte[], int, int) - */ - @Override - public synchronized int read(byte[] b, int off, int len) throws IOException { - if (remaining == 0) { - return -1; - } - int toCopy = (int) Math.min(len, Math.min(remaining, Integer.MAX_VALUE)); - int read = super.read(b, off, toCopy); - remaining -= read; - return read; - } - - /** - * @see java.io.FilterInputStream#skip(long) - */ - @Override - public synchronized long skip(long n) throws IOException { - if ((n < 0) || (remaining == 0)) { - return 0; - } - long skipped = super.skip(Math.min(n, remaining)); - remaining -= skipped; - return skipped; - } - - /** - * {@inheritDoc} This method does nothing, as {@link #mark(int)} and - * {@link #reset()} are not supported. - * - * @see java.io.FilterInputStream#mark(int) - */ - @Override - public synchronized void mark(int readlimit) { - /* do nothing. */ - } - - /** - * {@inheritDoc} - * - * @see java.io.FilterInputStream#markSupported() - * @return false - */ - @Override - public boolean markSupported() { - return false; - } - - /** - * {@inheritDoc} This method does nothing, as {@link #mark(int)} and - * {@link #reset()} are not supported. - * - * @see java.io.FilterInputStream#reset() - */ - @Override - public synchronized void reset() throws IOException { - /* do nothing. */ - } - - /** - * Consumes the input stream, i.e. read all bytes until the limit is - * reached. - * - * @throws IOException - * if an I/O error occurs - */ - public synchronized void consume() throws IOException { - while (remaining > 0) { - skip(remaining); - } - } - + private synchronized InputStream getInputStream(long dataLength) throws IOException { + return new TempInputStream(remoteInputStream, dataLength); } }