X-Git-Url: https://git.pterodactylus.net/?p=jFCPlib.git;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Ffcp%2FFcpConnection.java;fp=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Ffcp%2FFcpConnection.java;h=b7b99bfa1ddafb4b6506ae38cf78cbdb24091da7;hp=66377a2c05c2e8b4de4747e1770a07cb7d9246fc;hb=fed333217dc45a47f6eabea6a5b29aed987cd160;hpb=6065356598b699145344a1a873054f2e5b0d23ef diff --git a/src/main/java/net/pterodactylus/fcp/FcpConnection.java b/src/main/java/net/pterodactylus/fcp/FcpConnection.java index 66377a2..b7b99bf 100644 --- a/src/main/java/net/pterodactylus/fcp/FcpConnection.java +++ b/src/main/java/net/pterodactylus/fcp/FcpConnection.java @@ -31,6 +31,8 @@ import java.util.HashMap; import java.util.Map; import java.util.logging.Logger; +import net.pterodactylus.fcp.FcpUtils.TempInputStream; + /** * An FCP connection to a Freenet node. * @@ -156,6 +158,10 @@ public class FcpConnection implements Closeable { fcpListenerManager.removeListener(fcpListener); } + public synchronized boolean isClosed() { + return connectionHandler == null; + } + // // ACTIONS // @@ -223,7 +229,7 @@ public class FcpConnection implements Closeable { * @param fcpMessage * The received message */ - void handleMessage(FcpMessage fcpMessage) { + void handleMessage(FcpMessage fcpMessage) throws IOException{ logger.fine("received message: " + fcpMessage.getName()); String messageName = fcpMessage.getName(); countMessage(messageName); @@ -261,16 +267,13 @@ public class FcpConnection implements Closeable { fcpListenerManager.fireReceivedDataFound(new DataFound(fcpMessage)); } else if ("SubscribedUSKUpdate".equals(messageName)) { fcpListenerManager.fireReceivedSubscribedUSKUpdate(new SubscribedUSKUpdate(fcpMessage)); + } else if ("SubscribedUSK".equals(messageName)) { + fcpListenerManager.fireReceivedSubscribedUSK(new SubscribedUSK(fcpMessage)); } else if ("IdentifierCollision".equals(messageName)) { fcpListenerManager.fireReceivedIdentifierCollision(new IdentifierCollision(fcpMessage)); } else if ("AllData".equals(messageName)) { - LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength"))); + InputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength"))); fcpListenerManager.fireReceivedAllData(new AllData(fcpMessage, payloadInputStream)); - try { - payloadInputStream.consume(); - } catch (IOException ioe1) { - /* well, ignore. when the connection handler fails, all fails. */ - } } else if ("EndListPeerNotes".equals(messageName)) { fcpListenerManager.fireReceivedEndListPeerNotes(new EndListPeerNotes(fcpMessage)); } else if ("EndListPeers".equals(messageName)) { @@ -288,15 +291,12 @@ public class FcpConnection implements Closeable { } else if ("UnknownNodeIdentifier".equals(messageName)) { fcpListenerManager.fireReceivedUnknownNodeIdentifier(new UnknownNodeIdentifier(fcpMessage)); } else if ("FCPPluginReply".equals(messageName)) { - LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength"))); + InputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength"))); fcpListenerManager.fireReceivedFCPPluginReply(new FCPPluginReply(fcpMessage, payloadInputStream)); - try { - payloadInputStream.consume(); - } catch (IOException ioe1) { - /* ignore. */ - } } else if ("PluginInfo".equals(messageName)) { fcpListenerManager.fireReceivedPluginInfo(new PluginInfo(fcpMessage)); + } else if ("PluginRemoved".equals(messageName)) { + fcpListenerManager.fireReceivedPluginRemoved(new PluginRemoved(fcpMessage)); } else if ("NodeData".equals(messageName)) { fcpListenerManager.fireReceivedNodeData(new NodeData(fcpMessage)); } else if ("TestDDAReply".equals(messageName)) { @@ -356,142 +356,8 @@ public class FcpConnection implements Closeable { logger.finest("count for " + name + ": " + (oldValue + 1)); } - /** - * Returns a limited input stream from the node’s input stream. - * - * @param dataLength - * The length of the stream - * @return The limited input stream - */ - private synchronized LimitedInputStream getInputStream(long dataLength) { - if (dataLength <= 0) { - return new LimitedInputStream(null, 0); - } - return new LimitedInputStream(remoteInputStream, dataLength); - } - - /** - * A wrapper around an {@link InputStream} that only supplies a limit - * number of bytes from the underlying input stream. - * - * @author David ‘Bombe’ Roden <bombe@freenetproject.org> - */ - private static class LimitedInputStream extends FilterInputStream { - - /** The remaining number of bytes that can be read. */ - private long remaining; - - /** - * Creates a new LimitedInputStream that supplies at most - * length bytes from the given input stream. - * - * @param inputStream - * The input stream - * @param length - * The number of bytes to read - */ - public LimitedInputStream(InputStream inputStream, long length) { - super(inputStream); - remaining = length; - } - - /** - * @see java.io.FilterInputStream#available() - */ - @Override - public synchronized int available() throws IOException { - if (remaining == 0) { - return 0; - } - return (int) Math.min(super.available(), Math.min(Integer.MAX_VALUE, remaining)); - } - - /** - * @see java.io.FilterInputStream#read() - */ - @Override - public synchronized int read() throws IOException { - int read = -1; - if (remaining > 0) { - read = super.read(); - remaining--; - } - return read; - } - - /** - * @see java.io.FilterInputStream#read(byte[], int, int) - */ - @Override - public synchronized int read(byte[] b, int off, int len) throws IOException { - if (remaining == 0) { - return -1; - } - int toCopy = (int) Math.min(len, Math.min(remaining, Integer.MAX_VALUE)); - int read = super.read(b, off, toCopy); - remaining -= read; - return read; - } - - /** - * @see java.io.FilterInputStream#skip(long) - */ - @Override - public synchronized long skip(long n) throws IOException { - if ((n < 0) || (remaining == 0)) { - return 0; - } - long skipped = super.skip(Math.min(n, remaining)); - remaining -= skipped; - return skipped; - } - - /** - * {@inheritDoc} This method does nothing, as {@link #mark(int)} and - * {@link #reset()} are not supported. - * - * @see java.io.FilterInputStream#mark(int) - */ - @Override - public synchronized void mark(int readlimit) { - /* do nothing. */ - } - - /** - * {@inheritDoc} - * - * @see java.io.FilterInputStream#markSupported() - * @return false - */ - @Override - public boolean markSupported() { - return false; - } - - /** - * {@inheritDoc} This method does nothing, as {@link #mark(int)} and - * {@link #reset()} are not supported. - * - * @see java.io.FilterInputStream#reset() - */ - @Override - public synchronized void reset() throws IOException { - /* do nothing. */ - } - - /** - * Consumes the input stream, i.e. read all bytes until the limit is - * reached. - * - * @throws IOException - * if an I/O error occurs - */ - public synchronized void consume() throws IOException { - while (remaining > 0) { - skip(remaining); - } - } - + private synchronized InputStream getInputStream(long dataLength) throws IOException { + return new TempInputStream(remoteInputStream, dataLength); } }