Hand down progress listener.
[jSite.git] / src / de / todesbaum / util / freenet / fcp2 / Connection.java
1 /*
2  * todesbaum-lib -
3  * Copyright (C) 2006 David Roden
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 2 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program; if not, write to the Free Software
17  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
18  */
19
20 package de.todesbaum.util.freenet.fcp2;
21
22 import java.io.File;
23 import java.io.FileOutputStream;
24 import java.io.IOException;
25 import java.io.InputStream;
26 import java.io.OutputStream;
27 import java.io.OutputStreamWriter;
28 import java.io.Writer;
29 import java.net.Socket;
30 import java.nio.charset.Charset;
31 import java.util.ArrayList;
32 import java.util.List;
33
34 import de.todesbaum.util.io.Closer;
35 import de.todesbaum.util.io.LineInputStream;
36 import de.todesbaum.util.io.StreamCopier;
37 import de.todesbaum.util.io.StreamCopier.ProgressListener;
38 import de.todesbaum.util.io.TempFileInputStream;
39
40 /**
41  * A physical connection to a Freenet node.
42  *
43  * @author David Roden <droden@gmail.com>
44  * @version $Id$
45  */
46 public class Connection {
47
48         /** The listeners that receive events from this connection. */
49         private List<ConnectionListener> connectionListeners = new ArrayList<ConnectionListener>();
50
51         /** The node this connection is connected to. */
52         private final Node node;
53
54         /** The name of this connection. */
55         private final String name;
56
57         /** The network socket of this connection. */
58         private Socket nodeSocket;
59
60         /** The input stream that reads from the socket. */
61         private InputStream nodeInputStream;
62
63         /** The output stream that writes to the socket. */
64         private OutputStream nodeOutputStream;
65
66         /** The thread that reads from the socket. */
67         private NodeReader nodeReader;
68
69         /** A writer for the output stream. */
70         private Writer nodeWriter;
71
72         /** The NodeHello message sent by the node on connect. */
73         protected Message nodeHello;
74
75         /** The temp directory to use. */
76         private String tempDirectory;
77
78         /**
79          * Creates a new connection to the specified node with the specified name.
80          *
81          * @param node
82          *            The node to connect to
83          * @param name
84          *            The name of this connection
85          */
86         public Connection(Node node, String name) {
87                 this.node = node;
88                 this.name = name;
89         }
90
91         /**
92          * Adds a listener that gets notified on connection events.
93          *
94          * @param connectionListener
95          *            The listener to add
96          */
97         public void addConnectionListener(ConnectionListener connectionListener) {
98                 connectionListeners.add(connectionListener);
99         }
100
101         /**
102          * Removes a listener from the list of registered listeners. Only the first
103          * matching listener is removed.
104          *
105          * @param connectionListener
106          *            The listener to remove
107          * @see List#remove(java.lang.Object)
108          */
109         public void removeConnectionListener(ConnectionListener connectionListener) {
110                 connectionListeners.remove(connectionListener);
111         }
112
113         /**
114          * Notifies listeners about a received message.
115          *
116          * @param message
117          *            The received message
118          */
119         protected void fireMessageReceived(Message message) {
120                 for (ConnectionListener connectionListener : connectionListeners) {
121                         connectionListener.messageReceived(this, message);
122                 }
123         }
124
125         /**
126          * Notifies listeners about the loss of the connection.
127          */
128         protected void fireConnectionTerminated() {
129                 for (ConnectionListener connectionListener : connectionListeners) {
130                         connectionListener.connectionTerminated(this);
131                 }
132         }
133
134         /**
135          * Returns the name of the connection.
136          *
137          * @return The name of the connection
138          */
139         public String getName() {
140                 return name;
141         }
142
143         /**
144          * Sets the temp directory to use for creation of temporary files.
145          *
146          * @param tempDirectory
147          *            The temp directory to use, or {@code null} to use the default
148          *            temp directory
149          */
150         public void setTempDirectory(String tempDirectory) {
151                 this.tempDirectory = tempDirectory;
152         }
153
154         /**
155          * Connects to the node.
156          *
157          * @return <code>true</code> if the connection succeeded and the node
158          *         returned a NodeHello message
159          * @throws IOException
160          *             if an I/O error occurs
161          * @see #getNodeHello()
162          */
163         public synchronized boolean connect() throws IOException {
164                 nodeSocket = null;
165                 nodeInputStream = null;
166                 nodeOutputStream = null;
167                 nodeWriter = null;
168                 nodeReader = null;
169                 try {
170                         nodeSocket = new Socket(node.getHostname(), node.getPort());
171                         nodeSocket.setReceiveBufferSize(65535);
172                         nodeInputStream = nodeSocket.getInputStream();
173                         nodeOutputStream = nodeSocket.getOutputStream();
174                         nodeWriter = new OutputStreamWriter(nodeOutputStream, Charset.forName("UTF-8"));
175                         nodeReader = new NodeReader(nodeInputStream);
176                         Thread nodeReaderThread = new Thread(nodeReader);
177                         nodeReaderThread.setDaemon(true);
178                         nodeReaderThread.start();
179                         ClientHello clientHello = new ClientHello();
180                         clientHello.setName(name);
181                         clientHello.setExpectedVersion("2.0");
182                         execute(clientHello);
183                         synchronized (this) {
184                                 try {
185                                         wait();
186                                 } catch (InterruptedException e) {
187                                 }
188                         }
189                         return nodeHello != null;
190                 } catch (IOException ioe1) {
191                         disconnect();
192                         throw ioe1;
193                 }
194         }
195
196         /**
197          * Returns whether this connection is still connected to the node.
198          *
199          * @return <code>true</code> if this connection is still valid,
200          *         <code>false</code> otherwise
201          */
202         public boolean isConnected() {
203                 return (nodeHello != null) && (nodeSocket != null) && (nodeSocket.isConnected());
204         }
205
206         /**
207          * Returns the NodeHello message the node sent on connection.
208          *
209          * @return The NodeHello message of the node
210          */
211         public Message getNodeHello() {
212                 return nodeHello;
213         }
214
215         /**
216          * Disconnects from the node.
217          */
218         public void disconnect() {
219                 if (nodeWriter != null) {
220                         try {
221                                 nodeWriter.close();
222                         } catch (IOException ioe1) {
223                         }
224                         nodeWriter = null;
225                 }
226                 if (nodeOutputStream != null) {
227                         try {
228                                 nodeOutputStream.close();
229                         } catch (IOException ioe1) {
230                         }
231                         nodeOutputStream = null;
232                 }
233                 if (nodeInputStream != null) {
234                         try {
235                                 nodeInputStream.close();
236                         } catch (IOException ioe1) {
237                         }
238                         nodeInputStream = null;
239                 }
240                 if (nodeSocket != null) {
241                         try {
242                                 nodeSocket.close();
243                         } catch (IOException ioe1) {
244                         }
245                         nodeSocket = null;
246                 }
247                 synchronized (this) {
248                         notify();
249                 }
250                 fireConnectionTerminated();
251         }
252
253         /**
254          * Executes the specified command.
255          *
256          * @param command
257          *            The command to execute
258          * @throws IllegalStateException
259          *             if the connection is not connected
260          * @throws IOException
261          *             if an I/O error occurs
262          */
263         public synchronized void execute(Command command) throws IllegalStateException, IOException {
264                 execute(command, null);
265         }
266
267         /**
268          * Executes the specified command.
269          *
270          * @param command
271          *            The command to execute
272          * @param progressListener
273          *            A progress listener for a payload transfer
274          * @throws IllegalStateException
275          *             if the connection is not connected
276          * @throws IOException
277          *             if an I/O error occurs
278          */
279         public synchronized void execute(Command command, ProgressListener progressListener) throws IllegalStateException, IOException {
280                 if (nodeSocket == null) {
281                         throw new IllegalStateException("connection is not connected");
282                 }
283                 nodeWriter.write(command.getCommandName() + Command.LINEFEED);
284                 command.write(nodeWriter);
285                 nodeWriter.write("EndMessage" + Command.LINEFEED);
286                 nodeWriter.flush();
287                 if (command.hasPayload()) {
288                         InputStream payloadInputStream = null;
289                         try {
290                                 payloadInputStream = command.getPayload();
291                                 StreamCopier.copy(payloadInputStream, nodeOutputStream, command.getPayloadLength(), progressListener);
292                         } finally {
293                                 Closer.close(payloadInputStream);
294                         }
295                         nodeOutputStream.flush();
296                 }
297         }
298
299         /**
300          * The reader thread for this connection. This is essentially a thread that
301          * reads lines from the node, creates messages from them and notifies
302          * listeners about the messages.
303          *
304          * @author David Roden &lt;droden@gmail.com&gt;
305          * @version $Id$
306          */
307         private class NodeReader implements Runnable {
308
309                 /** The input stream to read from. */
310                 @SuppressWarnings("hiding")
311                 private InputStream nodeInputStream;
312
313                 /**
314                  * Creates a new reader that reads from the specified input stream.
315                  *
316                  * @param nodeInputStream
317                  *            The input stream to read from
318                  */
319                 public NodeReader(InputStream nodeInputStream) {
320                         this.nodeInputStream = nodeInputStream;
321                 }
322
323                 /**
324                  * Main loop of the reader. Lines are read and converted into
325                  * {@link Message} objects.
326                  */
327                 public void run() {
328                         LineInputStream nodeReader = null;
329                         try {
330                                 nodeReader = new LineInputStream(nodeInputStream);
331                                 String line = "";
332                                 Message message = null;
333                                 while (line != null) {
334                                         line = nodeReader.readLine();
335                                         // System.err.println("> " + line);
336                                         if (line == null) {
337                                                 break;
338                                         }
339                                         if (message == null) {
340                                                 message = new Message(line);
341                                                 continue;
342                                         }
343                                         if ("Data".equals(line)) {
344                                                 /* need to read message from stream now */
345                                                 File tempFile = null;
346                                                 try {
347                                                         tempFile = File.createTempFile("fcpv2", "data", (tempDirectory != null) ? new File(tempDirectory) : null);
348                                                         tempFile.deleteOnExit();
349                                                         FileOutputStream tempFileOutputStream = new FileOutputStream(tempFile);
350                                                         long dataLength = Long.parseLong(message.get("DataLength"));
351                                                         StreamCopier.copy(nodeInputStream, tempFileOutputStream, dataLength);
352                                                         tempFileOutputStream.close();
353                                                         message.setPayloadInputStream(new TempFileInputStream(tempFile));
354                                                 } catch (IOException ioe1) {
355                                                         ioe1.printStackTrace();
356                                                 }
357                                         }
358                                         if ("Data".equals(line) || "EndMessage".equals(line)) {
359                                                 if (message.getName().equals("NodeHello")) {
360                                                         nodeHello = message;
361                                                         synchronized (Connection.this) {
362                                                                 Connection.this.notify();
363                                                         }
364                                                 } else {
365                                                         fireMessageReceived(message);
366                                                 }
367                                                 message = null;
368                                                 continue;
369                                         }
370                                         int equalsPosition = line.indexOf('=');
371                                         if (equalsPosition > -1) {
372                                                 String key = line.substring(0, equalsPosition).trim();
373                                                 String value = line.substring(equalsPosition + 1).trim();
374                                                 if (key.equals("Identifier")) {
375                                                         message.setIdentifier(value);
376                                                 } else {
377                                                         message.put(key, value);
378                                                 }
379                                                 continue;
380                                         }
381                                         /* skip lines consisting of whitespace only */
382                                         if (line.trim().length() == 0) {
383                                                 continue;
384                                         }
385                                         /* if we got here, some error occured! */
386                                         throw new IOException("Unexpected line: " + line);
387                                 }
388                         } catch (IOException ioe1) {
389                                 // ioe1.printStackTrace();
390                         } finally {
391                                 if (nodeReader != null) {
392                                         try {
393                                                 nodeReader.close();
394                                         } catch (IOException ioe1) {
395                                         }
396                                 }
397                                 if (nodeInputStream != null) {
398                                         try {
399                                                 nodeInputStream.close();
400                                         } catch (IOException ioe1) {
401                                         }
402                                 }
403                         }
404                         Connection.this.disconnect();
405                 }
406
407         }
408
409 }