X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;ds=sidebyside;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Ffcp%2FFcpConnection.java;h=3331bdf17cca8fd80d9c6c0faae878fb85c8622c;hb=d81674b71bc9f250fc61db366e1cffeae897f55e;hp=e2bf72a64d7f6e85e51c9bdda88398175edbaf9f;hpb=16f13bf1bb39de9af116ddfcbccf0d3d426490bc;p=jFCPlib.git
diff --git a/src/main/java/net/pterodactylus/fcp/FcpConnection.java b/src/main/java/net/pterodactylus/fcp/FcpConnection.java
index e2bf72a..3331bdf 100644
--- a/src/main/java/net/pterodactylus/fcp/FcpConnection.java
+++ b/src/main/java/net/pterodactylus/fcp/FcpConnection.java
@@ -1,9 +1,9 @@
/*
- * jFCPlib - FpcConnection.java - Copyright © 2008 David Roden
+ * jFCPlib - FcpConnection.java - Copyright © 2008â2016 David Roden
*
- * This program is free software; you can redistribute it and/or modify
+ * This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
+ * the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
@@ -12,8 +12,7 @@
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ * along with this program. If not, see .
*/
package net.pterodactylus.fcp;
@@ -31,7 +30,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;
-import net.pterodactylus.util.logging.Logging;
+import net.pterodactylus.fcp.FcpUtils.TempInputStream;
/**
* An FCP connection to a Freenet node.
@@ -41,7 +40,7 @@ import net.pterodactylus.util.logging.Logging;
public class FcpConnection implements Closeable {
/** Logger. */
- private static final Logger logger = Logging.getLogger(FcpConnection.class.getName());
+ private static final Logger logger = Logger.getLogger(FcpConnection.class.getName());
/** The default port for FCP v2. */
public static final int DEFAULT_PORT = 9481;
@@ -68,7 +67,7 @@ public class FcpConnection implements Closeable {
private FcpConnectionHandler connectionHandler;
/** Incoming message statistics. */
- private Map incomingMessageStatistics = Collections.synchronizedMap(new HashMap());
+ private static final Map incomingMessageStatistics = Collections.synchronizedMap(new HashMap());
/**
* Creates a new FCP connection to the freenet node running on localhost,
@@ -158,6 +157,10 @@ public class FcpConnection implements Closeable {
fcpListenerManager.removeListener(fcpListener);
}
+ public synchronized boolean isClosed() {
+ return connectionHandler == null;
+ }
+
//
// ACTIONS
//
@@ -193,9 +196,10 @@ public class FcpConnection implements Closeable {
}
/**
- * Closes the connection. If there is no connection to the node, this method
- * does nothing.
+ * Closes the connection. If there is no connection to the node, this
+ * method does nothing.
*/
+ @Override
public void close() {
handleDisconnect(null);
}
@@ -224,7 +228,7 @@ public class FcpConnection implements Closeable {
* @param fcpMessage
* The received message
*/
- void handleMessage(FcpMessage fcpMessage) {
+ void handleMessage(FcpMessage fcpMessage) throws IOException{
logger.fine("received message: " + fcpMessage.getName());
String messageName = fcpMessage.getName();
countMessage(messageName);
@@ -262,16 +266,13 @@ public class FcpConnection implements Closeable {
fcpListenerManager.fireReceivedDataFound(new DataFound(fcpMessage));
} else if ("SubscribedUSKUpdate".equals(messageName)) {
fcpListenerManager.fireReceivedSubscribedUSKUpdate(new SubscribedUSKUpdate(fcpMessage));
+ } else if ("SubscribedUSK".equals(messageName)) {
+ fcpListenerManager.fireReceivedSubscribedUSK(new SubscribedUSK(fcpMessage));
} else if ("IdentifierCollision".equals(messageName)) {
fcpListenerManager.fireReceivedIdentifierCollision(new IdentifierCollision(fcpMessage));
} else if ("AllData".equals(messageName)) {
- LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
+ InputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
fcpListenerManager.fireReceivedAllData(new AllData(fcpMessage, payloadInputStream));
- try {
- payloadInputStream.consume();
- } catch (IOException ioe1) {
- /* well, ignore. when the connection handler fails, all fails. */
- }
} else if ("EndListPeerNotes".equals(messageName)) {
fcpListenerManager.fireReceivedEndListPeerNotes(new EndListPeerNotes(fcpMessage));
} else if ("EndListPeers".equals(messageName)) {
@@ -289,15 +290,12 @@ public class FcpConnection implements Closeable {
} else if ("UnknownNodeIdentifier".equals(messageName)) {
fcpListenerManager.fireReceivedUnknownNodeIdentifier(new UnknownNodeIdentifier(fcpMessage));
} else if ("FCPPluginReply".equals(messageName)) {
- LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
+ InputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength"), 0));
fcpListenerManager.fireReceivedFCPPluginReply(new FCPPluginReply(fcpMessage, payloadInputStream));
- try {
- payloadInputStream.consume();
- } catch (IOException ioe1) {
- /* ignore. */
- }
} else if ("PluginInfo".equals(messageName)) {
fcpListenerManager.fireReceivedPluginInfo(new PluginInfo(fcpMessage));
+ } else if ("PluginRemoved".equals(messageName)) {
+ fcpListenerManager.fireReceivedPluginRemoved(new PluginRemoved(fcpMessage));
} else if ("NodeData".equals(messageName)) {
fcpListenerManager.fireReceivedNodeData(new NodeData(fcpMessage));
} else if ("TestDDAReply".equals(messageName)) {
@@ -323,8 +321,8 @@ public class FcpConnection implements Closeable {
* Handles a disconnect from the node.
*
* @param throwable
- * The exception that caused the disconnect, or null
- * if there was no exception
+ * The exception that caused the disconnect, or
+ * null
if there was no exception
*/
synchronized void handleDisconnect(Throwable throwable) {
FcpUtils.close(remoteInputStream);
@@ -357,142 +355,8 @@ public class FcpConnection implements Closeable {
logger.finest("count for " + name + ": " + (oldValue + 1));
}
- /**
- * Returns a limited input stream from the nodeâs input stream.
- *
- * @param dataLength
- * The length of the stream
- * @return The limited input stream
- */
- private synchronized LimitedInputStream getInputStream(long dataLength) {
- if (dataLength <= 0) {
- return new LimitedInputStream(null, 0);
- }
- return new LimitedInputStream(remoteInputStream, dataLength);
- }
-
- /**
- * A wrapper around an {@link InputStream} that only supplies a limit number
- * of bytes from the underlying input stream.
- *
- * @author David âBombeâ Roden <bombe@freenetproject.org>
- */
- private static class LimitedInputStream extends FilterInputStream {
-
- /** The remaining number of bytes that can be read. */
- private long remaining;
-
- /**
- * Creates a new LimitedInputStream that supplies at most
- * length
bytes from the given input stream.
- *
- * @param inputStream
- * The input stream
- * @param length
- * The number of bytes to read
- */
- public LimitedInputStream(InputStream inputStream, long length) {
- super(inputStream);
- remaining = length;
- }
-
- /**
- * @see java.io.FilterInputStream#available()
- */
- @Override
- public synchronized int available() throws IOException {
- if (remaining == 0) {
- return 0;
- }
- return (int) Math.min(super.available(), Math.min(Integer.MAX_VALUE, remaining));
- }
-
- /**
- * @see java.io.FilterInputStream#read()
- */
- @Override
- public synchronized int read() throws IOException {
- int read = -1;
- if (remaining > 0) {
- read = super.read();
- remaining--;
- }
- return read;
- }
-
- /**
- * @see java.io.FilterInputStream#read(byte[], int, int)
- */
- @Override
- public synchronized int read(byte[] b, int off, int len) throws IOException {
- if (remaining == 0) {
- return -1;
- }
- int toCopy = (int) Math.min(len, Math.min(remaining, Integer.MAX_VALUE));
- int read = super.read(b, off, toCopy);
- remaining -= read;
- return read;
- }
-
- /**
- * @see java.io.FilterInputStream#skip(long)
- */
- @Override
- public synchronized long skip(long n) throws IOException {
- if ((n < 0) || (remaining == 0)) {
- return 0;
- }
- long skipped = super.skip(Math.min(n, remaining));
- remaining -= skipped;
- return skipped;
- }
-
- /**
- * {@inheritDoc} This method does nothing, as {@link #mark(int)} and
- * {@link #reset()} are not supported.
- *
- * @see java.io.FilterInputStream#mark(int)
- */
- @Override
- public synchronized void mark(int readlimit) {
- /* do nothing. */
- }
-
- /**
- * {@inheritDoc}
- *
- * @see java.io.FilterInputStream#markSupported()
- * @return false
- */
- @Override
- public boolean markSupported() {
- return false;
- }
-
- /**
- * {@inheritDoc} This method does nothing, as {@link #mark(int)} and
- * {@link #reset()} are not supported.
- *
- * @see java.io.FilterInputStream#reset()
- */
- @Override
- public synchronized void reset() throws IOException {
- /* do nothing. */
- }
-
- /**
- * Consumes the input stream, i.e. read all bytes until the limit is
- * reached.
- *
- * @throws IOException
- * if an I/O error occurs
- */
- public synchronized void consume() throws IOException {
- while (remaining > 0) {
- skip(remaining);
- }
- }
-
+ private synchronized InputStream getInputStream(long dataLength) throws IOException {
+ return new TempInputStream(remoteInputStream, dataLength);
}
}