From: David Roden Date: Sun, 26 Jan 2025 14:41:30 +0000 (+0100) Subject: 🚸 Scan windows in separate thread X-Git-Url: https://git.pterodactylus.net/?a=commitdiff_plain;h=2ac26692207534d33d9cbec5bb549a20b515c152;p=msta.git 🚸 Scan windows in separate thread Also, don’t report new-window events for windows that were already open. Also, don’t send separate new-frame events anymore, instead just set is-frame attribute. --- diff --git a/server/src/main/java/de/qsheltier/msta/Server.java b/server/src/main/java/de/qsheltier/msta/Server.java index 626d976..20b981d 100644 --- a/server/src/main/java/de/qsheltier/msta/Server.java +++ b/server/src/main/java/de/qsheltier/msta/Server.java @@ -17,9 +17,11 @@ import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketAddress; -import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -79,6 +81,9 @@ public class Server implements Closeable { */ public void start() throws InterruptedException { var startLatch = new CountDownLatch(1); + Thread scannerThread = new Thread(this::scanForWindows, "MSTA Window Scanner"); + scannerThread.setDaemon(true); + scannerThread.start(); new Thread(() -> { startLatch.countDown(); while (!closed.get()) { @@ -91,7 +96,7 @@ public class Server implements Closeable { handleSocket(inputStream, outputStream); } catch (IOException e) { /* swallow. */ - } + } }, "MSTA Handler for " + socket.getRemoteSocketAddress()).start(); } catch (IOException e) { /* swallow exceptions. */ @@ -123,65 +128,55 @@ public class Server implements Closeable { return serverSocket.getLocalPort(); } + private void scanForWindows() { + while (!closed.get()) { + var currentlyOpenWindows = stream(Window.getWindows()).filter(Component::isVisible).toList(); + var newWindows = currentlyOpenWindows.stream().filter(window -> !windows.containsValue(window)).toList(); + newWindows.forEach(newWindow -> { + synchronized (outputWriters) { + outputWriters.forEach(writeLine -> writeLine.accept(createEvent("new-window", new Pair("id", newWindow.getName()), new Pair("is-frame", newWindow instanceof Frame)))); + } + windows.put(newWindow.getName(), newWindow); + }); + Thread.yield(); + } + } + private void handleSocket(InputStream inputStream, OutputStream outputStream) throws IOException { - var finished = new AtomicBoolean(false); - var lastOpenWindows = new ArrayList(); - var lastOpenFrames = new ArrayList(); - try (var inputReader = new BufferedReader(new InputStreamReader(inputStream)); - var outputWriter = new BufferedWriter(new OutputStreamWriter(outputStream, UTF_8))) { + try (var outputWriter = new BufferedWriter(new OutputStreamWriter(outputStream, UTF_8))) { Consumer writeLine = (String line) -> { - try { - outputWriter.write(line + "\r\n"); - outputWriter.flush(); - } catch (IOException e) { - /* ignore. */ - } - }; - writeLine.accept(createEvent("connected")); - new Thread(() -> { - while (!finished.get()) { - var allWindows = stream(Window.getWindows()).filter(Component::isVisible).toList(); - var openWindows = allWindows.stream() - .filter(window -> !(window instanceof Frame)) - .filter(window -> !lastOpenWindows.contains(window)) - .toList(); - openWindows.stream() - .map(window -> createEvent("new-window", new Pair("id", window.getName()))) - .forEach(writeLine); - - var openFrames = allWindows.stream() - .filter(Frame.class::isInstance) - .map(Frame.class::cast) - .filter(frame -> !lastOpenFrames.contains(frame)) - .toList(); - openFrames.stream() - .map(frame -> createEvent("new-frame", new Pair("id", frame.getName()))) - .forEach(writeLine); - - lastOpenWindows.addAll(openWindows); - lastOpenFrames.addAll(openFrames); + synchronized (outputWriter) { try { - Thread.sleep(100); - } catch (InterruptedException e) { - /* ignore. */ + outputWriter.write(line + "\r\n"); + outputWriter.flush(); + } catch (IOException e) { + /* ignore, not much we can do here. */ } } - }).start(); - String line; - while ((line = inputReader.readLine()) != null) { - var words = stream(line.split(" ")).toList(); - if (words.getFirst().equalsIgnoreCase("shutdown")) { - shutdownHook.run(); - break; + }; + try (var inputReader = new BufferedReader(new InputStreamReader(inputStream))) { + writeLine.accept(createEvent("connected")); + synchronized (outputWriters) { + outputWriters.add(writeLine); + } + String line; + while ((line = inputReader.readLine()) != null) { + var words = stream(line.split(" ")).toList(); + if (words.getFirst().equalsIgnoreCase("shutdown")) { + shutdownHook.run(); + break; + } + if (words.getFirst().equalsIgnoreCase("info") && (words.size() == 3) && words.get(1).equalsIgnoreCase("window")) { + var windowName = words.get(2); + stream(Window.getWindows()).filter(window -> window.getName().equals(windowName)) + .forEach(window -> writeLine.accept(createMessage(new Pair("info", "window"), new Pair("id", window.getName())))); + } } - if (words.getFirst().equalsIgnoreCase("info") && (words.size() == 3) && words.get(1).equalsIgnoreCase("window")) { - var windowName = words.get(2); - stream(Window.getWindows()).filter(window -> window.getName().equals(windowName)) - .forEach(window -> writeLine.accept(createMessage(new Pair("info", "window"), new Pair("id", window.getName())))); + } finally { + synchronized (outputWriters) { + outputWriters.remove(writeLine); } } - } finally { - finished.set(true); } } @@ -204,7 +199,9 @@ public class Server implements Closeable { private final ServerSocket serverSocket = new ServerSocket(); private final Runnable shutdownHook; private final AtomicBoolean closed = new AtomicBoolean(false); + private final Set> outputWriters = new HashSet<>(); + private final Map windows = new ConcurrentHashMap<>(); - private record Pair(String first, String second) {} + private record Pair(String first, Object second) {} } diff --git a/server/src/test/java/de/qsheltier/msta/ServerTest.java b/server/src/test/java/de/qsheltier/msta/ServerTest.java index e850046..2d84ef9 100644 --- a/server/src/test/java/de/qsheltier/msta/ServerTest.java +++ b/server/src/test/java/de/qsheltier/msta/ServerTest.java @@ -10,6 +10,9 @@ import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.net.ServerSocket; import java.net.Socket; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; @@ -18,6 +21,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import static com.spotify.hamcrest.jackson.JsonMatchers.isJsonStringMatching; +import static com.spotify.hamcrest.jackson.JsonMatchers.jsonBoolean; import static com.spotify.hamcrest.jackson.JsonMatchers.jsonMissing; import static com.spotify.hamcrest.jackson.JsonMatchers.jsonObject; import static com.spotify.hamcrest.jackson.JsonMatchers.jsonText; @@ -55,7 +59,7 @@ public class ServerTest { var shutdown = new AtomicBoolean(); try (var server = new Server(() -> shutdown.set(true))) { server.start(); - createConnection(server, verifyConnectedEvent((reader, writer) -> { + createConnection(server, verifyConnectedEvent((reader, ready, writer) -> { writer.accept("shutdown"); Thread.yield(); })); @@ -80,12 +84,12 @@ public class ServerTest { @Test @Timeout(value = 5, unit = TimeUnit.SECONDS, threadMode = SEPARATE_THREAD) - public void serverSendsEventWhenAWindowIsOpened() throws Throwable { + public void serverDoesNotSendEventsForExistingWindows() throws Throwable { var window = new Window(null); window.setVisible(true); try { - connectToServer(verifyConnectedEvent((reader, writer) -> { - assertThat(reader.get(), isJsonStringMatching(jsonObject().where("event", jsonText("new-window")))); + connectToServer(verifyConnectedEvent((reader, ready, writer) -> { + assertTimeout(ready, Duration.of(1, ChronoUnit.SECONDS)); })); } finally { window.setVisible(false); @@ -94,16 +98,30 @@ public class ServerTest { @Test @Timeout(value = 5, unit = TimeUnit.SECONDS, threadMode = SEPARATE_THREAD) + public void serverSendsEventWhenAWindowIsOpened() throws Throwable { + connectToServer(verifyConnectedEvent((reader, ready, writer) -> { + var window = new Window(null); + try { + window.setVisible(true); + assertThat(reader.get(), isJsonStringMatching(jsonObject().where("event", jsonText("new-window")).where("is-frame", jsonBoolean(false)))); + } finally { + window.setVisible(false); + } + })); + } + + @Test + @Timeout(value = 5, unit = TimeUnit.SECONDS, threadMode = SEPARATE_THREAD) public void serverSendsEventWhenAFrameIsOpened() throws Throwable { - var frame = new Frame("Frame Title"); - frame.setVisible(true); - try { - connectToServer(verifyConnectedEvent((reader, writer) -> { - assertThat(reader.get(), isJsonStringMatching(jsonObject().where("event", jsonText("new-frame")))); - })); - } finally { - frame.setVisible(false); - } + connectToServer(verifyConnectedEvent((reader, ready, writer) -> { + var frame = new Frame("Frame Title"); + frame.setVisible(true); + try { + assertThat(reader.get(), isJsonStringMatching(jsonObject().where("event", jsonText("new-window")).where("is-frame", jsonBoolean(true)))); + } finally { + frame.setVisible(false); + } + })); } @Test @@ -111,18 +129,63 @@ public class ServerTest { public void serverCanHandleMultipleConnections() throws Exception { try (var server = new Server()) { server.start(); - createConnection(server, verifyConnectedEvent(((reader, writer) -> {}))); - createConnection(server, verifyConnectedEvent(((reader, writer) -> {}))); + createConnection(server, verifyConnectedEvent(((reader, ready, writer) -> {}))); + createConnection(server, verifyConnectedEvent(((reader, ready, writer) -> {}))); + } + } + + @Test + @Timeout(value = 5, unit = TimeUnit.SECONDS, threadMode = SEPARATE_THREAD) + public void serverSendsNewWindowNotificationsToAllOpenConnections() throws Exception { + try (var server = new Server()) { + server.start(); + var startedThreads = new CountDownLatch(2); + var notifiedThreads = new CountDownLatch(2); + var windowOpened = new CountDownLatch(1); + new Thread(() -> { + try { + createConnection(server, verifyConnectedEvent(((reader, ready, writer) -> { + startedThreads.countDown(); + windowOpened.await(); + assertThat(reader.get(), isJsonStringMatching(jsonObject().where("event", jsonText("new-window")))); + notifiedThreads.countDown(); + }))); + } catch (Exception e) { + throw new RuntimeException(e); + } + }).start(); + new Thread(() -> { + try { + createConnection(server, verifyConnectedEvent(((reader, ready, writer) -> { + startedThreads.countDown(); + windowOpened.await(); + assertThat(reader.get(), isJsonStringMatching(jsonObject().where("event", jsonText("new-window")))); + notifiedThreads.countDown(); + }))); + } catch (Exception e) { + throw new RuntimeException(e); + } + }).start(); + new Thread(() -> { + try { + startedThreads.await(); + new Window(null).setVisible(true); + windowOpened.countDown(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }).start(); + assertThat(notifiedThreads.await(2, TimeUnit.SECONDS), equalTo(true)); } } @Test @Timeout(value = 5, unit = TimeUnit.SECONDS, threadMode = SEPARATE_THREAD) public void windowInfoContainsButtonInfo() throws Throwable { - var window = new Window(null); - window.setVisible(true); - try { - connectToServer(verifyConnectedEvent((reader, writer) -> { + connectToServer(verifyConnectedEvent((reader, ready, writer) -> { + var window = new Window(null); + window.setVisible(true); + try { var reply = objectMapper.readTree(reader.get()); assertThat(reply, jsonObject().where("event", jsonText("new-window")).where("id", not(jsonMissing()))); var windowName = reply.get("id").asText(); @@ -132,27 +195,35 @@ public class ServerTest { .where("info", jsonText("window")) .where("id", jsonText(windowName)) ); - })); - } finally { - window.setVisible(false); + } finally { + window.setVisible(false); + } + })); + } + + private static void assertTimeout(Supplier failure, Duration timeout) { + var now = System.currentTimeMillis(); + while (!failure.get() && (System.currentTimeMillis() < (now + timeout.toMillis()))) { + Thread.yield(); } + assertThat(failure.get(), equalTo(false)); } - private static ThrowingBiConsumer, Consumer> verifyConnectedEvent(ThrowingBiConsumer, Consumer> connectionConsumer) { - return (reader, writer) -> { + private static ThrowingTriConsumer, Supplier, Consumer> verifyConnectedEvent(ThrowingTriConsumer, Supplier, Consumer> connectionConsumer) { + return (reader, ready, writer) -> { assertThat(reader.get(), isJsonStringMatching(jsonObject().where("event", jsonText("connected")))); - connectionConsumer.accept(reader, writer); + connectionConsumer.accept(reader, ready, writer); }; } - private static void connectToServer(ThrowingBiConsumer, Consumer> connectionConsumer) throws Exception { + private static void connectToServer(ThrowingTriConsumer, Supplier, Consumer> connectionConsumer) throws Exception { try (var server = new Server()) { server.start(); createConnection(server, connectionConsumer); } } - private static void createConnection(Server server, ThrowingBiConsumer, Consumer> connectionConsumer) throws Exception { + private static void createConnection(Server server, ThrowingTriConsumer, Supplier, Consumer> connectionConsumer) throws Exception { try (var connection = new Socket("localhost", server.getPort()); var reader = new BufferedReader(new InputStreamReader(connection.getInputStream(), UTF_8)); var writer = new BufferedWriter(new OutputStreamWriter(connection.getOutputStream(), UTF_8))) { @@ -162,6 +233,12 @@ public class ServerTest { } catch (IOException e) { throw new RuntimeException(e); } + }, () -> { + try { + return reader.ready(); + } catch (IOException e) { + throw new RuntimeException(e); + } }, line -> { try { writer.write(line + "\r\n"); @@ -175,8 +252,8 @@ public class ServerTest { private final ObjectMapper objectMapper = new ObjectMapper(); - private interface ThrowingBiConsumer { - void accept(A a, B b) throws Exception; + private interface ThrowingTriConsumer { + void accept(A a, B b, C c) throws Exception; } }