/*
- * jFCPlib - FpcConnection.java - Copyright © 2008 David Roden
+ * jFCPlib - FcpConnection.java - Copyright © 2008–2016 David Roden
*
- * This program is free software; you can redistribute it and/or modify
+ * This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
+ * the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package net.pterodactylus.fcp;
import java.util.Map;
import java.util.logging.Logger;
-import net.pterodactylus.util.logging.Logging;
+import net.pterodactylus.fcp.FcpUtils.TempInputStream;
/**
* An FCP connection to a Freenet node.
public class FcpConnection implements Closeable {
/** Logger. */
- private static final Logger logger = Logging.getLogger(FcpConnection.class.getName());
+ private static final Logger logger = Logger.getLogger(FcpConnection.class.getName());
/** The default port for FCP v2. */
public static final int DEFAULT_PORT = 9481;
private FcpConnectionHandler connectionHandler;
/** Incoming message statistics. */
- private Map<String, Integer> incomingMessageStatistics = Collections.synchronizedMap(new HashMap<String, Integer>());
+ private static final Map<String, Integer> incomingMessageStatistics = Collections.synchronizedMap(new HashMap<String, Integer>());
/**
* Creates a new FCP connection to the freenet node running on localhost,
fcpListenerManager.removeListener(fcpListener);
}
+ public synchronized boolean isClosed() {
+ return connectionHandler == null;
+ }
+
//
// ACTIONS
//
}
/**
- * Closes the connection. If there is no connection to the node, this method
- * does nothing.
+ * Closes the connection. If there is no connection to the node, this
+ * method does nothing.
*/
+ @Override
public void close() {
handleDisconnect(null);
}
* @param fcpMessage
* The received message
*/
- void handleMessage(FcpMessage fcpMessage) {
+ void handleMessage(FcpMessage fcpMessage) throws IOException{
logger.fine("received message: " + fcpMessage.getName());
String messageName = fcpMessage.getName();
countMessage(messageName);
fcpListenerManager.fireReceivedDataFound(new DataFound(fcpMessage));
} else if ("SubscribedUSKUpdate".equals(messageName)) {
fcpListenerManager.fireReceivedSubscribedUSKUpdate(new SubscribedUSKUpdate(fcpMessage));
+ } else if ("SubscribedUSK".equals(messageName)) {
+ fcpListenerManager.fireReceivedSubscribedUSK(new SubscribedUSK(fcpMessage));
} else if ("IdentifierCollision".equals(messageName)) {
fcpListenerManager.fireReceivedIdentifierCollision(new IdentifierCollision(fcpMessage));
} else if ("AllData".equals(messageName)) {
- LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
+ InputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
fcpListenerManager.fireReceivedAllData(new AllData(fcpMessage, payloadInputStream));
- try {
- payloadInputStream.consume();
- } catch (IOException ioe1) {
- /* well, ignore. when the connection handler fails, all fails. */
- }
} else if ("EndListPeerNotes".equals(messageName)) {
fcpListenerManager.fireReceivedEndListPeerNotes(new EndListPeerNotes(fcpMessage));
} else if ("EndListPeers".equals(messageName)) {
} else if ("UnknownNodeIdentifier".equals(messageName)) {
fcpListenerManager.fireReceivedUnknownNodeIdentifier(new UnknownNodeIdentifier(fcpMessage));
} else if ("FCPPluginReply".equals(messageName)) {
- LimitedInputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
+ InputStream payloadInputStream = getInputStream(FcpUtils.safeParseLong(fcpMessage.getField("DataLength")));
fcpListenerManager.fireReceivedFCPPluginReply(new FCPPluginReply(fcpMessage, payloadInputStream));
- try {
- payloadInputStream.consume();
- } catch (IOException ioe1) {
- /* ignore. */
- }
} else if ("PluginInfo".equals(messageName)) {
fcpListenerManager.fireReceivedPluginInfo(new PluginInfo(fcpMessage));
+ } else if ("PluginRemoved".equals(messageName)) {
+ fcpListenerManager.fireReceivedPluginRemoved(new PluginRemoved(fcpMessage));
} else if ("NodeData".equals(messageName)) {
fcpListenerManager.fireReceivedNodeData(new NodeData(fcpMessage));
} else if ("TestDDAReply".equals(messageName)) {
* Handles a disconnect from the node.
*
* @param throwable
- * The exception that caused the disconnect, or <code>null</code>
- * if there was no exception
+ * The exception that caused the disconnect, or
+ * <code>null</code> if there was no exception
*/
synchronized void handleDisconnect(Throwable throwable) {
FcpUtils.close(remoteInputStream);
logger.finest("count for " + name + ": " + (oldValue + 1));
}
- /**
- * Returns a limited input stream from the node’s input stream.
- *
- * @param dataLength
- * The length of the stream
- * @return The limited input stream
- */
- private synchronized LimitedInputStream getInputStream(long dataLength) {
- if (dataLength <= 0) {
- return new LimitedInputStream(null, 0);
- }
- return new LimitedInputStream(remoteInputStream, dataLength);
- }
-
- /**
- * A wrapper around an {@link InputStream} that only supplies a limit number
- * of bytes from the underlying input stream.
- *
- * @author David ‘Bombe’ Roden <bombe@freenetproject.org>
- */
- private static class LimitedInputStream extends FilterInputStream {
-
- /** The remaining number of bytes that can be read. */
- private long remaining;
-
- /**
- * Creates a new LimitedInputStream that supplies at most
- * <code>length</code> bytes from the given input stream.
- *
- * @param inputStream
- * The input stream
- * @param length
- * The number of bytes to read
- */
- public LimitedInputStream(InputStream inputStream, long length) {
- super(inputStream);
- remaining = length;
- }
-
- /**
- * @see java.io.FilterInputStream#available()
- */
- @Override
- public synchronized int available() throws IOException {
- if (remaining == 0) {
- return 0;
- }
- return (int) Math.min(super.available(), Math.min(Integer.MAX_VALUE, remaining));
- }
-
- /**
- * @see java.io.FilterInputStream#read()
- */
- @Override
- public synchronized int read() throws IOException {
- int read = -1;
- if (remaining > 0) {
- read = super.read();
- remaining--;
- }
- return read;
- }
-
- /**
- * @see java.io.FilterInputStream#read(byte[], int, int)
- */
- @Override
- public synchronized int read(byte[] b, int off, int len) throws IOException {
- if (remaining == 0) {
- return -1;
- }
- int toCopy = (int) Math.min(len, Math.min(remaining, Integer.MAX_VALUE));
- int read = super.read(b, off, toCopy);
- remaining -= read;
- return read;
- }
-
- /**
- * @see java.io.FilterInputStream#skip(long)
- */
- @Override
- public synchronized long skip(long n) throws IOException {
- if ((n < 0) || (remaining == 0)) {
- return 0;
- }
- long skipped = super.skip(Math.min(n, remaining));
- remaining -= skipped;
- return skipped;
- }
-
- /**
- * {@inheritDoc} This method does nothing, as {@link #mark(int)} and
- * {@link #reset()} are not supported.
- *
- * @see java.io.FilterInputStream#mark(int)
- */
- @Override
- public void mark(int readlimit) {
- /* do nothing. */
- }
-
- /**
- * {@inheritDoc}
- *
- * @see java.io.FilterInputStream#markSupported()
- * @return <code>false</code>
- */
- @Override
- public boolean markSupported() {
- return false;
- }
-
- /**
- * {@inheritDoc} This method does nothing, as {@link #mark(int)} and
- * {@link #reset()} are not supported.
- *
- * @see java.io.FilterInputStream#reset()
- */
- @Override
- public void reset() throws IOException {
- /* do nothing. */
- }
-
- /**
- * Consumes the input stream, i.e. read all bytes until the limit is
- * reached.
- *
- * @throws IOException
- * if an I/O error occurs
- */
- public synchronized void consume() throws IOException {
- while (remaining > 0) {
- skip(remaining);
- }
- }
-
+ private synchronized InputStream getInputStream(long dataLength) throws IOException {
+ return new TempInputStream(remoteInputStream, dataLength);
}
}