X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Ffcp%2FFcpConnection.java;h=b7b99bfa1ddafb4b6506ae38cf78cbdb24091da7;hb=db657e470022206c978717b53cffb0c9f3c7569b;hp=2f407858a2591b549d8ede2890b769f32c2e2a6e;hpb=508624458578f01a393f8b44f32b98b40054fdc8;p=jFCPlib.git diff --git a/src/main/java/net/pterodactylus/fcp/FcpConnection.java b/src/main/java/net/pterodactylus/fcp/FcpConnection.java index 2f40785..b7b99bf 100644 --- a/src/main/java/net/pterodactylus/fcp/FcpConnection.java +++ b/src/main/java/net/pterodactylus/fcp/FcpConnection.java @@ -31,6 +31,8 @@ import java.util.HashMap; import java.util.Map; import java.util.logging.Logger; +import net.pterodactylus.fcp.FcpUtils.TempInputStream; + /** * An FCP connection to a Freenet node. * @@ -66,7 +68,7 @@ public class FcpConnection implements Closeable { private FcpConnectionHandler connectionHandler; /** Incoming message statistics. */ - private Map incomingMessageStatistics = Collections.synchronizedMap(new HashMap()); + private static final Map incomingMessageStatistics = Collections.synchronizedMap(new HashMap()); /** * Creates a new FCP connection to the freenet node running on localhost, @@ -156,6 +158,10 @@ public class FcpConnection implements Closeable { fcpListenerManager.removeListener(fcpListener); } + public synchronized boolean isClosed() { + return connectionHandler == null; + } + // // ACTIONS // @@ -191,9 +197,10 @@ public class FcpConnection implements Closeable { } /** - * Closes the connection. If there is no connection to the node, this method - * does nothing. + * Closes the connection. If there is no connection to the node, this + * method does nothing. */ + @Override public void close() { handleDisconnect(null); } @@ -222,7 +229,7 @@ public class FcpConnection implements Closeable { * @param fcpMessage * The received message */ - void handleMessage(FcpMessage fcpMessage) { + void handleMessage(FcpMessage fcpMessage) throws IOException{ logger.fine("received message: " + fcpMessage.getName()); String messageName = fcpMessage.getName(); countMessage(messageName); @@ -260,16 +267,13 @@ public class FcpConnection implements Closeable { fcpListenerManager.fireReceivedDataFound(new DataFound(fcpMessage)); } else if ("SubscribedUSKUpdate".equals(messageName)) { fcpListenerManager.fireReceivedSubscribedUSKUpdate(new SubscribedUSKUpdate(fcpMessage)); + } else if ("SubscribedUSK".equals(messageName)) { + fcpListenerManager.fireReceivedSubscribedUSK(new SubscribedUSK(fcpMessage)); } else if ("IdentifierCollision".equals(messageName)) { fcpListenerManager.fireReceivedIdentifierCollision(new IdentifierCollision(fcpMessage)); } else if ("AllData".equals(messageName)) { - LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength"))); + InputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength"))); fcpListenerManager.fireReceivedAllData(new AllData(fcpMessage, payloadInputStream)); - try { - payloadInputStream.consume(); - } catch (IOException ioe1) { - /* well, ignore. when the connection handler fails, all fails. */ - } } else if ("EndListPeerNotes".equals(messageName)) { fcpListenerManager.fireReceivedEndListPeerNotes(new EndListPeerNotes(fcpMessage)); } else if ("EndListPeers".equals(messageName)) { @@ -287,15 +291,12 @@ public class FcpConnection implements Closeable { } else if ("UnknownNodeIdentifier".equals(messageName)) { fcpListenerManager.fireReceivedUnknownNodeIdentifier(new UnknownNodeIdentifier(fcpMessage)); } else if ("FCPPluginReply".equals(messageName)) { - LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength"))); + InputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength"))); fcpListenerManager.fireReceivedFCPPluginReply(new FCPPluginReply(fcpMessage, payloadInputStream)); - try { - payloadInputStream.consume(); - } catch (IOException ioe1) { - /* ignore. */ - } } else if ("PluginInfo".equals(messageName)) { fcpListenerManager.fireReceivedPluginInfo(new PluginInfo(fcpMessage)); + } else if ("PluginRemoved".equals(messageName)) { + fcpListenerManager.fireReceivedPluginRemoved(new PluginRemoved(fcpMessage)); } else if ("NodeData".equals(messageName)) { fcpListenerManager.fireReceivedNodeData(new NodeData(fcpMessage)); } else if ("TestDDAReply".equals(messageName)) { @@ -321,8 +322,8 @@ public class FcpConnection implements Closeable { * Handles a disconnect from the node. * * @param throwable - * The exception that caused the disconnect, or null - * if there was no exception + * The exception that caused the disconnect, or + * null if there was no exception */ synchronized void handleDisconnect(Throwable throwable) { FcpUtils.close(remoteInputStream); @@ -355,142 +356,8 @@ public class FcpConnection implements Closeable { logger.finest("count for " + name + ": " + (oldValue + 1)); } - /** - * Returns a limited input stream from the node’s input stream. - * - * @param dataLength - * The length of the stream - * @return The limited input stream - */ - private synchronized LimitedInputStream getInputStream(long dataLength) { - if (dataLength <= 0) { - return new LimitedInputStream(null, 0); - } - return new LimitedInputStream(remoteInputStream, dataLength); - } - - /** - * A wrapper around an {@link InputStream} that only supplies a limit number - * of bytes from the underlying input stream. - * - * @author David ‘Bombe’ Roden <bombe@freenetproject.org> - */ - private static class LimitedInputStream extends FilterInputStream { - - /** The remaining number of bytes that can be read. */ - private long remaining; - - /** - * Creates a new LimitedInputStream that supplies at most - * length bytes from the given input stream. - * - * @param inputStream - * The input stream - * @param length - * The number of bytes to read - */ - public LimitedInputStream(InputStream inputStream, long length) { - super(inputStream); - remaining = length; - } - - /** - * @see java.io.FilterInputStream#available() - */ - @Override - public synchronized int available() throws IOException { - if (remaining == 0) { - return 0; - } - return (int) Math.min(super.available(), Math.min(Integer.MAX_VALUE, remaining)); - } - - /** - * @see java.io.FilterInputStream#read() - */ - @Override - public synchronized int read() throws IOException { - int read = -1; - if (remaining > 0) { - read = super.read(); - remaining--; - } - return read; - } - - /** - * @see java.io.FilterInputStream#read(byte[], int, int) - */ - @Override - public synchronized int read(byte[] b, int off, int len) throws IOException { - if (remaining == 0) { - return -1; - } - int toCopy = (int) Math.min(len, Math.min(remaining, Integer.MAX_VALUE)); - int read = super.read(b, off, toCopy); - remaining -= read; - return read; - } - - /** - * @see java.io.FilterInputStream#skip(long) - */ - @Override - public synchronized long skip(long n) throws IOException { - if ((n < 0) || (remaining == 0)) { - return 0; - } - long skipped = super.skip(Math.min(n, remaining)); - remaining -= skipped; - return skipped; - } - - /** - * {@inheritDoc} This method does nothing, as {@link #mark(int)} and - * {@link #reset()} are not supported. - * - * @see java.io.FilterInputStream#mark(int) - */ - @Override - public void mark(int readlimit) { - /* do nothing. */ - } - - /** - * {@inheritDoc} - * - * @see java.io.FilterInputStream#markSupported() - * @return false - */ - @Override - public boolean markSupported() { - return false; - } - - /** - * {@inheritDoc} This method does nothing, as {@link #mark(int)} and - * {@link #reset()} are not supported. - * - * @see java.io.FilterInputStream#reset() - */ - @Override - public void reset() throws IOException { - /* do nothing. */ - } - - /** - * Consumes the input stream, i.e. read all bytes until the limit is - * reached. - * - * @throws IOException - * if an I/O error occurs - */ - public synchronized void consume() throws IOException { - while (remaining > 0) { - skip(remaining); - } - } - + private synchronized InputStream getInputStream(long dataLength) throws IOException { + return new TempInputStream(remoteInputStream, dataLength); } }