private Socket remoteSocket;
private InputStream remoteInputStream;
private OutputStream remoteOutputStream;
+ private FcpConnectionHandler connectionHandler;
private boolean connected;
public FcpConnection(String host, String clientName) throws UnknownHostException {
// LISTENER MANAGEMENT
//
+ /**
+ * Adds the given listener to the list of listeners.
+ *
+ * @param fcpListener
+ * The listener to add
+ */
public void addFcpListener(FcpListener fcpListener) {
fcpListeners.add(fcpListener);
}
+ /**
+ * Removes the given listener from the list of listeners.
+ *
+ * @param fcpListener
+ * The listener to remove
+ */
public void removeFcpListener(FcpListener fcpListener) {
fcpListeners.remove(fcpListener);
}
- private void fireNodeHello(Map<String, String> nodeProperties) {
+ /**
+ * Notifies all registered listeners that a message has been received.
+ *
+ * @see FcpListener#receivedMessage(FcpConnection, FcpMessage)
+ * @param fcpMessage
+ * The message that was received
+ */
+ private void fireMessageReceived(FcpMessage fcpMessage) {
for (FcpListener fcpListener: fcpListeners) {
- fcpListener.fcpNodeHello(this, nodeProperties);
+ fcpListener.receivedMessage(this, fcpMessage);
}
}
// ACTIONS
//
- public synchronized void connect() throws FcpException, IOException {
- System.out.println("connecting...");
+ /**
+ * Connects to the node.
+ *
+ * @throws IOException
+ * if an I/O error occurs
+ * @throws IllegalStateException
+ * if there is already a connection to the node
+ */
+ public synchronized void connect() throws IOException, IllegalStateException {
+ if (connectionHandler != null) {
+ throw new IllegalStateException("already connected, disconnect first");
+ }
remoteSocket = new Socket(address, port);
remoteInputStream = remoteSocket.getInputStream();
remoteOutputStream = remoteSocket.getOutputStream();
connected = true;
- System.out.println("connected.");
- new Thread(new FcpConnectionHandler(this, remoteInputStream)).start();
- sendMessage(clientHelloMessage);
+ new Thread(connectionHandler = new FcpConnectionHandler(this, remoteInputStream)).start();
}
+ /**
+ * Disconnects from the node. If there is no connection to the node, this
+ * method does nothing.
+ */
public synchronized void disconnect() {
+ if (connectionHandler == null) {
+ return;
+ }
connected = false;
Closer.close(remoteSocket);
+ connectionHandler.stop();
+ connectionHandler = null;
+ }
+
+ public synchronized void sendMessage(FcpMessage fcpMessage) throws IOException {
+ System.out.println("sending message: " + fcpMessage.getName());
+ fcpMessage.write(remoteOutputStream);
}
/**
* @return The properties of the peer, or <code>null</code> if the peer is
* unknown
* @throws IOException
- * @throws FcpException
+ * @throws FcpException
*/
public Map<String, String> sendListPeer(String nodeIdentifier) throws IOException, FcpException {
FcpMessage listPeerMessage = new FcpMessage("ListPeer");
return null;
}
+ void handleMessage(FcpMessage fcpMessage) {
+ fireMessageReceived(fcpMessage);
+ }
+
public List<Map<String, String>> sendListPeers(boolean withMetadata, boolean withVolatile) throws IOException, FcpException {
FcpMessage listPeersMessage = new FcpMessage("ListPeers");
listPeersMessage.setField("WithMetadata", String.valueOf(withMetadata));
}
return peerNotes;
}
-
+
public void sendTestDDARequest(String directory, boolean wantReadDirectory, boolean wantWriteDirectory) throws IOException, FcpException {
FcpMessage testDDARequestMessage = new FcpMessage("TestDDARequest");
testDDARequestMessage.setField("Directory", directory);
testDDARequestMessage.setField("WantWriteDirectory", String.valueOf(wantWriteDirectory));
sendMessage(testDDARequestMessage);
}
-
+
public FcpKeyPair generateSSK() throws IOException, FcpException {
FcpMessage generateSSKMessage = new FcpMessage("GenerateSSK");
String identifier = hashCode() + String.valueOf(System.currentTimeMillis());
String privateKey = returnMessage.getField("InsertURI");
return new FcpKeyPair(publicKey, privateKey);
}
-
- //
- // PACKAGE-PRIVATE METHODS
- //
-
- void handleMessage(FcpMessage fcpMessage) {
- synchronized (messageWaitSync) {
- while (receivedMessage != null) {
- /* previous message has not yet been consumed */
- System.out.println("waiting for message to be consumed...");
- try {
- messageWaitSync.wait();
- } catch (InterruptedException ie1) {
- }
- }
- /* TODO - check whether to send events here or later. */
- if ("NodeHello".equals(fcpMessage.getName())) {
- fireNodeHello(fcpMessage.getFields());
- }
- System.out.println("setting receivedMessage");
- receivedMessage = fcpMessage;
- messageWaitSync.notifyAll();
- }
- }
//
// PRIVATE METHODS
//
- public synchronized void sendMessage(FcpMessage fcpMessage) throws IOException {
- System.out.println("sending message: " + fcpMessage.getName());
- fcpMessage.write(remoteOutputStream);
- }
-
public FcpMessage waitForMessage(String... messageNames) throws FcpException {
FcpMessage oldMessage = null;
synchronized (messageWaitSync) {
import java.nio.charset.Charset;
/**
- * TODO
+ * Handles an FCP connection to a node.
*
* @author David ‘Bombe’ Roden <bombe@freenetproject.org>
* @version $Id$
*/
public class FcpConnectionHandler implements Runnable {
+ /** The underlying connection. */
private final FcpConnection fcpConnection;
+
+ /** The input stream from the node. */
private final InputStream remoteInputStream;
-
+
+ /** Whether to stop the connection handler. */
+ private boolean shouldStop;
+
+ /** Whether the next read line feed should be ignored. */
private boolean ignoreNextLinefeed;
+ /**
+ * Creates a new connection handler that operates on the given connection
+ * and input stream.
+ *
+ * @param fcpConnection
+ * The underlying FCP connection
+ * @param remoteInputStream
+ * The input stream from the node
+ */
public FcpConnectionHandler(FcpConnection fcpConnection, InputStream remoteInputStream) {
this.fcpConnection = fcpConnection;
this.remoteInputStream = remoteInputStream;
}
+ /**
+ * {@inheritDoc}
+ */
public void run() {
FcpMessage fcpMessage = null;
while (true) {
+ synchronized (this) {
+ if (shouldStop) {
+ break;
+ }
+ }
try {
String line = readLine();
System.out.println("read line: " + line);
String value = line.substring(equalSign + 1);
fcpMessage.setField(field, value);
} catch (IOException e) {
+ break;
}
}
}
/**
+ * Stops the connection handler.
+ */
+ public void stop() {
+ synchronized (this) {
+ shouldStop = true;
+ }
+ }
+
+ //
+ // PRIVATE METHODS
+ //
+
+ /**
* Reads bytes from {@link #remoteInputStream} until ‘\r’ or ‘\n’ are
* encountered and decodes the read bytes using UTF-8.
*
package net.pterodactylus.util.fcp.client;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import net.pterodactylus.util.fcp.FcpConnection;
import net.pterodactylus.util.fcp.FcpException;
+import net.pterodactylus.util.fcp.FcpListener;
import net.pterodactylus.util.fcp.FcpMessage;
/**
* @author David Roden <droden@gmail.com>
* @version $Id$
*/
-public class FcpHighLevelClient {
+public class FcpHighLevelClient implements FcpListener {
/** The FCP connection of this client. */
private final FcpConnection fcpConnection;
+ /** Storage for incoming messages. */
+ private final List<FcpMessage> incomingMessages = Collections.synchronizedList(new ArrayList<FcpMessage>());
+
+ /** Registered handlers for incoming messages. */
+ private final List<Callback<?>> incomingMessageCallbacks = Collections.synchronizedList(new ArrayList<Callback<?>>());
+
/**
* Creates a new high-level client that operates on the given FCP
* connection.
*/
public FcpHighLevelClient(FcpConnection fcpConnection) {
this.fcpConnection = fcpConnection;
+ fcpConnection.addFcpListener(this);
}
-
- public FcpNodeInformation login(String clientName, NodeHelloCallback nodeHelloCallback) throws FcpException, IOException {
+
+ //
+ // ACTIONS
+ //
+
+ /**
+ * Stops the high-level client.
+ */
+ public void stop() {
+ fcpConnection.removeFcpListener(this);
+ }
+
+ public Map<String, String> login(String clientName, final NodeHelloCallback nodeHelloCallback) throws FcpException, IOException {
FcpMessage clientHelloMessage = new FcpMessage("ClientHello");
clientHelloMessage.setField("Name", clientName);
clientHelloMessage.setField("ExpectedVersion", "2.0");
- fcpConnection.sendMessage(clientHelloMessage);
- FcpMessage nodeHelloMessage = fcpConnection.waitForMessage("NodeHello", "CloseConnectionDuplicateClientName");
- if (nodeHelloMessage.getName().equals("CloseConnectionDuplicateClientName")) {
- throw new FcpException("duplicate client name");
+ Callback<Map<String, String>> callback = new Callback<Map<String, String>>() {
+ /**
+ * {@inheritDoc}
+ */
+ @SuppressWarnings("synthetic-access")
+ public CallbackContainer<Map<String, String>> consumeMessage(FcpMessage fcpMessage) throws FcpException {
+ String messageName = fcpMessage.getName();
+ if ("CloseConnectionDuplicateClientName".equals(messageName)) {
+ incomingMessages.remove(0);
+ if (nodeHelloCallback == null) {
+ throw new FcpException("duplicate client name");
+ }
+ nodeHelloCallback.closeConnectionDuplicateClientName();
+ }
+ if ("NodeHello".equals(messageName)) {
+ incomingMessages.remove(0);
+ return new CallbackContainer<Map<String, String>>(true, fcpMessage.getFields());
+ }
+ return new CallbackContainer<Map<String, String>>(false, null);
+ }
+ };
+ synchronized (incomingMessages) {
+ synchronized (incomingMessageCallbacks) {
+ fcpConnection.sendMessage(clientHelloMessage);
+ if (nodeHelloCallback != null) {
+ incomingMessageCallbacks.add(callback);
+ return null;
+ }
+ }
+ return processMessages(callback);
+ }
+ }
+
+ //
+ // PRIVATE METHODS
+ //
+
+ private <T> T processMessages(Callback<T> callback) throws FcpException {
+ while (true) {
+ FcpMessage fcpMessage;
+ synchronized (incomingMessages) {
+ while (incomingMessages.isEmpty()) {
+ try {
+ incomingMessages.wait();
+ } catch (InterruptedException ie1) {
+ }
+ }
+ fcpMessage = incomingMessages.get(0);
+ }
+ CallbackContainer<T> callbackContainer = callback.consumeMessage(fcpMessage);
+ if (callbackContainer.isConsumed()) {
+ incomingMessages.remove(0);
+ return callbackContainer.getContent();
+ }
+ }
+ }
+
+ //
+ // INTERFACE FcpListener
+ //
+
+ /**
+ * {@inheritDoc}
+ */
+ public void receivedMessage(FcpConnection fcpConnection, FcpMessage fcpMessage) {
+ if (this.fcpConnection != fcpConnection) {
+ return;
+ }
+ synchronized (incomingMessages) {
+ incomingMessages.add(fcpMessage);
+ incomingMessages.notifyAll();
+ }
+ synchronized (incomingMessageCallbacks) {
+ for (Callback<?> callback: incomingMessageCallbacks) {
+ try {
+ CallbackContainer<?> callbackContainer = callback.consumeMessage(fcpMessage);
+ } catch (FcpException e) {
+ }
+
+ }
}
}
+ public static interface Callback<T> {
+
+ /**
+ * TODO
+ *
+ * @param fcpMessage
+ * @return
+ */
+ public CallbackContainer<T> consumeMessage(FcpMessage fcpMessage) throws FcpException;
+
+ }
+
+ public static class CallbackContainer<T> {
+
+ private final boolean consumed;
+ private final T content;
+
+ public CallbackContainer(boolean consumed, T content) {
+ this.consumed = consumed;
+ this.content = content;
+ }
+
+ /**
+ * TODO
+ *
+ * @return
+ */
+ public boolean isConsumed() {
+ return consumed;
+ }
+
+ /**
+ * TODO
+ *
+ * @return
+ */
+ public T getContent() {
+ return content;
+ }
+
+ }
+
}
public interface NodeHelloCallback {
public void nodeHello(FcpNodeInformation fcpNodeInformation);
+ public void closeConnectionDuplicateClientName();
}