version 0.4.6: improved handling of closed connections; fix bug in node-hostname...
[jSite.git] / src / de / todesbaum / util / freenet / fcp2 / Connection.java
1 /*
2  * todesbaum-lib - 
3  * Copyright (C) 2006 David Roden
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18  */
19
20 package de.todesbaum.util.freenet.fcp2;
21
22 import java.io.File;
23 import java.io.FileOutputStream;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.io.OutputStream;
27 import java.io.OutputStreamWriter;
28 import java.io.Writer;
29 import java.net.Socket;
30 import java.nio.charset.Charset;
31 import java.util.ArrayList;
32 import java.util.List;
33
34 import de.todesbaum.util.io.LineInputStream;
35 import de.todesbaum.util.io.StreamCopier;
36 import de.todesbaum.util.io.TempFileInputStream;
37
38 /**
39  * A physical connection to a Freenet node.
40  * 
41  * @author David Roden <droden@gmail.com>
42  * @version $Id: Connection.java 413 2006-03-29 12:22:31Z bombe $
43  */
44 public class Connection {
45
46         /** The listeners that receive events from this connection. */
47         private List<ConnectionListener> connectionListeners = new ArrayList<ConnectionListener>();
48
49         /** The node this connection is connected to. */
50         private final Node node;
51
52         /** The name of this connection. */
53         private final String name;
54
55         /** The network socket of this connection. */
56         private Socket nodeSocket;
57
58         /** The input stream that reads from the socket. */
59         private InputStream nodeInputStream;
60
61         /** The output stream that writes to the socket. */
62         private OutputStream nodeOutputStream;
63
64         /** The thread that reads from the socket. */
65         private NodeReader nodeReader;
66
67         /** A writer for the output stream. */
68         private Writer nodeWriter;
69
70         /** The NodeHello message sent by the node on connect. */
71         protected Message nodeHello;
72
73         /**
74          * Creates a new connection to the specified node with the specified name.
75          * 
76          * @param node
77          *            The node to connect to
78          * @param name
79          *            The name of this connection
80          */
81         public Connection(Node node, String name) {
82                 this.node = node;
83                 this.name = name;
84         }
85
86         /**
87          * Adds a listener that gets notified on connection events.
88          * 
89          * @param connectionListener
90          *            The listener to add
91          */
92         public void addConnectionListener(ConnectionListener connectionListener) {
93                 connectionListeners.add(connectionListener);
94         }
95
96         /**
97          * Removes a listener from the list of registered listeners. Only the first
98          * matching listener is removed.
99          * 
100          * @param connectionListener
101          *            The listener to remove
102          * @see List#remove(java.lang.Object)
103          */
104         public void removeConnectionListener(ConnectionListener connectionListener) {
105                 connectionListeners.remove(connectionListener);
106         }
107
108         /**
109          * Notifies listeners about a received message.
110          * 
111          * @param message
112          *            The received message
113          */
114         protected void fireMessageReceived(Message message) {
115                 for (ConnectionListener connectionListener: connectionListeners) {
116                         connectionListener.messageReceived(this, message);
117                 }
118         }
119
120         /**
121          * Notifies listeners about the loss of the connection.
122          */
123         protected void fireConnectionTerminated() {
124                 for (ConnectionListener connectionListener: connectionListeners) {
125                         connectionListener.connectionTerminated(this);
126                 }
127         }
128
129         /**
130          * Returns the name of the connection.
131          * 
132          * @return The name of the connection
133          */
134         public String getName() {
135                 return name;
136         }
137
138         /**
139          * Connects to the node.
140          * 
141          * @return <code>true</code> if the connection succeeded and the node
142          *         returned a NodeHello message
143          * @throws IOException
144          *             if an I/O error occurs
145          * @see #getNodeHello()
146          */
147         public synchronized boolean connect() throws IOException {
148                 nodeSocket = null;
149                 nodeInputStream = null;
150                 nodeOutputStream = null;
151                 nodeWriter = null;
152                 nodeReader = null;
153                 try {
154                         nodeSocket = new Socket(node.getHostname(), node.getPort());
155                         nodeSocket.setReceiveBufferSize(65535);
156                         nodeInputStream = nodeSocket.getInputStream();
157                         nodeOutputStream = nodeSocket.getOutputStream();
158                         // nodeWriter = new TeeWriter(new
159                         // OutputStreamWriter(nodeOutputStream, Charset.forName("UTF-8")),
160                         // new PrintWriter(System.out));
161                         nodeWriter = new OutputStreamWriter(nodeOutputStream, Charset.forName("UTF-8"));
162                         nodeReader = new NodeReader(nodeInputStream);
163                         Thread nodeReaderThread = new Thread(nodeReader);
164                         nodeReaderThread.setDaemon(true);
165                         nodeReaderThread.start();
166                         ClientHello clientHello = new ClientHello();
167                         clientHello.setName(name);
168                         clientHello.setExpectedVersion("2.0");
169                         execute(clientHello);
170                         synchronized (this) {
171                                 try {
172                                         wait();
173                                 } catch (InterruptedException e) {
174                                 }
175                         }
176                         return nodeHello != null;
177                 } catch (IOException ioe1) {
178                         disconnect();
179                         throw ioe1;
180                 }
181         }
182
183         /**
184          * Returns whether this connection is still connected to the node.
185          * 
186          * @return <code>true</code> if this connection is still valid,
187          *         <code>false</code> otherwise
188          */
189         public boolean isConnected() {
190                 return (nodeHello != null) && (nodeSocket != null) && (nodeSocket.isConnected());
191         }
192
193         /**
194          * Returns the NodeHello message the node sent on connection.
195          * 
196          * @return The NodeHello message of the node
197          */
198         public Message getNodeHello() {
199                 return nodeHello;
200         }
201
202         /**
203          * Disconnects from the node.
204          */
205         public void disconnect() {
206                 if (nodeWriter != null) {
207                         try {
208                                 nodeWriter.close();
209                         } catch (IOException ioe1) {
210                         }
211                         nodeWriter = null;
212                 }
213                 if (nodeOutputStream != null) {
214                         try {
215                                 nodeOutputStream.close();
216                         } catch (IOException ioe1) {
217                         }
218                         nodeOutputStream = null;
219                 }
220                 if (nodeInputStream != null) {
221                         try {
222                                 nodeInputStream.close();
223                         } catch (IOException ioe1) {
224                         }
225                         nodeInputStream = null;
226                 }
227                 if (nodeSocket != null) {
228                         try {
229                                 nodeSocket.close();
230                         } catch (IOException ioe1) {
231                         }
232                         nodeSocket = null;
233                 }
234                 synchronized (this) {
235                         notify();
236                 }
237                 fireConnectionTerminated();
238         }
239
240         /**
241          * Executes the specified command.
242          * 
243          * @param command
244          *            The command to execute
245          * @throws IllegalStateException
246          *             if the connection is not connected
247          * @throws IOException
248          *             if an I/O error occurs
249          */
250         public synchronized void execute(Command command) throws IllegalStateException, IOException {
251                 if (nodeSocket == null) {
252                         throw new IllegalStateException("connection is not connected");
253                 }
254                 nodeWriter.write(command.getCommandName() + Command.LINEFEED);
255                 command.write(nodeWriter);
256                 nodeWriter.write("EndMessage" + Command.LINEFEED);
257                 nodeWriter.flush();
258                 if (command.hasPayload()) {
259                         StreamCopier.copy(command.getPayload(), nodeOutputStream, command.getPayloadLength());
260                         nodeOutputStream.flush();
261                 }
262         }
263
264         /**
265          * The reader thread for this connection. This is essentially a thread that
266          * reads lines from the node, creates messages from them and notifies
267          * listeners about the messages.
268          * 
269          * @author David Roden &lt;droden@gmail.com&gt;
270          * @version $Id: Connection.java 413 2006-03-29 12:22:31Z bombe $
271          */
272         private class NodeReader implements Runnable {
273
274                 /** The input stream to read from. */
275                 @SuppressWarnings("hiding")
276                 private InputStream nodeInputStream;
277
278                 /**
279                  * Creates a new reader that reads from the specified input stream.
280                  * 
281                  * @param nodeInputStream
282                  *            The input stream to read from
283                  */
284                 public NodeReader(InputStream nodeInputStream) {
285                         this.nodeInputStream = nodeInputStream;
286                 }
287
288                 /**
289                  * Main loop of the reader. Lines are read and converted into
290                  * {@link Message} objects.
291                  */
292                 public void run() {
293                         LineInputStream nodeReader = null;
294                         try {
295                                 nodeReader = new LineInputStream(nodeInputStream);
296                                 String line = "";
297                                 Message message = null;
298                                 while (line != null) {
299                                         line = nodeReader.readLine();
300                                         // System.err.println("> " + line);
301                                         if (line == null) {
302                                                 break;
303                                         }
304                                         if (message == null) {
305                                                 message = new Message(line);
306                                                 continue;
307                                         }
308                                         if ("Data".equals(line)) {
309                                                 /* need to read message from stream now */
310                                                 File tempFile = null;
311                                                 try {
312                                                         tempFile = File.createTempFile("fcpv2", "data");
313                                                         tempFile.deleteOnExit();
314                                                         FileOutputStream tempFileOutputStream = new FileOutputStream(tempFile);
315                                                         long dataLength = Long.parseLong(message.get("DataLength"));
316                                                         StreamCopier.copy(nodeInputStream, tempFileOutputStream, dataLength);
317                                                         tempFileOutputStream.close();
318                                                         message.setPayloadInputStream(new TempFileInputStream(tempFile));
319                                                 } catch (IOException ioe1) {
320                                                         ioe1.printStackTrace();
321                                                 }
322                                         }
323                                         if ("Data".equals(line) || "EndMessage".equals(line)) {
324                                                 if (message.getName().equals("NodeHello")) {
325                                                         nodeHello = message;
326                                                         synchronized (Connection.this) {
327                                                                 Connection.this.notify();
328                                                         }
329                                                 } else {
330                                                         fireMessageReceived(message);
331                                                 }
332                                                 message = null;
333                                                 continue;
334                                         }
335                                         int equalsPosition = line.indexOf('=');
336                                         if (equalsPosition > -1) {
337                                                 String key = line.substring(0, equalsPosition).trim();
338                                                 String value = line.substring(equalsPosition + 1).trim();
339                                                 if (key.equals("Identifier")) {
340                                                         message.setIdentifier(value);
341                                                 } else {
342                                                         message.put(key, value);
343                                                 }
344                                                 continue;
345                                         }
346                                         /* skip lines consisting of whitespace only */
347                                         if (line.trim().length() == 0) {
348                                                 continue;
349                                         }
350                                         /* if we got here, some error occured! */
351                                         throw new IOException("Unexpected line: " + line);
352                                 }
353                         } catch (IOException ioe1) {
354                                 // ioe1.printStackTrace();
355                         } finally {
356                                 if (nodeReader != null) {
357                                         try {
358                                                 nodeReader.close();
359                                         } catch (IOException ioe1) {
360                                         }
361                                 }
362                                 if (nodeInputStream != null) {
363                                         try {
364                                                 nodeInputStream.close();
365                                         } catch (IOException ioe1) {
366                                         }
367                                 }
368                         }
369                         Connection.this.disconnect();
370                 }
371
372         }
373
374 }