X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Ffcp%2FFcpConnection.java;h=3331bdf17cca8fd80d9c6c0faae878fb85c8622c;hb=5618b5c1d75357ad2e17da1d42997b4390aaf0da;hp=858b783b305517c9526eec51b943987ea6e75b0f;hpb=b171126719c983b590c51f22eb60a3e0afdf1fb9;p=jFCPlib.git diff --git a/src/main/java/net/pterodactylus/fcp/FcpConnection.java b/src/main/java/net/pterodactylus/fcp/FcpConnection.java index 858b783..3331bdf 100644 --- a/src/main/java/net/pterodactylus/fcp/FcpConnection.java +++ b/src/main/java/net/pterodactylus/fcp/FcpConnection.java @@ -1,9 +1,9 @@ /* - * jFCPlib - FpcConnection.java - Copyright © 2008 David Roden + * jFCPlib - FcpConnection.java - Copyright © 2008–2016 David Roden * - * This program is free software; you can redistribute it and/or modify + * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or + * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, @@ -12,8 +12,7 @@ * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software - * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + * along with this program. If not, see . */ package net.pterodactylus.fcp; @@ -31,7 +30,7 @@ import java.util.HashMap; import java.util.Map; import java.util.logging.Logger; -import net.pterodactylus.util.logging.Logging; +import net.pterodactylus.fcp.FcpUtils.TempInputStream; /** * An FCP connection to a Freenet node. @@ -41,7 +40,7 @@ import net.pterodactylus.util.logging.Logging; public class FcpConnection implements Closeable { /** Logger. */ - private static final Logger logger = Logging.getLogger(FcpConnection.class.getName()); + private static final Logger logger = Logger.getLogger(FcpConnection.class.getName()); /** The default port for FCP v2. */ public static final int DEFAULT_PORT = 9481; @@ -158,6 +157,10 @@ public class FcpConnection implements Closeable { fcpListenerManager.removeListener(fcpListener); } + public synchronized boolean isClosed() { + return connectionHandler == null; + } + // // ACTIONS // @@ -225,7 +228,7 @@ public class FcpConnection implements Closeable { * @param fcpMessage * The received message */ - void handleMessage(FcpMessage fcpMessage) { + void handleMessage(FcpMessage fcpMessage) throws IOException{ logger.fine("received message: " + fcpMessage.getName()); String messageName = fcpMessage.getName(); countMessage(messageName); @@ -263,16 +266,13 @@ public class FcpConnection implements Closeable { fcpListenerManager.fireReceivedDataFound(new DataFound(fcpMessage)); } else if ("SubscribedUSKUpdate".equals(messageName)) { fcpListenerManager.fireReceivedSubscribedUSKUpdate(new SubscribedUSKUpdate(fcpMessage)); + } else if ("SubscribedUSK".equals(messageName)) { + fcpListenerManager.fireReceivedSubscribedUSK(new SubscribedUSK(fcpMessage)); } else if ("IdentifierCollision".equals(messageName)) { fcpListenerManager.fireReceivedIdentifierCollision(new IdentifierCollision(fcpMessage)); } else if ("AllData".equals(messageName)) { - LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength"))); + InputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength"))); fcpListenerManager.fireReceivedAllData(new AllData(fcpMessage, payloadInputStream)); - try { - payloadInputStream.consume(); - } catch (IOException ioe1) { - /* well, ignore. when the connection handler fails, all fails. */ - } } else if ("EndListPeerNotes".equals(messageName)) { fcpListenerManager.fireReceivedEndListPeerNotes(new EndListPeerNotes(fcpMessage)); } else if ("EndListPeers".equals(messageName)) { @@ -290,15 +290,12 @@ public class FcpConnection implements Closeable { } else if ("UnknownNodeIdentifier".equals(messageName)) { fcpListenerManager.fireReceivedUnknownNodeIdentifier(new UnknownNodeIdentifier(fcpMessage)); } else if ("FCPPluginReply".equals(messageName)) { - LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength"))); + InputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength"), 0)); fcpListenerManager.fireReceivedFCPPluginReply(new FCPPluginReply(fcpMessage, payloadInputStream)); - try { - payloadInputStream.consume(); - } catch (IOException ioe1) { - /* ignore. */ - } } else if ("PluginInfo".equals(messageName)) { fcpListenerManager.fireReceivedPluginInfo(new PluginInfo(fcpMessage)); + } else if ("PluginRemoved".equals(messageName)) { + fcpListenerManager.fireReceivedPluginRemoved(new PluginRemoved(fcpMessage)); } else if ("NodeData".equals(messageName)) { fcpListenerManager.fireReceivedNodeData(new NodeData(fcpMessage)); } else if ("TestDDAReply".equals(messageName)) { @@ -358,142 +355,8 @@ public class FcpConnection implements Closeable { logger.finest("count for " + name + ": " + (oldValue + 1)); } - /** - * Returns a limited input stream from the node’s input stream. - * - * @param dataLength - * The length of the stream - * @return The limited input stream - */ - private synchronized LimitedInputStream getInputStream(long dataLength) { - if (dataLength <= 0) { - return new LimitedInputStream(null, 0); - } - return new LimitedInputStream(remoteInputStream, dataLength); - } - - /** - * A wrapper around an {@link InputStream} that only supplies a limit - * number of bytes from the underlying input stream. - * - * @author David ‘Bombe’ Roden <bombe@freenetproject.org> - */ - private static class LimitedInputStream extends FilterInputStream { - - /** The remaining number of bytes that can be read. */ - private long remaining; - - /** - * Creates a new LimitedInputStream that supplies at most - * length bytes from the given input stream. - * - * @param inputStream - * The input stream - * @param length - * The number of bytes to read - */ - public LimitedInputStream(InputStream inputStream, long length) { - super(inputStream); - remaining = length; - } - - /** - * @see java.io.FilterInputStream#available() - */ - @Override - public synchronized int available() throws IOException { - if (remaining == 0) { - return 0; - } - return (int) Math.min(super.available(), Math.min(Integer.MAX_VALUE, remaining)); - } - - /** - * @see java.io.FilterInputStream#read() - */ - @Override - public synchronized int read() throws IOException { - int read = -1; - if (remaining > 0) { - read = super.read(); - remaining--; - } - return read; - } - - /** - * @see java.io.FilterInputStream#read(byte[], int, int) - */ - @Override - public synchronized int read(byte[] b, int off, int len) throws IOException { - if (remaining == 0) { - return -1; - } - int toCopy = (int) Math.min(len, Math.min(remaining, Integer.MAX_VALUE)); - int read = super.read(b, off, toCopy); - remaining -= read; - return read; - } - - /** - * @see java.io.FilterInputStream#skip(long) - */ - @Override - public synchronized long skip(long n) throws IOException { - if ((n < 0) || (remaining == 0)) { - return 0; - } - long skipped = super.skip(Math.min(n, remaining)); - remaining -= skipped; - return skipped; - } - - /** - * {@inheritDoc} This method does nothing, as {@link #mark(int)} and - * {@link #reset()} are not supported. - * - * @see java.io.FilterInputStream#mark(int) - */ - @Override - public synchronized void mark(int readlimit) { - /* do nothing. */ - } - - /** - * {@inheritDoc} - * - * @see java.io.FilterInputStream#markSupported() - * @return false - */ - @Override - public boolean markSupported() { - return false; - } - - /** - * {@inheritDoc} This method does nothing, as {@link #mark(int)} and - * {@link #reset()} are not supported. - * - * @see java.io.FilterInputStream#reset() - */ - @Override - public synchronized void reset() throws IOException { - /* do nothing. */ - } - - /** - * Consumes the input stream, i.e. read all bytes until the limit is - * reached. - * - * @throws IOException - * if an I/O error occurs - */ - public synchronized void consume() throws IOException { - while (remaining > 0) { - skip(remaining); - } - } - + private synchronized InputStream getInputStream(long dataLength) throws IOException { + return new TempInputStream(remoteInputStream, dataLength); } }