X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Firc%2FConnection.java;h=30344be9b2c831023ce74edf1a2b8e7e0446d0fd;hb=1aa7f843043b8723a02b4bbd88eb21ae8b7db881;hp=92deeefcead752faad5e381f27ac10bf21541983;hpb=dee8fa734215a443bd279556cb084dfba845175b;p=xudocci.git diff --git a/src/main/java/net/pterodactylus/irc/Connection.java b/src/main/java/net/pterodactylus/irc/Connection.java index 92deeef..30344be 100644 --- a/src/main/java/net/pterodactylus/irc/Connection.java +++ b/src/main/java/net/pterodactylus/irc/Connection.java @@ -18,6 +18,8 @@ package net.pterodactylus.irc; import static com.google.common.base.Preconditions.checkState; +import static java.util.Arrays.asList; +import static java.util.concurrent.TimeUnit.SECONDS; import java.io.BufferedReader; import java.io.Closeable; @@ -30,32 +32,44 @@ import java.io.UnsupportedEncodingException; import java.net.Socket; import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + import javax.net.SocketFactory; +import net.pterodactylus.irc.connection.ChannelNickHandler; +import net.pterodactylus.irc.connection.ChannelNotJoinedHandler; +import net.pterodactylus.irc.connection.ConnectionEstablishHandler; +import net.pterodactylus.irc.connection.CtcpHandler; +import net.pterodactylus.irc.connection.Handler; +import net.pterodactylus.irc.connection.MessageHandler; +import net.pterodactylus.irc.connection.MotdHandler; +import net.pterodactylus.irc.connection.PrefixHandler; +import net.pterodactylus.irc.connection.SimpleCommandHandler; import net.pterodactylus.irc.event.ChannelJoined; -import net.pterodactylus.irc.event.ChannelNicknames; -import net.pterodactylus.irc.event.ChannelNotJoined; -import net.pterodactylus.irc.event.ChannelNotJoined.Reason; +import net.pterodactylus.irc.event.ChannelLeft; import net.pterodactylus.irc.event.ChannelTopic; +import net.pterodactylus.irc.event.ClientQuit; +import net.pterodactylus.irc.event.ConnectionClosed; import net.pterodactylus.irc.event.ConnectionEstablished; import net.pterodactylus.irc.event.ConnectionFailed; -import net.pterodactylus.irc.event.MotdReceived; +import net.pterodactylus.irc.event.KickedFromChannel; +import net.pterodactylus.irc.event.NicknameChanged; import net.pterodactylus.irc.event.NicknameInUseReceived; import net.pterodactylus.irc.event.NoNicknameGivenReceived; +import net.pterodactylus.irc.event.ReplyReceived; import net.pterodactylus.irc.event.UnknownReplyReceived; import net.pterodactylus.irc.util.RandomNickname; +import net.pterodactylus.xdcc.util.io.BandwidthCountingInputStream; +import net.pterodactylus.xdcc.util.io.BandwidthCountingOutputStream; -import com.beust.jcommander.internal.Maps; -import com.beust.jcommander.internal.Sets; import com.google.common.base.Optional; import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; import com.google.common.io.Closeables; import com.google.common.util.concurrent.AbstractExecutionThreadService; import com.google.common.util.concurrent.Service; +import org.apache.log4j.Logger; /** * A connection to an IRC server. @@ -64,6 +78,9 @@ import com.google.common.util.concurrent.Service; */ public class Connection extends AbstractExecutionThreadService implements Service { + /* The logger. */ + private static final Logger logger = Logger.getLogger(Connection.class.getName()); + /** The event bus. */ private final EventBus eventBus; @@ -100,6 +117,9 @@ public class Connection extends AbstractExecutionThreadService implements Servic /** The connection handler. */ private ConnectionHandler connectionHandler; + /** Whether the connection has already been established. */ + private final AtomicBoolean established = new AtomicBoolean(); + /** * Creates a new connection. * @@ -124,6 +144,34 @@ public class Connection extends AbstractExecutionThreadService implements Servic // /** + * Returns the hostname of the remote end of the connection. + * + * @return The remote’s hostname + */ + public String hostname() { + return hostname; + } + + /** + * Returns the port number of the remote end of the connection. + * + * @return The remote’s port number + */ + public int port() { + return port; + } + + /** + * Returns whether this connection has already been established. + * + * @return {@code true} as long as this connection is established, {@code + * false} otherwise + */ + public boolean established() { + return established.get(); + } + + /** * Returns the nickname that is currently in use by this connection. The * nickname is only available once the connection has been {@link #start()}ed. * @@ -155,7 +203,7 @@ public class Connection extends AbstractExecutionThreadService implements Servic * * @param username * The username to use - * @return + * @return This connection */ public Connection username(String username) { this.username = Optional.fromNullable(username); @@ -191,47 +239,91 @@ public class Connection extends AbstractExecutionThreadService implements Servic // /** + * Returns the current rate of the connection’s incoming side. + * + * @return The current input rate (in bytes per second) + */ + public long getInputRate() { + return (connectionHandler != null) ? connectionHandler.getInputRate() : 0; + } + + /** + * Returns the current rate of the connection’s outgoing side. + * + * @return The current output rate (in bytes per second) + */ + public long getOutputRate() { + return (connectionHandler != null) ? connectionHandler.getOutputRate() : 0; + } + + /** + * Checks whether the given source is the client represented by this + * connection. + * + * @param source + * The source to check + * @return {@code true} if this connection represents the given source, {@code + * false} otherwise + */ + public boolean isSource(Source source) { + return source.nick().isPresent() && source.nick().get().equals(nickname); + } + + /** * Joins the given channel. * * @param channel * The channel to join - * @return {@code true} if the channel was joined, {@code false} otherwise + * @throws IOException + * if an I/O error occurs */ - public boolean joinChannel(final String channel) { - final SynchronousQueue result = new SynchronousQueue(); - Object eventHandler = new Object() { - - @Subscribe - public void channelJoined(ChannelJoined channelJoined) throws InterruptedException { - if (!channelJoined.channel().equalsIgnoreCase(channel)) { - return; - } - Optional nickname = channelJoined.client().nick(); - if (!nickname.isPresent() || (nickname.isPresent() && nickname().equalsIgnoreCase(nickname.get()))) { - eventBus.unregister(this); - result.put(true); - } - } + public void joinChannel(final String channel) throws IOException { + connectionHandler.sendCommand("JOIN", channel); + } - @Subscribe - public void channelNotJoined(ChannelNotJoined channelNotJoined) throws InterruptedException { - if (!channelNotJoined.channel().equalsIgnoreCase(channel)) { - return; - } - eventBus.unregister(this); - result.put(false); - } - }; - eventBus.register(eventHandler); - try { - connectionHandler.sendCommand("JOIN", channel); - return result.take(); - } catch (IOException ioe1) { - eventBus.unregister(eventHandler); - } catch (InterruptedException ie1) { - /* TODO - how to handle? */ + /** + * Sends a message to the given recipient, which may be a channel or another + * nickname. + * + * @param recipient + * The recipient of the message + * @param message + * The message + * @throws IOException + * if an I/O error occurs + */ + public void sendMessage(String recipient, String message) throws IOException { + connectionHandler.sendCommand("PRIVMSG", recipient, message); + } + + /** + * Sends a DCC RESUME request to the given recipient. + * + * @param recipient + * The recipient of the request + * @param filename + * The name of the file to resume + * @param port + * The port number from the original DCC SEND request + * @param position + * The position at which to resume the transfer + * @throws IOException + * if an I/O error occurs + */ + public void sendDccResume(String recipient, String filename, int port, long position) throws IOException { + connectionHandler.sendCommand("PRIVMSG", recipient, String.format("\u0001DCC RESUME %s %d %d\u0001", filename, port, position)); + } + + /** + * Closes this connection. + * + * @throws IOException + * if an I/O error occurs + */ + public void close() throws IOException { + if (connectionHandler != null) { + connectionHandler.close(); } - return false; } // @@ -250,6 +342,7 @@ public class Connection extends AbstractExecutionThreadService implements Servic /* connect to remote socket. */ try { Socket socket = socketFactory.createSocket(hostname, port); + socket.setSoTimeout((int) TimeUnit.MINUTES.toMillis(3)); connectionHandler = new ConnectionHandler(socket.getInputStream(), socket.getOutputStream()); /* register connection. */ @@ -265,71 +358,46 @@ public class Connection extends AbstractExecutionThreadService implements Servic return; } + eventBus.register(this); /* now read replies and react. */ try { /* some status variables. */ - int oldConnectionStatus = 0; - int connectionStatus = 0; boolean connected = true; - StringBuilder motd = new StringBuilder(); - Set nicks = Sets.newHashSet(); - /* server modes. */ - Map nickPrefixes = Maps.newHashMap(); - Set channelTypes = Sets.newHashSet(); + PrefixHandler prefixHandler = new PrefixHandler(); + List handlers = asList( + new MessageHandler(eventBus, this, prefixHandler), + new CtcpHandler(eventBus, this), + new ChannelNickHandler(eventBus, this, prefixHandler), + new SimpleCommandHandler() + .addCommand("431", (s, p) -> eventBus.post( + new NoNicknameGivenReceived(this))) + .addCommand("NICK", (s, p) -> eventBus.post( + new NicknameChanged(this, s.get(), + p.get(0)))), + new MotdHandler(eventBus, this), + new ChannelNotJoinedHandler(eventBus, this), + new ConnectionEstablishHandler(eventBus, this), + prefixHandler + ); while (connected) { Reply reply = connectionHandler.readReply(); - System.err.println("<< " + reply); + eventBus.post(new ReplyReceived(this, reply)); + logger.trace(String.format("<< %s", reply)); String command = reply.command(); List parameters = reply.parameters(); - /* replies 001-004 don’t hold information but they have to be sent on a successful connection. */ - if (command.equals("001")) { - connectionStatus |= 0x01; - } else if (command.equals("002")) { - connectionStatus |= 0x02; - } else if (command.equals("003")) { - connectionStatus |= 0x04; - } else if (command.equals("004")) { - connectionStatus |= 0x08; - - /* 005 originally was a bounce message, now used to transmit useful information about the server. */ - } else if (command.equals("005")) { - for (String parameter : parameters) { - if (parameter.startsWith("PREFIX=")) { - int openParen = parameter.indexOf('('); - int closeParen = parameter.indexOf(')'); - if ((openParen != -1) && (closeParen != -1)) { - for (int modeCharacterIndex = 1; modeCharacterIndex < (closeParen - openParen); ++modeCharacterIndex) { - char modeCharacter = parameter.charAt(openParen + modeCharacterIndex); - char modeSymbol = parameter.charAt(closeParen + modeCharacterIndex); - nickPrefixes.put(String.valueOf(modeSymbol), String.valueOf(modeCharacter)); - } - } - } else if (parameter.startsWith("CHANTYPES=")) { - for (int typeIndex = 10; typeIndex < parameter.length(); ++typeIndex) { - channelTypes.add(parameter.charAt(typeIndex)); - } - } + for (Handler handler : handlers) { + if (handler.willHandle(reply)) { + handler.handleReply(reply); + break; } - - /* 375, 372, and 376 handle the server’s MOTD. */ - } else if (command.equals("375")) { - /* MOTD starts. */ - motd.append(parameters.get(1)).append('\n'); - } else if (command.equals("372")) { - motd.append(parameters.get(1)).append('\n'); - } else if (command.equals("376")) { - motd.append(parameters.get(1)).append('\n'); - eventBus.post(new MotdReceived(this, motd.toString())); - motd.setLength(0); + } /* 43x replies are for nick change errors. */ - } else if (command.equals("431")) { - eventBus.post(new NoNicknameGivenReceived(this, reply)); - } else if (command.equals("433")) { - if (connectionStatus == 0) { + if (command.equals("433")) { + if (!established.get()) { nickname = nicknameChooser.getNickname(); connectionHandler.sendCommand("NICK", nickname); } else { @@ -343,47 +411,34 @@ public class Connection extends AbstractExecutionThreadService implements Servic /* no topic is set. */ } else if (command.equals("332")) { eventBus.post(new ChannelTopic(this, parameters.get(1), parameters.get(2))); - } else if (command.equals("353")) { - String channel = parameters.get(2); - for (String nickname : parameters.get(3).split(" ")) { - if (nickPrefixes.containsKey(nickname.substring(0, 1))) { - nicks.add(new Nickname(nickname.substring(1), nickname.substring(0, 1))); - } else { - nicks.add(new Nickname(nickname, "")); - } - } - } else if (command.equals("366")) { - eventBus.post(new ChannelNicknames(this, parameters.get(1), nicks)); - System.out.println("Found Nicknames: " + nicks); - nicks.clear(); - - /* common channel join errors. */ - } else if (command.equals("474")) { - eventBus.post(new ChannelNotJoined(this, parameters.get(1), Reason.banned)); - } else if (command.equals("473")) { - eventBus.post(new ChannelNotJoined(this, parameters.get(1), Reason.inviteOnly)); - } else if (command.equals("475")) { - eventBus.post(new ChannelNotJoined(this, parameters.get(1), Reason.badChannelKey)); + } else if (command.equalsIgnoreCase("PART")) { + eventBus.post(new ChannelLeft(this, parameters.get(0), reply.source().get(), getOptional(parameters, 1))); + } else if (command.equalsIgnoreCase("QUIT")) { + eventBus.post(new ClientQuit(this, reply.source().get(), parameters.get(0))); /* basic connection housekeeping. */ } else if (command.equalsIgnoreCase("PING")) { connectionHandler.sendCommand("PONG", getOptional(parameters, 0), getOptional(parameters, 1)); + } else if (command.equalsIgnoreCase("KICK")) { + eventBus.post(new KickedFromChannel(this, parameters.get(0), reply.source().get(), parameters.get(1), getOptional(parameters, 2))); + /* okay, everything else. */ } else { eventBus.post(new UnknownReplyReceived(this, reply)); } - - if ((connectionStatus == 0x0f) && (connectionStatus != oldConnectionStatus)) { - /* connection succeeded! */ - eventBus.post(new ConnectionEstablished(this)); - } - oldConnectionStatus = connectionStatus; } + eventBus.post(new ConnectionClosed(this)); } catch (IOException ioe1) { - ioe1.printStackTrace(); + logger.warn("I/O error", ioe1); + eventBus.post(new ConnectionClosed(this, ioe1)); + } catch (RuntimeException re1) { + logger.error("Runtime error", re1); + eventBus.post(new ConnectionClosed(this, re1)); } finally { - System.out.println("Closing Connection."); + established.set(false); + eventBus.unregister(this); + logger.info("Closing Connection."); try { Closeables.close(connectionHandler, true); } catch (IOException ioe1) { @@ -393,6 +448,13 @@ public class Connection extends AbstractExecutionThreadService implements Servic } + @Subscribe + public void connectionEstablished(ConnectionEstablished connectionEstablished) { + if (connectionEstablished.connection() == this) { + established.set(true); + } + } + // // PRIVATE METHODS // @@ -421,7 +483,10 @@ public class Connection extends AbstractExecutionThreadService implements Servic private class ConnectionHandler implements Closeable { /** The output stream of the connection. */ - private final OutputStream outputStream; + private final BandwidthCountingOutputStream outputStream; + + /** The input stream. */ + private final BandwidthCountingInputStream inputStream; /** The input stream of the connection. */ private final BufferedReader inputStreamReader; @@ -438,8 +503,9 @@ public class Connection extends AbstractExecutionThreadService implements Servic * if the encoding (currently “UTF-8”) is not valid */ private ConnectionHandler(InputStream inputStream, OutputStream outputStream) throws UnsupportedEncodingException { - this.outputStream = outputStream; - inputStreamReader = new BufferedReader(new InputStreamReader(inputStream, "UTF-8")); + this.outputStream = new BandwidthCountingOutputStream(outputStream, 5, SECONDS); + this.inputStream = new BandwidthCountingInputStream(inputStream, 5, SECONDS); + inputStreamReader = new BufferedReader(new InputStreamReader(this.inputStream, "UTF-8")); } // @@ -447,6 +513,24 @@ public class Connection extends AbstractExecutionThreadService implements Servic // /** + * Returns the current rate of the connection’s incoming side. + * + * @return The current input rate (in bytes per second) + */ + public long getInputRate() { + return inputStream.getCurrentRate(); + } + + /** + * Returns the current rate of the connection’s outgoing side. + * + * @return The current output rate (in bytes per second) + */ + public long getOutputRate() { + return outputStream.getCurrentRate(); + } + + /** * Sends a command with the given parameters, skipping all {@link * Optional#absent()} optionals. * @@ -464,7 +548,7 @@ public class Connection extends AbstractExecutionThreadService implements Servic setParameters.add(maybeSetParameter.get()); } } - sendCommand(command, setParameters.toArray(new String[0])); + sendCommand(command, setParameters.toArray(new String[setParameters.size()])); } /** @@ -497,7 +581,7 @@ public class Connection extends AbstractExecutionThreadService implements Servic commandBuilder.append(parameter); } - System.out.println(">> " + commandBuilder.toString()); + logger.trace(String.format(">> %s", commandBuilder)); outputStream.write((commandBuilder.toString() + "\r\n").getBytes("UTF-8")); outputStream.flush(); } @@ -528,6 +612,7 @@ public class Connection extends AbstractExecutionThreadService implements Servic public void close() throws IOException { Closeables.close(outputStream, true); Closeables.close(inputStreamReader, true); + Closeables.close(inputStream, true); } }