import java.util.Map;
import java.util.logging.Logger;
+import net.pterodactylus.fcp.FcpUtils.TempInputStream;
+
/**
* An FCP connection to a Freenet node.
*
* @param fcpMessage
* The received message
*/
- void handleMessage(FcpMessage fcpMessage) {
+ void handleMessage(FcpMessage fcpMessage) throws IOException{
logger.fine("received message: " + fcpMessage.getName());
String messageName = fcpMessage.getName();
countMessage(messageName);
} else if ("IdentifierCollision".equals(messageName)) {
fcpListenerManager.fireReceivedIdentifierCollision(new IdentifierCollision(fcpMessage));
} else if ("AllData".equals(messageName)) {
- LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
+ InputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
fcpListenerManager.fireReceivedAllData(new AllData(fcpMessage, payloadInputStream));
- try {
- payloadInputStream.consume();
- } catch (IOException ioe1) {
- /* well, ignore. when the connection handler fails, all fails. */
- }
} else if ("EndListPeerNotes".equals(messageName)) {
fcpListenerManager.fireReceivedEndListPeerNotes(new EndListPeerNotes(fcpMessage));
} else if ("EndListPeers".equals(messageName)) {
} else if ("UnknownNodeIdentifier".equals(messageName)) {
fcpListenerManager.fireReceivedUnknownNodeIdentifier(new UnknownNodeIdentifier(fcpMessage));
} else if ("FCPPluginReply".equals(messageName)) {
- LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
+ InputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
fcpListenerManager.fireReceivedFCPPluginReply(new FCPPluginReply(fcpMessage, payloadInputStream));
- try {
- payloadInputStream.consume();
- } catch (IOException ioe1) {
- /* ignore. */
- }
} else if ("PluginInfo".equals(messageName)) {
fcpListenerManager.fireReceivedPluginInfo(new PluginInfo(fcpMessage));
} else if ("PluginRemoved".equals(messageName)) {
logger.finest("count for " + name + ": " + (oldValue + 1));
}
- /**
- * Returns a limited input stream from the node’s input stream.
- *
- * @param dataLength
- * The length of the stream
- * @return The limited input stream
- */
- private synchronized LimitedInputStream getInputStream(long dataLength) {
- if (dataLength <= 0) {
- return new LimitedInputStream(null, 0);
- }
- return new LimitedInputStream(remoteInputStream, dataLength);
- }
-
- /**
- * A wrapper around an {@link InputStream} that only supplies a limit
- * number of bytes from the underlying input stream.
- *
- * @author David ‘Bombe’ Roden <bombe@freenetproject.org>
- */
- private static class LimitedInputStream extends FilterInputStream {
-
- /** The remaining number of bytes that can be read. */
- private long remaining;
-
- /**
- * Creates a new LimitedInputStream that supplies at most
- * <code>length</code> bytes from the given input stream.
- *
- * @param inputStream
- * The input stream
- * @param length
- * The number of bytes to read
- */
- public LimitedInputStream(InputStream inputStream, long length) {
- super(inputStream);
- remaining = length;
- }
-
- /**
- * @see java.io.FilterInputStream#available()
- */
- @Override
- public synchronized int available() throws IOException {
- if (remaining == 0) {
- return 0;
- }
- return (int) Math.min(super.available(), Math.min(Integer.MAX_VALUE, remaining));
- }
-
- /**
- * @see java.io.FilterInputStream#read()
- */
- @Override
- public synchronized int read() throws IOException {
- int read = -1;
- if (remaining > 0) {
- read = super.read();
- remaining--;
- }
- return read;
- }
-
- /**
- * @see java.io.FilterInputStream#read(byte[], int, int)
- */
- @Override
- public synchronized int read(byte[] b, int off, int len) throws IOException {
- if (remaining == 0) {
- return -1;
- }
- int toCopy = (int) Math.min(len, Math.min(remaining, Integer.MAX_VALUE));
- int read = super.read(b, off, toCopy);
- remaining -= read;
- return read;
- }
-
- /**
- * @see java.io.FilterInputStream#skip(long)
- */
- @Override
- public synchronized long skip(long n) throws IOException {
- if ((n < 0) || (remaining == 0)) {
- return 0;
- }
- long skipped = super.skip(Math.min(n, remaining));
- remaining -= skipped;
- return skipped;
- }
-
- /**
- * {@inheritDoc} This method does nothing, as {@link #mark(int)} and
- * {@link #reset()} are not supported.
- *
- * @see java.io.FilterInputStream#mark(int)
- */
- @Override
- public synchronized void mark(int readlimit) {
- /* do nothing. */
- }
-
- /**
- * {@inheritDoc}
- *
- * @see java.io.FilterInputStream#markSupported()
- * @return <code>false</code>
- */
- @Override
- public boolean markSupported() {
- return false;
- }
-
- /**
- * {@inheritDoc} This method does nothing, as {@link #mark(int)} and
- * {@link #reset()} are not supported.
- *
- * @see java.io.FilterInputStream#reset()
- */
- @Override
- public synchronized void reset() throws IOException {
- /* do nothing. */
- }
-
- /**
- * Consumes the input stream, i.e. read all bytes until the limit is
- * reached.
- *
- * @throws IOException
- * if an I/O error occurs
- */
- public synchronized void consume() throws IOException {
- while (remaining > 0) {
- skip(remaining);
- }
- }
-
+ private synchronized InputStream getInputStream(long dataLength) throws IOException {
+ return new TempInputStream(remoteInputStream, dataLength);
}
}