Split the engine in smaller parts.
authorDavid ‘Bombe’ Roden <bombe@pterodactylus.net>
Fri, 28 Mar 2014 21:04:06 +0000 (22:04 +0100)
committerDavid ‘Bombe’ Roden <bombe@pterodactylus.net>
Fri, 28 Mar 2014 21:04:06 +0000 (22:04 +0100)
src/main/java/net/pterodactylus/rhynodge/engine/Engine.java
src/main/java/net/pterodactylus/rhynodge/engine/ReactionRunner.java [new file with mode: 0644]
src/main/java/net/pterodactylus/rhynodge/engine/ReactionState.java [new file with mode: 0644]
src/main/java/net/pterodactylus/rhynodge/engine/Starter.java

index 9234cfd..5c289c5 100644 (file)
 
 package net.pterodactylus.rhynodge.engine;
 
-import static com.google.common.collect.Maps.newTreeMap;
-import static java.lang.String.format;
-import static java.util.Optional.empty;
-import static java.util.Optional.of;
+import static java.lang.System.currentTimeMillis;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
-import java.util.HashMap;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Optional;
-import java.util.SortedMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
-import net.pterodactylus.rhynodge.Filter;
-import net.pterodactylus.rhynodge.Query;
 import net.pterodactylus.rhynodge.Reaction;
-import net.pterodactylus.rhynodge.Trigger;
-import net.pterodactylus.rhynodge.states.AbstractState;
-import net.pterodactylus.rhynodge.states.FailedState;
 import net.pterodactylus.rhynodge.states.StateManager;
 
-import com.google.common.util.concurrent.AbstractExecutionThreadService;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.log4j.Logger;
-
 /**
  * Rhynodge main engine.
  *
  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
  */
