🚸 Scan windows in separate thread
authorDavid Roden <github-a8in@qsheltier.de>
Sun, 26 Jan 2025 14:41:30 +0000 (15:41 +0100)
committerDavid Roden <github-a8in@qsheltier.de>
Sun, 26 Jan 2025 14:41:30 +0000 (15:41 +0100)
Also, don’t report new-window events for windows that were already open.
Also, don’t send separate new-frame events anymore, instead just set
is-frame attribute.

server/src/main/java/de/qsheltier/msta/Server.java
server/src/test/java/de/qsheltier/msta/ServerTest.java

index 626d976..20b981d 100644 (file)
@@ -17,9 +17,11 @@ import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketAddress;
-import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
@@ -79,6 +81,9 @@ public class Server implements Closeable {
         */
        public void start() throws InterruptedException {
                var startLatch = new CountDownLatch(1);
+               Thread scannerThread = new Thread(this::scanForWindows, "MSTA Window Scanner");
+               scannerThread.setDaemon(true);
+               scannerThread.start();
                new Thread(() -> {
                        startLatch.countDown();
                        while (!closed.get()) {
@@ -91,7 +96,7 @@ public class Server implements Closeable {
                                                        handleSocket(inputStream, outputStream);
                                                } catch (IOException e) {
                                                        /* swallow. */
-                                       }
+                                               }
                                        }, "MSTA Handler for " + socket.getRemoteSocketAddress()).start();
                                } catch (IOException e) {
                                        /* swallow exceptions. */
@@ -123,65 +128,55 @@ public class Server implements Closeable {
                return serverSocket.getLocalPort();
        }
 
+       private void scanForWindows() {
+               while (!closed.get()) {
+                       var currentlyOpenWindows = stream(Window.getWindows()).filter(Component::isVisible).toList();
+                       var newWindows = currentlyOpenWindows.stream().filter(window -> !windows.containsValue(window)).toList();
+                       newWindows.forEach(newWindow -> {
+                               synchronized (outputWriters) {
+                                       outputWriters.forEach(writeLine -> writeLine.accept(createEvent("new-window", new Pair("id", newWindow.getName()), new Pair("is-frame", newWindow instanceof Frame))));
+                               }
+                               windows.put(newWindow.getName(), newWindow);
+                       });
+                       Thread.yield();
+               }
+       }
+
        private void handleSocket(InputStream inputStream, OutputStream outputStream) throws IOException {
-               var finished = new AtomicBoolean(false);
-               var lastOpenWindows = new ArrayList<Window>();
-               var lastOpenFrames = new ArrayList<Frame>();
-               try (var inputReader = new BufferedReader(new InputStreamReader(inputStream));
-                    var outputWriter = new BufferedWriter(new OutputStreamWriter(outputStream, UTF_8))) {
+               try (var outputWriter = new BufferedWriter(new OutputStreamWriter(outputStream, UTF_8))) {
                        Consumer<String> writeLine = (String line) -> {
-                               try {
-                                       outputWriter.write(line + "\r\n");
-                                       outputWriter.flush();
-                               } catch (IOException e) {
-                                       /* ignore. */
-                               }
-                       };
-                       writeLine.accept(createEvent("connected"));
-                       new Thread(() -> {
-                               while (!finished.get()) {
-                                       var allWindows = stream(Window.getWindows()).filter(Component::isVisible).toList();
-                                       var openWindows = allWindows.stream()
-                                                       .filter(window -> !(window instanceof Frame))
-                                                       .filter(window -> !lastOpenWindows.contains(window))
-                                                       .toList();
-                                       openWindows.stream()
-                                                       .map(window -> createEvent("new-window", new Pair("id", window.getName())))
-                                                       .forEach(writeLine);
-
-                                       var openFrames = allWindows.stream()
-                                                       .filter(Frame.class::isInstance)
-                                                       .map(Frame.class::cast)
-                                                       .filter(frame -> !lastOpenFrames.contains(frame))
-                                                       .toList();
-                                       openFrames.stream()
-                                                       .map(frame -> createEvent("new-frame", new Pair("id", frame.getName())))
-                                                       .forEach(writeLine);
-
-                                       lastOpenWindows.addAll(openWindows);
-                                       lastOpenFrames.addAll(openFrames);
+                               synchronized (outputWriter) {
                                        try {
-                                               Thread.sleep(100);
-                                       } catch (InterruptedException e) {
-                                               /* ignore. */
+                                               outputWriter.write(line + "\r\n");
+                                               outputWriter.flush();
+                                       } catch (IOException e) {
+                                               /* ignore, not much we can do here. */
                                        }
                                }
-                       }).start();
-                       String line;
-                       while ((line = inputReader.readLine()) != null) {
-                               var words = stream(line.split(" ")).toList();
-                               if (words.getFirst().equalsIgnoreCase("shutdown")) {
-                                       shutdownHook.run();
-                                       break;
+                       };
+                       try (var inputReader = new BufferedReader(new InputStreamReader(inputStream))) {
+                               writeLine.accept(createEvent("connected"));
+                               synchronized (outputWriters) {
+                                       outputWriters.add(writeLine);
+                               }
+                               String line;
+                               while ((line = inputReader.readLine()) != null) {
+                                       var words = stream(line.split(" ")).toList();
+                                       if (words.getFirst().equalsIgnoreCase("shutdown")) {
+                                               shutdownHook.run();
+                                               break;
+                                       }
+                                       if (words.getFirst().equalsIgnoreCase("info") && (words.size() == 3) && words.get(1).equalsIgnoreCase("window")) {
+                                               var windowName = words.get(2);
+                                               stream(Window.getWindows()).filter(window -> window.getName().equals(windowName))
+                                                               .forEach(window -> writeLine.accept(createMessage(new Pair("info", "window"), new Pair("id", window.getName()))));
+                                       }
                                }
-                               if (words.getFirst().equalsIgnoreCase("info") && (words.size() == 3) && words.get(1).equalsIgnoreCase("window")) {
-                                       var windowName = words.get(2);
-                                       stream(Window.getWindows()).filter(window -> window.getName().equals(windowName))
-                                                       .forEach(window -> writeLine.accept(createMessage(new Pair("info", "window"), new Pair("id", window.getName()))));
+                       } finally {
+                               synchronized (outputWriters) {
+                                       outputWriters.remove(writeLine);
                                }
                        }
-               } finally {
-                       finished.set(true);
                }
        }
 
@@ -204,7 +199,9 @@ public class Server implements Closeable {
        private final ServerSocket serverSocket = new ServerSocket();
        private final Runnable shutdownHook;
        private final AtomicBoolean closed = new AtomicBoolean(false);
+       private final Set<Consumer<String>> outputWriters = new HashSet<>();
+       private final Map<String, Window> windows = new ConcurrentHashMap<>();
 
-       private record Pair(String first, String second) {}
+       private record Pair(String first, Object second) {}
 
 }
index e850046..2d84ef9 100644 (file)
@@ -10,6 +10,9 @@ import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Consumer;
@@ -18,6 +21,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
 import static com.spotify.hamcrest.jackson.JsonMatchers.isJsonStringMatching;
+import static com.spotify.hamcrest.jackson.JsonMatchers.jsonBoolean;
 import static com.spotify.hamcrest.jackson.JsonMatchers.jsonMissing;
 import static com.spotify.hamcrest.jackson.JsonMatchers.jsonObject;
 import static com.spotify.hamcrest.jackson.JsonMatchers.jsonText;
@@ -55,7 +59,7 @@ public class ServerTest {
                var shutdown = new AtomicBoolean();
                try (var server = new Server(() -> shutdown.set(true))) {
                        server.start();
-                       createConnection(server, verifyConnectedEvent((reader, writer) -> {
+                       createConnection(server, verifyConnectedEvent((reader, ready, writer) -> {
                                writer.accept("shutdown");
                                Thread.yield();
                        }));
@@ -80,12 +84,12 @@ public class ServerTest {
 
        @Test
        @Timeout(value = 5, unit = TimeUnit.SECONDS, threadMode = SEPARATE_THREAD)
-       public void serverSendsEventWhenAWindowIsOpened() throws Throwable {
+       public void serverDoesNotSendEventsForExistingWindows() throws Throwable {
                var window = new Window(null);
                window.setVisible(true);
                try {
-                       connectToServer(verifyConnectedEvent((reader, writer) -> {
-                               assertThat(reader.get(), isJsonStringMatching(jsonObject().where("event", jsonText("new-window"))));
+                       connectToServer(verifyConnectedEvent((reader, ready, writer) -> {
+                               assertTimeout(ready, Duration.of(1, ChronoUnit.SECONDS));
                        }));
                } finally {
                        window.setVisible(false);
@@ -94,16 +98,30 @@ public class ServerTest {
 
        @Test
        @Timeout(value = 5, unit = TimeUnit.SECONDS, threadMode = SEPARATE_THREAD)
+       public void serverSendsEventWhenAWindowIsOpened() throws Throwable {
+               connectToServer(verifyConnectedEvent((reader, ready, writer) -> {
+                       var window = new Window(null);
+                       try {
+                               window.setVisible(true);
+                               assertThat(reader.get(), isJsonStringMatching(jsonObject().where("event", jsonText("new-window")).where("is-frame", jsonBoolean(false))));
+                       } finally {
+                               window.setVisible(false);
+                       }
+               }));
+       }
+
+       @Test
+       @Timeout(value = 5, unit = TimeUnit.SECONDS, threadMode = SEPARATE_THREAD)
        public void serverSendsEventWhenAFrameIsOpened() throws Throwable {
-               var frame = new Frame("Frame Title");
-               frame.setVisible(true);
-               try {
-                       connectToServer(verifyConnectedEvent((reader, writer) -> {
-                               assertThat(reader.get(), isJsonStringMatching(jsonObject().where("event", jsonText("new-frame"))));
-                       }));
-               } finally {
-                       frame.setVisible(false);
-               }
+               connectToServer(verifyConnectedEvent((reader, ready, writer) -> {
+                       var frame = new Frame("Frame Title");
+                       frame.setVisible(true);
+                       try {
+                               assertThat(reader.get(), isJsonStringMatching(jsonObject().where("event", jsonText("new-window")).where("is-frame", jsonBoolean(true))));
+                       } finally {
+                               frame.setVisible(false);
+                       }
+               }));
        }
 
        @Test
@@ -111,18 +129,63 @@ public class ServerTest {
        public void serverCanHandleMultipleConnections() throws Exception {
                try (var server = new Server()) {
                        server.start();
-                       createConnection(server, verifyConnectedEvent(((reader, writer) -> {})));
-                       createConnection(server, verifyConnectedEvent(((reader, writer) -> {})));
+                       createConnection(server, verifyConnectedEvent(((reader, ready, writer) -> {})));
+                       createConnection(server, verifyConnectedEvent(((reader, ready, writer) -> {})));
+               }
+       }
+
+       @Test
+       @Timeout(value = 5, unit = TimeUnit.SECONDS, threadMode = SEPARATE_THREAD)
+       public void serverSendsNewWindowNotificationsToAllOpenConnections() throws Exception {
+               try (var server = new Server()) {
+                       server.start();
+                       var startedThreads = new CountDownLatch(2);
+                       var notifiedThreads = new CountDownLatch(2);
+                       var windowOpened = new CountDownLatch(1);
+                       new Thread(() -> {
+                               try {
+                                       createConnection(server, verifyConnectedEvent(((reader, ready, writer) -> {
+                                               startedThreads.countDown();
+                                               windowOpened.await();
+                                               assertThat(reader.get(), isJsonStringMatching(jsonObject().where("event", jsonText("new-window"))));
+                                               notifiedThreads.countDown();
+                                       })));
+                               } catch (Exception e) {
+                                       throw new RuntimeException(e);
+                               }
+                       }).start();
+                       new Thread(() -> {
+                               try {
+                                       createConnection(server, verifyConnectedEvent(((reader, ready, writer) -> {
+                                               startedThreads.countDown();
+                                               windowOpened.await();
+                                               assertThat(reader.get(), isJsonStringMatching(jsonObject().where("event", jsonText("new-window"))));
+                                               notifiedThreads.countDown();
+                                       })));
+                               } catch (Exception e) {
+                                       throw new RuntimeException(e);
+                               }
+                       }).start();
+                       new Thread(() -> {
+                               try {
+                                       startedThreads.await();
+                                       new Window(null).setVisible(true);
+                                       windowOpened.countDown();
+                               } catch (InterruptedException e) {
+                                       throw new RuntimeException(e);
+                               }
+                       }).start();
+                       assertThat(notifiedThreads.await(2, TimeUnit.SECONDS), equalTo(true));
                }
        }
 
        @Test
        @Timeout(value = 5, unit = TimeUnit.SECONDS, threadMode = SEPARATE_THREAD)
        public void windowInfoContainsButtonInfo() throws Throwable {
-               var window = new Window(null);
-               window.setVisible(true);
-               try {
-                       connectToServer(verifyConnectedEvent((reader, writer) -> {
+               connectToServer(verifyConnectedEvent((reader, ready, writer) -> {
+                       var window = new Window(null);
+                       window.setVisible(true);
+                       try {
                                var reply = objectMapper.readTree(reader.get());
                                assertThat(reply, jsonObject().where("event", jsonText("new-window")).where("id", not(jsonMissing())));
                                var windowName = reply.get("id").asText();
@@ -132,27 +195,35 @@ public class ServerTest {
                                                .where("info", jsonText("window"))
                                                .where("id", jsonText(windowName))
                                );
-                       }));
-               } finally {
-                       window.setVisible(false);
+                       } finally {
+                               window.setVisible(false);
+                       }
+               }));
+       }
+
+       private static void assertTimeout(Supplier<Boolean> failure, Duration timeout) {
+               var now = System.currentTimeMillis();
+               while (!failure.get() && (System.currentTimeMillis() < (now + timeout.toMillis()))) {
+                       Thread.yield();
                }
+               assertThat(failure.get(), equalTo(false));
        }
 
-       private static ThrowingBiConsumer<Supplier<String>, Consumer<String>> verifyConnectedEvent(ThrowingBiConsumer<Supplier<String>, Consumer<String>> connectionConsumer) {
-               return (reader, writer) -> {
+       private static ThrowingTriConsumer<Supplier<String>, Supplier<Boolean>, Consumer<String>> verifyConnectedEvent(ThrowingTriConsumer<Supplier<String>, Supplier<Boolean>, Consumer<String>> connectionConsumer) {
+               return (reader, ready, writer) -> {
                        assertThat(reader.get(), isJsonStringMatching(jsonObject().where("event", jsonText("connected"))));
-                       connectionConsumer.accept(reader, writer);
+                       connectionConsumer.accept(reader, ready, writer);
                };
        }
 
-       private static void connectToServer(ThrowingBiConsumer<Supplier<String>, Consumer<String>> connectionConsumer) throws Exception {
+       private static void connectToServer(ThrowingTriConsumer<Supplier<String>, Supplier<Boolean>, Consumer<String>> connectionConsumer) throws Exception {
                try (var server = new Server()) {
                        server.start();
                        createConnection(server, connectionConsumer);
                }
        }
 
-       private static void createConnection(Server server, ThrowingBiConsumer<Supplier<String>, Consumer<String>> connectionConsumer) throws Exception {
+       private static void createConnection(Server server, ThrowingTriConsumer<Supplier<String>, Supplier<Boolean>, Consumer<String>> connectionConsumer) throws Exception {
                try (var connection = new Socket("localhost", server.getPort());
                     var reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), UTF_8));
                     var writer = new BufferedWriter(new OutputStreamWriter(connection.getOutputStream(), UTF_8))) {
@@ -162,6 +233,12 @@ public class ServerTest {
                                } catch (IOException e) {
                                        throw new RuntimeException(e);
                                }
+                       }, () -> {
+                               try {
+                                       return reader.ready();
+                               } catch (IOException e) {
+                                       throw new RuntimeException(e);
+                               }
                        }, line -> {
                                try {
                                        writer.write(line + "\r\n");
@@ -175,8 +252,8 @@ public class ServerTest {
 
        private final ObjectMapper objectMapper = new ObjectMapper();
 
-       private interface ThrowingBiConsumer<A, B> {
-               void accept(A a, B b) throws Exception;
+       private interface ThrowingTriConsumer<A, B, C> {
+               void accept(A a, B b, C c) throws Exception;
        }
 
 }