X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Firc%2FConnection.java;h=336fdae2ea31b7b1ed1777ba756aaabdeb6e13b2;hb=79765458cdf668242941ec067bc3ec12783859cf;hp=530b0c7e8f2698e57d693627fa4f7b7c24394358;hpb=ad63c79977ba791203fb0f899990ea61878dbf07;p=xudocci.git diff --git a/src/main/java/net/pterodactylus/irc/Connection.java b/src/main/java/net/pterodactylus/irc/Connection.java index 530b0c7..336fdae 100644 --- a/src/main/java/net/pterodactylus/irc/Connection.java +++ b/src/main/java/net/pterodactylus/irc/Connection.java @@ -18,6 +18,7 @@ package net.pterodactylus.irc; import static com.google.common.base.Preconditions.checkState; +import static java.util.concurrent.TimeUnit.SECONDS; import java.io.BufferedReader; import java.io.Closeable; @@ -34,6 +35,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import javax.net.SocketFactory; @@ -58,8 +60,11 @@ import net.pterodactylus.irc.event.NicknameInUseReceived; import net.pterodactylus.irc.event.NoNicknameGivenReceived; import net.pterodactylus.irc.event.PrivateMessageReceived; import net.pterodactylus.irc.event.PrivateNoticeReceived; +import net.pterodactylus.irc.event.ReplyReceived; import net.pterodactylus.irc.event.UnknownReplyReceived; import net.pterodactylus.irc.util.RandomNickname; +import net.pterodactylus.xdcc.util.io.BandwidthCountingInputStream; +import net.pterodactylus.xdcc.util.io.BandwidthCountingOutputStream; import com.google.common.base.Optional; import com.google.common.collect.Maps; @@ -144,6 +149,24 @@ public class Connection extends AbstractExecutionThreadService implements Servic // /** + * Returns the hostname of the remote end of the connection. + * + * @return The remote’s hostname + */ + public String hostname() { + return hostname; + } + + /** + * Returns the port number of the remote end of the connection. + * + * @return The remote’s port number + */ + public int port() { + return port; + } + + /** * Returns whether this connection has already been established. * * @return {@code true} as long as this connection is established, {@code @@ -221,6 +244,24 @@ public class Connection extends AbstractExecutionThreadService implements Servic // /** + * Returns the current rate of the connection’s incoming side. + * + * @return The current input rate (in bytes per second) + */ + public long getInputRate() { + return (connectionHandler != null) ? connectionHandler.getInputRate() : 0; + } + + /** + * Returns the current rate of the connection’s outgoing side. + * + * @return The current output rate (in bytes per second) + */ + public long getOutputRate() { + return (connectionHandler != null) ? connectionHandler.getOutputRate() : 0; + } + + /** * Checks whether the given source is the client represented by this * connection. * @@ -278,6 +319,18 @@ public class Connection extends AbstractExecutionThreadService implements Servic connectionHandler.sendCommand("PRIVMSG", recipient, String.format("\u0001DCC RESUME %s %d %d\u0001", filename, port, position)); } + /** + * Closes this connection. + * + * @throws IOException + * if an I/O error occurs + */ + public void close() throws IOException { + if (connectionHandler != null) { + connectionHandler.close(); + } + } + // // ABSTRACTEXECUTIONTHREADSERVICE METHODS // @@ -294,6 +347,7 @@ public class Connection extends AbstractExecutionThreadService implements Servic /* connect to remote socket. */ try { Socket socket = socketFactory.createSocket(hostname, port); + socket.setSoTimeout((int) TimeUnit.MINUTES.toMillis(3)); connectionHandler = new ConnectionHandler(socket.getInputStream(), socket.getOutputStream()); /* register connection. */ @@ -324,6 +378,7 @@ public class Connection extends AbstractExecutionThreadService implements Servic while (connected) { Reply reply = connectionHandler.readReply(); + eventBus.post(new ReplyReceived(this, reply)); logger.finest(String.format("<< %s", reply)); String command = reply.command(); List parameters = reply.parameters(); @@ -462,6 +517,9 @@ public class Connection extends AbstractExecutionThreadService implements Servic } catch (IOException ioe1) { logger.log(Level.WARNING, "I/O error", ioe1); eventBus.post(new ConnectionClosed(this, ioe1)); + } catch (RuntimeException re1) { + logger.log(Level.SEVERE, "Runtime error", re1); + eventBus.post(new ConnectionClosed(this, re1)); } finally { established = false; logger.info("Closing Connection."); @@ -501,7 +559,7 @@ public class Connection extends AbstractExecutionThreadService implements Servic } } else if (messageWords[1].equalsIgnoreCase("ACCEPT")) { Optional port = Optional.fromNullable(Ints.tryParse(messageWords[3])); - long position = Optional.fromNullable(Longs.tryParse(messageWords[4])).or(-1L); + long position = (messageWords.length > 4) ? Optional.fromNullable(Longs.tryParse(messageWords[4])).or(-1L) : -1; if (port.isPresent()) { eventBus.post(new DccAcceptReceived(this, client, messageWords[2], port.get(), position)); } else { @@ -561,7 +619,10 @@ public class Connection extends AbstractExecutionThreadService implements Servic private class ConnectionHandler implements Closeable { /** The output stream of the connection. */ - private final OutputStream outputStream; + private final BandwidthCountingOutputStream outputStream; + + /** The input stream. */ + private final BandwidthCountingInputStream inputStream; /** The input stream of the connection. */ private final BufferedReader inputStreamReader; @@ -578,8 +639,9 @@ public class Connection extends AbstractExecutionThreadService implements Servic * if the encoding (currently “UTF-8”) is not valid */ private ConnectionHandler(InputStream inputStream, OutputStream outputStream) throws UnsupportedEncodingException { - this.outputStream = outputStream; - inputStreamReader = new BufferedReader(new InputStreamReader(inputStream, "UTF-8")); + this.outputStream = new BandwidthCountingOutputStream(outputStream, 5, SECONDS); + this.inputStream = new BandwidthCountingInputStream(inputStream, 5, SECONDS); + inputStreamReader = new BufferedReader(new InputStreamReader(this.inputStream, "UTF-8")); } // @@ -587,6 +649,24 @@ public class Connection extends AbstractExecutionThreadService implements Servic // /** + * Returns the current rate of the connection’s incoming side. + * + * @return The current input rate (in bytes per second) + */ + public long getInputRate() { + return inputStream.getCurrentRate(); + } + + /** + * Returns the current rate of the connection’s outgoing side. + * + * @return The current output rate (in bytes per second) + */ + public long getOutputRate() { + return outputStream.getCurrentRate(); + } + + /** * Sends a command with the given parameters, skipping all {@link * Optional#absent()} optionals. * @@ -668,6 +748,7 @@ public class Connection extends AbstractExecutionThreadService implements Servic public void close() throws IOException { Closeables.close(outputStream, true); Closeables.close(inputStreamReader, true); + Closeables.close(inputStream, true); } }