-public class Engine extends AbstractExecutionThreadService {
-
-       /** The logger. */
-       private static final Logger logger = Logger.getLogger(Engine.class);
+public class Engine {
 
-       /** The state manager. */
        private final StateManager stateManager;
-
-       /** All defined reactions. */
-       /* synchronize on itself. */
-       private final Map<String, Reaction> reactions = new HashMap<String, Reaction>();
+       private final ScheduledExecutorService executorService;
+       private final Map<String, Future<?>> scheduledFutures = new ConcurrentHashMap<>();
 
        /**
         * Creates a new engine.
         *
         * @param stateManager
-        *            The state manager
+        *              The state manager
         */
        public Engine(StateManager stateManager) {
                this.stateManager = stateManager;
+               executorService = new ScheduledThreadPoolExecutor(10);
        }
 
        //
@@ -75,175 +61,31 @@ public class Engine extends AbstractExecutionThreadService {
         * Adds the given reaction to this engine.
         *
         * @param name
-        *            The name of the reaction
+        *              The name of the reaction
         * @param reaction
-        *            The reaction to add to this engine
+        *              The reaction to add to this engine
         */
        public void addReaction(String name, Reaction reaction) {
-               synchronized (reactions) {
-                       reactions.put(name, reaction);
-                       reactions.notifyAll();
-               }
+               ReactionState reactionState = new ReactionState(stateManager, name);
+               Optional<net.pterodactylus.rhynodge.State> lastState = reactionState.loadLastState();
+               long lastExecutionTime = lastState.map(net.pterodactylus.rhynodge.State::time).orElse(0L);
+               long nextExecutionTime = lastExecutionTime + reaction.updateInterval();
+               ReactionRunner reactionRunner = new ReactionRunner(reaction, reactionState);
+               ScheduledFuture<?> future = executorService.scheduleWithFixedDelay(reactionRunner, nextExecutionTime - currentTimeMillis(), reaction.updateInterval(), MILLISECONDS);
+               scheduledFutures.put(name, future);
        }
 
        /**
         * Removes the reaction with the given name.
         *
         * @param name
-        *            The name of the reaction to remove
+        *              The name of the reaction to remove
         */
        public void removeReaction(String name) {
-               synchronized (reactions) {
-                       if (!reactions.containsKey(name)) {
-                               return;
-                       }
-                       reactions.remove(name);
-                       reactions.notifyAll();
-               }
-       }
-
-       //
-       // ABSTRACTSERVICE METHODS
-       //
-
-       /**
-        * {@inheritDoc}
-        */
-       @Override
-       public void run() {
-               while (isRunning()) {
-                       Optional<NextReaction> nextReaction = getNextReaction();
-                       if (!nextReaction.isPresent()) {
-                               continue;
-                       }
-
-                       String reactionName = nextReaction.get().getKey();
-                       logger.debug(format("Next Reaction: %s.", reactionName));
-
-                       /* wait until the next reaction has to run. */
-                       long waitTime = nextReaction.get().getNextTime() - System.currentTimeMillis();
-                       logger.debug(format("Time to wait for next Reaction: %d millseconds.", waitTime));
-                       if (waitTime > 0) {
-                               waitForNextReactionToStart(nextReaction, waitTime);
-
-                               /* re-start loop to check for new reactions. */
-                               continue;
-                       }
-
-                       net.pterodactylus.rhynodge.State state = runReaction(nextReaction, reactionName);
-                       logger.debug(format("State is %s.", state));
-
-                       /* convert states. */
-                       for (Filter filter : nextReaction.get().getReaction().filters()) {
-                               if (state.success()) {
-                                       state = filter.filter(state);
-                               }
-                       }
-                       if (!state.success()) {
-                               Optional<net.pterodactylus.rhynodge.State> lastState = stateManager.loadLastState(reactionName);
-                               state.setFailCount(lastState.map(net.pterodactylus.rhynodge.State::failCount).orElse(0) + 1);
-                       }
-                       Optional<net.pterodactylus.rhynodge.State> lastSuccessfulState = stateManager.loadLastSuccessfulState(reactionName);
-
-                       /* merge states. */
-                       if (lastSuccessfulState.isPresent() && lastSuccessfulState.get().success() && state.success()) {
-                               Trigger trigger = nextReaction.get().getReaction().trigger();
-                               net.pterodactylus.rhynodge.State newState = trigger.mergeStates(lastSuccessfulState.get(), state);
-                               stateManager.saveState(reactionName, newState);
-                               if (trigger.triggers()) {
-                                       logger.info("Executing Action...");
-                                       nextReaction.get().getReaction().action().execute(trigger.output(nextReaction.get().getReaction()));
-                               }
-                       } else {
-                               /* save first or error state. */
-                               stateManager.saveState(reactionName, state);
-                       }
-               }
-       }
-
-       private net.pterodactylus.rhynodge.State runReaction(Optional<NextReaction> nextReaction, String reactionName) {
-               logger.info(format("Running Query for %s...", reactionName));
-               Query query = nextReaction.get().getReaction().query();
-               net.pterodactylus.rhynodge.State state;
-               try {
-                       logger.debug("Querying system...");
-                       state = query.state();
-                       if (state == null) {
-                               state = FailedState.INSTANCE;
-                       }
-                       logger.debug("System queried.");
-               } catch (Throwable t1) {
-                       logger.warn("Querying system failed!", t1);
-                       state = new AbstractState(t1) {
-                       };
-               }
-               return state;
-       }
-
-       private void waitForNextReactionToStart(Optional<NextReaction> nextReaction, long waitTime) {
-               synchronized (reactions) {
-                       try {
-                               logger.info(format("Waiting until %tc.", nextReaction.get().getNextTime()));
-                               reactions.wait(waitTime);
-                       } catch (InterruptedException ie1) {
-                               /* we’re looping! */
-                       }
-               }
-       }
-
-       private Optional<NextReaction> getNextReaction() {
-               while (isRunning()) {
-                       synchronized (reactions) {
-                               if (reactions.isEmpty()) {
-                                       logger.debug("Sleeping while no Reactions available.");
-                                       try {
-                                               reactions.wait();
-                                       } catch (InterruptedException ie1) {
-                                               /* ignore, we’re looping anyway. */
-                                       }
-                                       continue;
-                               }
-                       }
-
-                       /* find next reaction. */
-                       SortedMap<Long, Pair<String, Reaction>> nextReactions = newTreeMap();
-                       synchronized (reactions) {
-                               for (Entry<String, Reaction> reactionEntry : reactions.entrySet()) {
-                                       Optional<net.pterodactylus.rhynodge.State> state = stateManager.loadLastState(reactionEntry.getKey());
-                                       long stateTime = state.isPresent() ? state.get().time() : 0;
-                                       nextReactions.put(stateTime + reactionEntry.getValue().updateInterval(), Pair.of(reactionEntry.getKey(), reactionEntry.getValue()));
-                               }
-                               Pair<String, Reaction> keyReaction = nextReactions.get(nextReactions.firstKey());
-                               return of(new NextReaction(keyReaction.getKey(), keyReaction.getValue(), nextReactions.firstKey()));
-                       }
-               }
-               return empty();
-       }
-
-       private static class NextReaction {
-
-               private final String key;
-               private final Reaction reaction;
-               private final long nextTime;
-
-               private NextReaction(String key, Reaction reaction, long nextTime) {
-                       this.key = key;
-                       this.reaction = reaction;
-                       this.nextTime = nextTime;
-               }
-
-               public String getKey() {
-                       return key;
+               if (!scheduledFutures.containsKey(name)) {
+                       return;
                }
-
-               public Reaction getReaction() {
-                       return reaction;
-               }
-
-               public long getNextTime() {
-                       return nextTime;
-               }
-
+               scheduledFutures.remove(name).cancel(true);
        }
 
 }
diff --git a/src/main/java/net/pterodactylus/rhynodge/engine/ReactionRunner.java b/src/main/java/net/pterodactylus/rhynodge/engine/ReactionRunner.java
new file mode 100644 (file)
index 0000000..37bec10
--- /dev/null
@@ -0,0 +1,94 @@
+package net.pterodactylus.rhynodge.engine;
+
+import static java.lang.String.format;
+import static java.util.Optional.ofNullable;
+import static net.pterodactylus.rhynodge.states.FailedState.INSTANCE;
+import static org.apache.log4j.Logger.getLogger;
+
+import java.util.Optional;
+
+import net.pterodactylus.rhynodge.Action;
+import net.pterodactylus.rhynodge.Filter;
+import net.pterodactylus.rhynodge.Query;
+import net.pterodactylus.rhynodge.Reaction;
+import net.pterodactylus.rhynodge.State;
+import net.pterodactylus.rhynodge.Trigger;
+import net.pterodactylus.rhynodge.states.FailedState;
+
+import org.apache.log4j.Logger;
+
+/**
+ * Runs a {@link Reaction}, starting with its {@link Query}, running the {@link
+ * State} through its {@link Filter}s, and finally checking the {@link Trigger}
+ * for whether an {@link Action} needs to be executed.
+ *
+ * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
+ */
+public class ReactionRunner implements Runnable {
+
+       private static final Logger logger = getLogger(ReactionRunner.class);
+       private final Reaction reaction;
+       private final ReactionState reactionState;
+
+       public ReactionRunner(Reaction reaction, ReactionState reactionState) {
+               this.reactionState = reactionState;
+               this.reaction = reaction;
+       }
+
+       @Override
+       public void run() {
+               State state = runQuery();
+               state = runStateThroughFilters(state);
+               if (!state.success()) {
+                       logger.info(format("Reaction %s failed.", reaction.name()));
+                       saveStateWithIncreasedFailCount(state);
+                       return;
+               }
+               Optional<State> lastSuccessfulState = reactionState.loadLastSuccessfulState();
+               if (!lastSuccessfulState.isPresent()) {
+                       logger.info(format("No last state for %s.", reaction.name()));
+                       reactionState.saveState(state);
+                       return;
+               }
+               Trigger trigger = reaction.trigger();
+               State newState = trigger.mergeStates(lastSuccessfulState.get(), state);
+               reactionState.saveState(newState);
+               if (trigger.triggers()) {
+                       logger.info(format("Trigger was hit for %s, executing action...", reaction.name()));
+                       reaction.action().execute(trigger.output(reaction));
+               }
+               logger.info(format("Reaction %s finished.", reaction.name()));
+       }
+
+       private void saveStateWithIncreasedFailCount(State state) {
+               Optional<State> lastState = reactionState.loadLastState();
+               state.setFailCount(lastState.map(State::failCount).orElse(0) + 1);
+               reactionState.saveState(state);
+       }
+
+       private State runQuery() {
+               logger.info(format("Querying %s...", reaction.name()));
+               try {
+                       return ofNullable(reaction.query().state()).orElse(INSTANCE);
+               } catch (Throwable t1) {
+                       logger.warn(format("Could not query %s.", reaction.name()), t1);
+                       return new FailedState(t1);
+               }
+       }
+
+       private State runStateThroughFilters(State state) {
+               State currentState = state;
+               for (Filter filter : reaction.filters()) {
+                       if (currentState.success()) {
+                               logger.debug(format("Filtering state through %s...", filter.getClass().getSimpleName()));
+                               try {
+                                       currentState = filter.filter(currentState);
+                               } catch (Throwable t1) {
+                                       logger.warn(format("Error during filter %s for %s.", filter.getClass().getSimpleName(), reaction.name()), t1);
+                               }
+                       }
+               }
+               return currentState;
+       }
+
+}
diff --git a/src/main/java/net/pterodactylus/rhynodge/engine/ReactionState.java b/src/main/java/net/pterodactylus/rhynodge/engine/ReactionState.java
new file mode 100644 (file)
index 0000000..a97143e
--- /dev/null
@@ -0,0 +1,37 @@
+package net.pterodactylus.rhynodge.engine;
+
+import java.util.Optional;
+
+import net.pterodactylus.rhynodge.Reaction;
+import net.pterodactylus.rhynodge.State;
+import net.pterodactylus.rhynodge.states.StateManager;
+
+/**
+ * Allows simple access to a {@link Reaction}’s saved states without exposing
+ * the key used to access the state on disk.
+ *
+ * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
+ */
+public class ReactionState {
+
+       private final StateManager stateManager;
+       private final String reactionName;
+
+       public ReactionState(StateManager stateManager, String reactionName) {
+               this.stateManager = stateManager;
+               this.reactionName = reactionName;
+       }
+
+       public Optional<State> loadLastState() {
+               return stateManager.loadLastState(reactionName);
+       }
+
+       public Optional<State> loadLastSuccessfulState() {
+               return stateManager.loadLastSuccessfulState(reactionName);
+       }
+
+       public void saveState(State state) {
+               stateManager.saveState(reactionName, state);
+       }
+
+}
index a5d79ac..4cfb857 100644 (file)
@@ -50,9 +50,6 @@ public class Starter {
                /* start a watcher. */
                ChainWatcher chainWatcher = new ChainWatcher(engine, parameters.getChainDirectory());
                chainWatcher.start();
-
-               /* start the engine. */
-               engine.start();
        }
 
        /**