X-Git-Url: https://git.pterodactylus.net/?p=xudocci.git;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Firc%2FConnection.java;h=2f8941442301276cb016fe331b27df4bcbac6fcb;hp=ade43c9f693e63879fd32aa7a5c91eaca5c9f97c;hb=c82a73fcb36e0304d0d15b554550ff9732287213;hpb=3e5434124d9dc162883f4866174046799c3a3f33 diff --git a/src/main/java/net/pterodactylus/irc/Connection.java b/src/main/java/net/pterodactylus/irc/Connection.java index ade43c9..2f89414 100644 --- a/src/main/java/net/pterodactylus/irc/Connection.java +++ b/src/main/java/net/pterodactylus/irc/Connection.java @@ -18,6 +18,7 @@ package net.pterodactylus.irc; import static com.google.common.base.Preconditions.checkState; +import static java.util.concurrent.TimeUnit.SECONDS; import java.io.BufferedReader; import java.io.Closeable; @@ -27,37 +28,53 @@ import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; import java.io.UnsupportedEncodingException; +import java.net.InetAddress; import java.net.Socket; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.logging.Level; -import java.util.logging.Logger; +import java.util.concurrent.TimeUnit; import javax.net.SocketFactory; import net.pterodactylus.irc.event.ChannelJoined; +import net.pterodactylus.irc.event.ChannelLeft; import net.pterodactylus.irc.event.ChannelMessageReceived; import net.pterodactylus.irc.event.ChannelNicknames; import net.pterodactylus.irc.event.ChannelNotJoined; import net.pterodactylus.irc.event.ChannelNotJoined.Reason; +import net.pterodactylus.irc.event.ChannelNoticeReceived; import net.pterodactylus.irc.event.ChannelTopic; +import net.pterodactylus.irc.event.ClientQuit; +import net.pterodactylus.irc.event.ConnectionClosed; import net.pterodactylus.irc.event.ConnectionEstablished; import net.pterodactylus.irc.event.ConnectionFailed; +import net.pterodactylus.irc.event.DccAcceptReceived; +import net.pterodactylus.irc.event.DccSendReceived; +import net.pterodactylus.irc.event.KickedFromChannel; import net.pterodactylus.irc.event.MotdReceived; +import net.pterodactylus.irc.event.NicknameChanged; import net.pterodactylus.irc.event.NicknameInUseReceived; import net.pterodactylus.irc.event.NoNicknameGivenReceived; import net.pterodactylus.irc.event.PrivateMessageReceived; +import net.pterodactylus.irc.event.PrivateNoticeReceived; +import net.pterodactylus.irc.event.ReplyReceived; import net.pterodactylus.irc.event.UnknownReplyReceived; import net.pterodactylus.irc.util.RandomNickname; +import net.pterodactylus.xdcc.util.io.BandwidthCountingInputStream; +import net.pterodactylus.xdcc.util.io.BandwidthCountingOutputStream; import com.google.common.base.Optional; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.eventbus.EventBus; import com.google.common.io.Closeables; +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; import com.google.common.util.concurrent.AbstractExecutionThreadService; import com.google.common.util.concurrent.Service; +import org.apache.log4j.Logger; /** * A connection to an IRC server. @@ -105,6 +122,9 @@ public class Connection extends AbstractExecutionThreadService implements Servic /** The connection handler. */ private ConnectionHandler connectionHandler; + /** Whether the connection has already been established. */ + private boolean established; + /** * Creates a new connection. * @@ -129,6 +149,34 @@ public class Connection extends AbstractExecutionThreadService implements Servic // /** + * Returns the hostname of the remote end of the connection. + * + * @return The remote’s hostname + */ + public String hostname() { + return hostname; + } + + /** + * Returns the port number of the remote end of the connection. + * + * @return The remote’s port number + */ + public int port() { + return port; + } + + /** + * Returns whether this connection has already been established. + * + * @return {@code true} as long as this connection is established, {@code + * false} otherwise + */ + public boolean established() { + return established; + } + + /** * Returns the nickname that is currently in use by this connection. The * nickname is only available once the connection has been {@link #start()}ed. * @@ -196,6 +244,37 @@ public class Connection extends AbstractExecutionThreadService implements Servic // /** + * Returns the current rate of the connection’s incoming side. + * + * @return The current input rate (in bytes per second) + */ + public long getInputRate() { + return (connectionHandler != null) ? connectionHandler.getInputRate() : 0; + } + + /** + * Returns the current rate of the connection’s outgoing side. + * + * @return The current output rate (in bytes per second) + */ + public long getOutputRate() { + return (connectionHandler != null) ? connectionHandler.getOutputRate() : 0; + } + + /** + * Checks whether the given source is the client represented by this + * connection. + * + * @param source + * The source to check + * @return {@code true} if this connection represents the given source, {@code + * false} otherwise + */ + public boolean isSource(Source source) { + return source.nick().isPresent() && source.nick().get().equals(nickname); + } + + /** * Joins the given channel. * * @param channel @@ -222,6 +301,36 @@ public class Connection extends AbstractExecutionThreadService implements Servic connectionHandler.sendCommand("PRIVMSG", recipient, message); } + /** + * Sends a DCC RESUME request to the given recipient. + * + * @param recipient + * The recipient of the request + * @param filename + * The name of the file to resume + * @param port + * The port number from the original DCC SEND request + * @param position + * The position at which to resume the transfer + * @throws IOException + * if an I/O error occurs + */ + public void sendDccResume(String recipient, String filename, int port, long position) throws IOException { + connectionHandler.sendCommand("PRIVMSG", recipient, String.format("\u0001DCC RESUME %s %d %d\u0001", filename, port, position)); + } + + /** + * Closes this connection. + * + * @throws IOException + * if an I/O error occurs + */ + public void close() throws IOException { + if (connectionHandler != null) { + connectionHandler.close(); + } + } + // // ABSTRACTEXECUTIONTHREADSERVICE METHODS // @@ -238,6 +347,7 @@ public class Connection extends AbstractExecutionThreadService implements Servic /* connect to remote socket. */ try { Socket socket = socketFactory.createSocket(hostname, port); + socket.setSoTimeout((int) TimeUnit.MINUTES.toMillis(3)); connectionHandler = new ConnectionHandler(socket.getInputStream(), socket.getOutputStream()); /* register connection. */ @@ -268,7 +378,8 @@ public class Connection extends AbstractExecutionThreadService implements Servic while (connected) { Reply reply = connectionHandler.readReply(); - logger.finest(String.format("<< %s", reply)); + eventBus.post(new ReplyReceived(this, reply)); + logger.trace(String.format("<< %s", reply)); String command = reply.command(); List parameters = reply.parameters(); @@ -276,55 +387,27 @@ public class Connection extends AbstractExecutionThreadService implements Servic if (command.equalsIgnoreCase("PRIVMSG")) { String recipient = parameters.get(0); String message = parameters.get(1); - if (!channelTypes.contains(recipient.charAt(0))) { + if (message.startsWith("\u0001") && message.endsWith("\u0001")) { + /* CTCP! */ + handleCtcp(reply.source().get(), message); + } else if (!channelTypes.contains(recipient.charAt(0))) { eventBus.post(new PrivateMessageReceived(this, reply.source().get(), message)); } else { eventBus.post(new ChannelMessageReceived(this, recipient, reply.source().get(), message)); } - /* replies 001-004 don’t hold information but they have to be sent on a successful connection. */ - } else if (command.equals("001")) { - connectionStatus |= 0x01; - } else if (command.equals("002")) { - connectionStatus |= 0x02; - } else if (command.equals("003")) { - connectionStatus |= 0x04; - } else if (command.equals("004")) { - connectionStatus |= 0x08; - - /* 005 originally was a bounce message, now used to transmit useful information about the server. */ - } else if (command.equals("005")) { - for (String parameter : parameters) { - if (parameter.startsWith("PREFIX=")) { - int openParen = parameter.indexOf('('); - int closeParen = parameter.indexOf(')'); - if ((openParen != -1) && (closeParen != -1)) { - for (int modeCharacterIndex = 1; modeCharacterIndex < (closeParen - openParen); ++modeCharacterIndex) { - char modeCharacter = parameter.charAt(openParen + modeCharacterIndex); - char modeSymbol = parameter.charAt(closeParen + modeCharacterIndex); - nickPrefixes.put(String.valueOf(modeSymbol), String.valueOf(modeCharacter)); - } - logger.fine(String.format("Parsed Prefixes: %s", nickPrefixes)); - } - } else if (parameter.startsWith("CHANTYPES=")) { - for (int typeIndex = 10; typeIndex < parameter.length(); ++typeIndex) { - channelTypes.add(parameter.charAt(typeIndex)); - } - logger.fine(String.format("Parsed Channel Types: %s", channelTypes)); - } + } else if (command.equalsIgnoreCase("NOTICE")) { + String recipient = parameters.get(0); + String message = parameters.get(1); + if (message.startsWith("\u0001") && message.endsWith("\u0001")) { + /* CTCP! */ + handleCtcp(reply.source().get(), message); + } else if (!channelTypes.contains(recipient.charAt(0))) { + eventBus.post(new PrivateNoticeReceived(this, reply)); + } else { + eventBus.post(new ChannelNoticeReceived(this, reply.source().get(), recipient, message)); } - /* 375, 372, and 376 handle the server’s MOTD. */ - } else if (command.equals("375")) { - /* MOTD starts. */ - motd.append(parameters.get(1)).append('\n'); - } else if (command.equals("372")) { - motd.append(parameters.get(1)).append('\n'); - } else if (command.equals("376")) { - motd.append(parameters.get(1)).append('\n'); - eventBus.post(new MotdReceived(this, motd.toString())); - motd.setLength(0); - /* 43x replies are for nick change errors. */ } else if (command.equals("431")) { eventBus.post(new NoNicknameGivenReceived(this, reply)); @@ -336,6 +419,10 @@ public class Connection extends AbstractExecutionThreadService implements Servic eventBus.post(new NicknameInUseReceived(this, reply)); } + /* client stuff. */ + } else if (command.equalsIgnoreCase("NICK")) { + eventBus.post(new NicknameChanged(this, reply.source().get(), parameters.get(0))); + /* channel stuff. */ } else if (command.equalsIgnoreCase("JOIN")) { eventBus.post(new ChannelJoined(this, parameters.get(0), reply.source().get())); @@ -354,6 +441,10 @@ public class Connection extends AbstractExecutionThreadService implements Servic } else if (command.equals("366")) { eventBus.post(new ChannelNicknames(this, parameters.get(1), nicks)); nicks.clear(); + } else if (command.equalsIgnoreCase("PART")) { + eventBus.post(new ChannelLeft(this, parameters.get(0), reply.source().get(), getOptional(parameters, 1))); + } else if (command.equalsIgnoreCase("QUIT")) { + eventBus.post(new ClientQuit(this, reply.source().get(), parameters.get(0))); /* common channel join errors. */ } else if (command.equals("474")) { @@ -362,11 +453,59 @@ public class Connection extends AbstractExecutionThreadService implements Servic eventBus.post(new ChannelNotJoined(this, parameters.get(1), Reason.inviteOnly)); } else if (command.equals("475")) { eventBus.post(new ChannelNotJoined(this, parameters.get(1), Reason.badChannelKey)); + } else if (command.equals("477")) { + eventBus.post(new ChannelNotJoined(this, parameters.get(1), Reason.registeredNicknamesOnly)); /* basic connection housekeeping. */ } else if (command.equalsIgnoreCase("PING")) { connectionHandler.sendCommand("PONG", getOptional(parameters, 0), getOptional(parameters, 1)); + /* replies 001-004 don’t hold information but they have to be sent on a successful connection. */ + } else if (command.equals("001")) { + connectionStatus |= 0x01; + } else if (command.equals("002")) { + connectionStatus |= 0x02; + } else if (command.equals("003")) { + connectionStatus |= 0x04; + } else if (command.equals("004")) { + connectionStatus |= 0x08; + + /* 005 originally was a bounce message, now used to transmit useful information about the server. */ + } else if (command.equals("005")) { + for (String parameter : parameters) { + if (parameter.startsWith("PREFIX=")) { + int openParen = parameter.indexOf('('); + int closeParen = parameter.indexOf(')'); + if ((openParen != -1) && (closeParen != -1)) { + for (int modeCharacterIndex = 1; modeCharacterIndex < (closeParen - openParen); ++modeCharacterIndex) { + char modeCharacter = parameter.charAt(openParen + modeCharacterIndex); + char modeSymbol = parameter.charAt(closeParen + modeCharacterIndex); + nickPrefixes.put(String.valueOf(modeSymbol), String.valueOf(modeCharacter)); + } + logger.debug(String.format("Parsed Prefixes: %s", nickPrefixes)); + } + } else if (parameter.startsWith("CHANTYPES=")) { + for (int typeIndex = 10; typeIndex < parameter.length(); ++typeIndex) { + channelTypes.add(parameter.charAt(typeIndex)); + } + logger.debug(String.format("Parsed Channel Types: %s", channelTypes)); + } + } + + /* 375, 372, and 376 handle the server’s MOTD. */ + } else if (command.equals("375")) { + /* MOTD starts. */ + motd.append(parameters.get(1)).append('\n'); + } else if (command.equals("372")) { + motd.append(parameters.get(1)).append('\n'); + } else if (command.equals("376")) { + motd.append(parameters.get(1)).append('\n'); + eventBus.post(new MotdReceived(this, motd.toString())); + motd.setLength(0); + + } else if (command.equals("KICK")) { + eventBus.post(new KickedFromChannel(this, parameters.get(0), reply.source().get(), parameters.get(1), getOptional(parameters, 2))); + /* okay, everything else. */ } else { eventBus.post(new UnknownReplyReceived(this, reply)); @@ -374,13 +513,20 @@ public class Connection extends AbstractExecutionThreadService implements Servic if ((connectionStatus == 0x0f) && (connectionStatus != oldConnectionStatus)) { /* connection succeeded! */ + established = true; eventBus.post(new ConnectionEstablished(this)); } oldConnectionStatus = connectionStatus; } + eventBus.post(new ConnectionClosed(this)); } catch (IOException ioe1) { - logger.log(Level.WARNING, "I/O error", ioe1); + logger.warn("I/O error", ioe1); + eventBus.post(new ConnectionClosed(this, ioe1)); + } catch (RuntimeException re1) { + logger.error("Runtime error", re1); + eventBus.post(new ConnectionClosed(this, re1)); } finally { + established = false; logger.info("Closing Connection."); try { Closeables.close(connectionHandler, true); @@ -396,6 +542,39 @@ public class Connection extends AbstractExecutionThreadService implements Servic // /** + * Handles a CTCP message. + * + * @param client + * The client sending the message + * @param message + * The message + */ + private void handleCtcp(Source client, String message) { + String[] messageWords = message.substring(1, message.length() - 1).split(" +"); + String ctcpCommand = messageWords[0]; + if (ctcpCommand.equalsIgnoreCase("DCC")) { + if (messageWords[1].equalsIgnoreCase("SEND")) { + Optional inetAddress = parseInetAddress(messageWords[3]); + Optional port = Optional.fromNullable(Ints.tryParse(messageWords[4])); + long fileSize = Optional.fromNullable(Longs.tryParse(messageWords[5])).or(-1L); + if (inetAddress.isPresent() && port.isPresent()) { + eventBus.post(new DccSendReceived(this, client, messageWords[2], inetAddress.get(), port.get(), fileSize)); + } else { + logger.warn(String.format("Received malformed DCC SEND: “%s”", message)); + } + } else if (messageWords[1].equalsIgnoreCase("ACCEPT")) { + Optional port = Optional.fromNullable(Ints.tryParse(messageWords[3])); + long position = (messageWords.length > 4) ? Optional.fromNullable(Longs.tryParse(messageWords[4])).or(-1L) : -1; + if (port.isPresent()) { + eventBus.post(new DccAcceptReceived(this, client, messageWords[2], port.get(), position)); + } else { + logger.warn(String.format("Received malformed DCC ACCEPT: “%s”", message)); + } + } + } + } + + /** * Returns an item from the list, or {@link Optional#absent()} if the list is * shorter than required for the given index. * @@ -415,11 +594,40 @@ public class Connection extends AbstractExecutionThreadService implements Servic return Optional.absent(); } + /** + * Parses the given {@code ip} and returns an {@link InetAddress} from it. + * + * @param ip + * The IP to parse + * @return The parsed inet address, or {@link Optional#absent()} if no inet + * address could be parsed + */ + private Optional parseInetAddress(String ip) { + Long ipNumber = Longs.tryParse(ip); + if (ipNumber == null) { + return Optional.absent(); + } + + StringBuilder hostname = new StringBuilder(15); + hostname.append((ipNumber >>> 24) & 0xff).append('.'); + hostname.append((ipNumber >>> 16) & 0xff).append('.'); + hostname.append((ipNumber >>> 8) & 0xff).append('.'); + hostname.append(ipNumber & 0xff); + try { + return Optional.of(InetAddress.getByName(hostname.toString())); + } catch (UnknownHostException uhe1) { + return Optional.absent(); + } + } + /** Handles input and output for the connection. */ private class ConnectionHandler implements Closeable { /** The output stream of the connection. */ - private final OutputStream outputStream; + private final BandwidthCountingOutputStream outputStream; + + /** The input stream. */ + private final BandwidthCountingInputStream inputStream; /** The input stream of the connection. */ private final BufferedReader inputStreamReader; @@ -436,8 +644,9 @@ public class Connection extends AbstractExecutionThreadService implements Servic * if the encoding (currently “UTF-8”) is not valid */ private ConnectionHandler(InputStream inputStream, OutputStream outputStream) throws UnsupportedEncodingException { - this.outputStream = outputStream; - inputStreamReader = new BufferedReader(new InputStreamReader(inputStream, "UTF-8")); + this.outputStream = new BandwidthCountingOutputStream(outputStream, 5, SECONDS); + this.inputStream = new BandwidthCountingInputStream(inputStream, 5, SECONDS); + inputStreamReader = new BufferedReader(new InputStreamReader(this.inputStream, "UTF-8")); } // @@ -445,6 +654,24 @@ public class Connection extends AbstractExecutionThreadService implements Servic // /** + * Returns the current rate of the connection’s incoming side. + * + * @return The current input rate (in bytes per second) + */ + public long getInputRate() { + return inputStream.getCurrentRate(); + } + + /** + * Returns the current rate of the connection’s outgoing side. + * + * @return The current output rate (in bytes per second) + */ + public long getOutputRate() { + return outputStream.getCurrentRate(); + } + + /** * Sends a command with the given parameters, skipping all {@link * Optional#absent()} optionals. * @@ -495,7 +722,7 @@ public class Connection extends AbstractExecutionThreadService implements Servic commandBuilder.append(parameter); } - logger.finest(String.format(">> %s", commandBuilder)); + logger.trace(String.format(">> %s", commandBuilder)); outputStream.write((commandBuilder.toString() + "\r\n").getBytes("UTF-8")); outputStream.flush(); } @@ -526,6 +753,7 @@ public class Connection extends AbstractExecutionThreadService implements Servic public void close() throws IOException { Closeables.close(outputStream, true); Closeables.close(inputStreamReader, true); + Closeables.close(inputStream, true); } }