From: David ‘Bombe’ Roden Date: Fri, 28 Mar 2014 21:04:06 +0000 (+0100) Subject: Split the engine in smaller parts. X-Git-Tag: v2~236 X-Git-Url: https://git.pterodactylus.net/?a=commitdiff_plain;h=09c47ffd3d61d5d8d63e95d253318ec361433c76;p=rhynodge.git Split the engine in smaller parts. --- diff --git a/src/main/java/net/pterodactylus/rhynodge/engine/Engine.java b/src/main/java/net/pterodactylus/rhynodge/engine/Engine.java index 9234cfd..5c289c5 100644 --- a/src/main/java/net/pterodactylus/rhynodge/engine/Engine.java +++ b/src/main/java/net/pterodactylus/rhynodge/engine/Engine.java @@ -17,54 +17,40 @@ package net.pterodactylus.rhynodge.engine; -import static com.google.common.collect.Maps.newTreeMap; -import static java.lang.String.format; -import static java.util.Optional.empty; -import static java.util.Optional.of; +import static java.lang.System.currentTimeMillis; +import static java.util.concurrent.TimeUnit.MILLISECONDS; -import java.util.HashMap; import java.util.Map; -import java.util.Map.Entry; import java.util.Optional; -import java.util.SortedMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; -import net.pterodactylus.rhynodge.Filter; -import net.pterodactylus.rhynodge.Query; import net.pterodactylus.rhynodge.Reaction; -import net.pterodactylus.rhynodge.Trigger; -import net.pterodactylus.rhynodge.states.AbstractState; -import net.pterodactylus.rhynodge.states.FailedState; import net.pterodactylus.rhynodge.states.StateManager; -import com.google.common.util.concurrent.AbstractExecutionThreadService; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.log4j.Logger; - /** * Rhynodge main engine. * * @author David ‘Bombe’ Roden */ -public class Engine extends AbstractExecutionThreadService { - - /** The logger. */ - private static final Logger logger = Logger.getLogger(Engine.class); +public class Engine { - /** The state manager. */ private final StateManager stateManager; - - /** All defined reactions. */ - /* synchronize on itself. */ - private final Map reactions = new HashMap(); + private final ScheduledExecutorService executorService; + private final Map> scheduledFutures = new ConcurrentHashMap<>(); /** * Creates a new engine. * * @param stateManager - * The state manager + * The state manager */ public Engine(StateManager stateManager) { this.stateManager = stateManager; + executorService = new ScheduledThreadPoolExecutor(10); } // @@ -75,175 +61,31 @@ public class Engine extends AbstractExecutionThreadService { * Adds the given reaction to this engine. * * @param name - * The name of the reaction + * The name of the reaction * @param reaction - * The reaction to add to this engine + * The reaction to add to this engine */ public void addReaction(String name, Reaction reaction) { - synchronized (reactions) { - reactions.put(name, reaction); - reactions.notifyAll(); - } + ReactionState reactionState = new ReactionState(stateManager, name); + Optional lastState = reactionState.loadLastState(); + long lastExecutionTime = lastState.map(net.pterodactylus.rhynodge.State::time).orElse(0L); + long nextExecutionTime = lastExecutionTime + reaction.updateInterval(); + ReactionRunner reactionRunner = new ReactionRunner(reaction, reactionState); + ScheduledFuture future = executorService.scheduleWithFixedDelay(reactionRunner, nextExecutionTime - currentTimeMillis(), reaction.updateInterval(), MILLISECONDS); + scheduledFutures.put(name, future); } /** * Removes the reaction with the given name. * * @param name - * The name of the reaction to remove + * The name of the reaction to remove */ public void removeReaction(String name) { - synchronized (reactions) { - if (!reactions.containsKey(name)) { - return; - } - reactions.remove(name); - reactions.notifyAll(); - } - } - - // - // ABSTRACTSERVICE METHODS - // - - /** - * {@inheritDoc} - */ - @Override - public void run() { - while (isRunning()) { - Optional nextReaction = getNextReaction(); - if (!nextReaction.isPresent()) { - continue; - } - - String reactionName = nextReaction.get().getKey(); - logger.debug(format("Next Reaction: %s.", reactionName)); - - /* wait until the next reaction has to run. */ - long waitTime = nextReaction.get().getNextTime() - System.currentTimeMillis(); - logger.debug(format("Time to wait for next Reaction: %d millseconds.", waitTime)); - if (waitTime > 0) { - waitForNextReactionToStart(nextReaction, waitTime); - - /* re-start loop to check for new reactions. */ - continue; - } - - net.pterodactylus.rhynodge.State state = runReaction(nextReaction, reactionName); - logger.debug(format("State is %s.", state)); - - /* convert states. */ - for (Filter filter : nextReaction.get().getReaction().filters()) { - if (state.success()) { - state = filter.filter(state); - } - } - if (!state.success()) { - Optional lastState = stateManager.loadLastState(reactionName); - state.setFailCount(lastState.map(net.pterodactylus.rhynodge.State::failCount).orElse(0) + 1); - } - Optional lastSuccessfulState = stateManager.loadLastSuccessfulState(reactionName); - - /* merge states. */ - if (lastSuccessfulState.isPresent() && lastSuccessfulState.get().success() && state.success()) { - Trigger trigger = nextReaction.get().getReaction().trigger(); - net.pterodactylus.rhynodge.State newState = trigger.mergeStates(lastSuccessfulState.get(), state); - stateManager.saveState(reactionName, newState); - if (trigger.triggers()) { - logger.info("Executing Action..."); - nextReaction.get().getReaction().action().execute(trigger.output(nextReaction.get().getReaction())); - } - } else { - /* save first or error state. */ - stateManager.saveState(reactionName, state); - } - } - } - - private net.pterodactylus.rhynodge.State runReaction(Optional nextReaction, String reactionName) { - logger.info(format("Running Query for %s...", reactionName)); - Query query = nextReaction.get().getReaction().query(); - net.pterodactylus.rhynodge.State state; - try { - logger.debug("Querying system..."); - state = query.state(); - if (state == null) { - state = FailedState.INSTANCE; - } - logger.debug("System queried."); - } catch (Throwable t1) { - logger.warn("Querying system failed!", t1); - state = new AbstractState(t1) { - }; - } - return state; - } - - private void waitForNextReactionToStart(Optional nextReaction, long waitTime) { - synchronized (reactions) { - try { - logger.info(format("Waiting until %tc.", nextReaction.get().getNextTime())); - reactions.wait(waitTime); - } catch (InterruptedException ie1) { - /* we’re looping! */ - } - } - } - - private Optional getNextReaction() { - while (isRunning()) { - synchronized (reactions) { - if (reactions.isEmpty()) { - logger.debug("Sleeping while no Reactions available."); - try { - reactions.wait(); - } catch (InterruptedException ie1) { - /* ignore, we’re looping anyway. */ - } - continue; - } - } - - /* find next reaction. */ - SortedMap> nextReactions = newTreeMap(); - synchronized (reactions) { - for (Entry reactionEntry : reactions.entrySet()) { - Optional state = stateManager.loadLastState(reactionEntry.getKey()); - long stateTime = state.isPresent() ? state.get().time() : 0; - nextReactions.put(stateTime + reactionEntry.getValue().updateInterval(), Pair.of(reactionEntry.getKey(), reactionEntry.getValue())); - } - Pair keyReaction = nextReactions.get(nextReactions.firstKey()); - return of(new NextReaction(keyReaction.getKey(), keyReaction.getValue(), nextReactions.firstKey())); - } - } - return empty(); - } - - private static class NextReaction { - - private final String key; - private final Reaction reaction; - private final long nextTime; - - private NextReaction(String key, Reaction reaction, long nextTime) { - this.key = key; - this.reaction = reaction; - this.nextTime = nextTime; - } - - public String getKey() { - return key; + if (!scheduledFutures.containsKey(name)) { + return; } - - public Reaction getReaction() { - return reaction; - } - - public long getNextTime() { - return nextTime; - } - + scheduledFutures.remove(name).cancel(true); } } diff --git a/src/main/java/net/pterodactylus/rhynodge/engine/ReactionRunner.java b/src/main/java/net/pterodactylus/rhynodge/engine/ReactionRunner.java new file mode 100644 index 0000000..37bec10 --- /dev/null +++ b/src/main/java/net/pterodactylus/rhynodge/engine/ReactionRunner.java @@ -0,0 +1,94 @@ +package net.pterodactylus.rhynodge.engine; + +import static java.lang.String.format; +import static java.util.Optional.ofNullable; +import static net.pterodactylus.rhynodge.states.FailedState.INSTANCE; +import static org.apache.log4j.Logger.getLogger; + +import java.util.Optional; + +import net.pterodactylus.rhynodge.Action; +import net.pterodactylus.rhynodge.Filter; +import net.pterodactylus.rhynodge.Query; +import net.pterodactylus.rhynodge.Reaction; +import net.pterodactylus.rhynodge.State; +import net.pterodactylus.rhynodge.Trigger; +import net.pterodactylus.rhynodge.states.FailedState; + +import org.apache.log4j.Logger; + +/** + * Runs a {@link Reaction}, starting with its {@link Query}, running the {@link + * State} through its {@link Filter}s, and finally checking the {@link Trigger} + * for whether an {@link Action} needs to be executed. + * + * @author David ‘Bombe’ Roden + */ +public class ReactionRunner implements Runnable { + + private static final Logger logger = getLogger(ReactionRunner.class); + private final Reaction reaction; + private final ReactionState reactionState; + + public ReactionRunner(Reaction reaction, ReactionState reactionState) { + this.reactionState = reactionState; + this.reaction = reaction; + } + + @Override + public void run() { + State state = runQuery(); + state = runStateThroughFilters(state); + if (!state.success()) { + logger.info(format("Reaction %s failed.", reaction.name())); + saveStateWithIncreasedFailCount(state); + return; + } + Optional lastSuccessfulState = reactionState.loadLastSuccessfulState(); + if (!lastSuccessfulState.isPresent()) { + logger.info(format("No last state for %s.", reaction.name())); + reactionState.saveState(state); + return; + } + Trigger trigger = reaction.trigger(); + State newState = trigger.mergeStates(lastSuccessfulState.get(), state); + reactionState.saveState(newState); + if (trigger.triggers()) { + logger.info(format("Trigger was hit for %s, executing action...", reaction.name())); + reaction.action().execute(trigger.output(reaction)); + } + logger.info(format("Reaction %s finished.", reaction.name())); + } + + private void saveStateWithIncreasedFailCount(State state) { + Optional lastState = reactionState.loadLastState(); + state.setFailCount(lastState.map(State::failCount).orElse(0) + 1); + reactionState.saveState(state); + } + + private State runQuery() { + logger.info(format("Querying %s...", reaction.name())); + try { + return ofNullable(reaction.query().state()).orElse(INSTANCE); + } catch (Throwable t1) { + logger.warn(format("Could not query %s.", reaction.name()), t1); + return new FailedState(t1); + } + } + + private State runStateThroughFilters(State state) { + State currentState = state; + for (Filter filter : reaction.filters()) { + if (currentState.success()) { + logger.debug(format("Filtering state through %s...", filter.getClass().getSimpleName())); + try { + currentState = filter.filter(currentState); + } catch (Throwable t1) { + logger.warn(format("Error during filter %s for %s.", filter.getClass().getSimpleName(), reaction.name()), t1); + } + } + } + return currentState; + } + +} diff --git a/src/main/java/net/pterodactylus/rhynodge/engine/ReactionState.java b/src/main/java/net/pterodactylus/rhynodge/engine/ReactionState.java new file mode 100644 index 0000000..a97143e --- /dev/null +++ b/src/main/java/net/pterodactylus/rhynodge/engine/ReactionState.java @@ -0,0 +1,37 @@ +package net.pterodactylus.rhynodge.engine; + +import java.util.Optional; + +import net.pterodactylus.rhynodge.Reaction; +import net.pterodactylus.rhynodge.State; +import net.pterodactylus.rhynodge.states.StateManager; + +/** + * Allows simple access to a {@link Reaction}’s saved states without exposing + * the key used to access the state on disk. + * + * @author David ‘Bombe’ Roden + */ +public class ReactionState { + + private final StateManager stateManager; + private final String reactionName; + + public ReactionState(StateManager stateManager, String reactionName) { + this.stateManager = stateManager; + this.reactionName = reactionName; + } + + public Optional loadLastState() { + return stateManager.loadLastState(reactionName); + } + + public Optional loadLastSuccessfulState() { + return stateManager.loadLastSuccessfulState(reactionName); + } + + public void saveState(State state) { + stateManager.saveState(reactionName, state); + } + +} diff --git a/src/main/java/net/pterodactylus/rhynodge/engine/Starter.java b/src/main/java/net/pterodactylus/rhynodge/engine/Starter.java index a5d79ac..4cfb857 100644 --- a/src/main/java/net/pterodactylus/rhynodge/engine/Starter.java +++ b/src/main/java/net/pterodactylus/rhynodge/engine/Starter.java @@ -50,9 +50,6 @@ public class Starter { /* start a watcher. */ ChainWatcher chainWatcher = new ChainWatcher(engine, parameters.getChainDirectory()); chainWatcher.start(); - - /* start the engine. */ - engine.start(); } /**