X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Frhynodge%2Fengine%2FEngine.java;h=079350b274594d18e101838b0fcc28c3406dd6e3;hb=7a62280f42b7430279b8fd1b5818ddf0e7530b2a;hp=9e4d883706222a5fd7deeaf771a327e20c9b84b7;hpb=6ec36ef950c23c135bf0e112d932c5b7068189b8;p=rhynodge.git diff --git a/src/main/java/net/pterodactylus/rhynodge/engine/Engine.java b/src/main/java/net/pterodactylus/rhynodge/engine/Engine.java index 9e4d883..079350b 100644 --- a/src/main/java/net/pterodactylus/rhynodge/engine/Engine.java +++ b/src/main/java/net/pterodactylus/rhynodge/engine/Engine.java @@ -17,50 +17,42 @@ package net.pterodactylus.rhynodge.engine; -import java.util.HashMap; +import static java.lang.System.currentTimeMillis; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + import java.util.Map; -import java.util.Map.Entry; -import java.util.SortedMap; +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import javax.inject.Inject; +import javax.inject.Singleton; -import net.pterodactylus.rhynodge.Filter; -import net.pterodactylus.rhynodge.Query; import net.pterodactylus.rhynodge.Reaction; -import net.pterodactylus.rhynodge.Trigger; -import net.pterodactylus.rhynodge.states.AbstractState; -import net.pterodactylus.rhynodge.states.FailedState; +import net.pterodactylus.rhynodge.actions.EmailAction; import net.pterodactylus.rhynodge.states.StateManager; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.log4j.Logger; - -import com.google.common.collect.Maps; -import com.google.common.util.concurrent.AbstractExecutionThreadService; - /** * Rhynodge main engine. * * @author David ‘Bombe’ Roden */ -public class Engine extends AbstractExecutionThreadService { +@Singleton +public class Engine { - /** The logger. */ - private static final Logger logger = Logger.getLogger(Engine.class); - - /** The state manager. */ private final StateManager stateManager; + private final ScheduledExecutorService executorService; + private final Map> scheduledFutures = new ConcurrentHashMap<>(); + private final EmailAction errorEmailAction; - /** All defined reactions. */ - /* synchronize on itself. */ - private final Map reactions = new HashMap(); - - /** - * Creates a new engine. - * - * @param stateManager - * The state manager - */ - public Engine(StateManager stateManager) { + @Inject + public Engine(StateManager stateManager, EmailAction errorEmailAction) { this.stateManager = stateManager; + this.errorEmailAction = errorEmailAction; + executorService = new ScheduledThreadPoolExecutor(10); } // @@ -71,141 +63,31 @@ public class Engine extends AbstractExecutionThreadService { * Adds the given reaction to this engine. * * @param name - * The name of the reaction + * The name of the reaction * @param reaction - * The reaction to add to this engine + * The reaction to add to this engine */ public void addReaction(String name, Reaction reaction) { - synchronized (reactions) { - reactions.put(name, reaction); - reactions.notifyAll(); - } + ReactionState reactionState = new ReactionState(stateManager, name); + Optional lastState = reactionState.loadLastState(); + long lastExecutionTime = lastState.map(net.pterodactylus.rhynodge.State::time).orElse(0L); + long nextExecutionTime = lastExecutionTime + reaction.updateInterval(); + ReactionRunner reactionRunner = new ReactionRunner(reaction, reactionState, errorEmailAction); + ScheduledFuture future = executorService.scheduleWithFixedDelay(reactionRunner, nextExecutionTime - currentTimeMillis(), reaction.updateInterval(), MILLISECONDS); + scheduledFutures.put(name, future); } /** * Removes the reaction with the given name. * * @param name - * The name of the reaction to remove + * The name of the reaction to remove */ public void removeReaction(String name) { - synchronized (reactions) { - if (!reactions.containsKey(name)) { - return; - } - reactions.remove(name); - reactions.notifyAll(); - } - } - - // - // ABSTRACTSERVICE METHODS - // - - /** - * {@inheritDoc} - */ - @Override - public void run() { - while (isRunning()) { - - /* delay if we have no reactions. */ - synchronized (reactions) { - if (reactions.isEmpty()) { - logger.debug("Sleeping while no Reactions available."); - try { - reactions.wait(); - } catch (InterruptedException ie1) { - /* ignore, we’re looping anyway. */ - } - continue; - } - } - - /* find next reaction. */ - SortedMap> nextReactions = Maps.newTreeMap(); - String reactionName; - Reaction nextReaction; - synchronized (reactions) { - for (Entry reactionEntry : reactions.entrySet()) { - net.pterodactylus.rhynodge.State state = stateManager.loadLastState(reactionEntry.getKey()); - long stateTime = (state != null) ? state.time() : 0; - nextReactions.put(stateTime + reactionEntry.getValue().updateInterval(), Pair.of(reactionEntry.getKey(), reactionEntry.getValue())); - } - reactionName = nextReactions.get(nextReactions.firstKey()).getLeft(); - nextReaction = nextReactions.get(nextReactions.firstKey()).getRight(); - } - logger.debug(String.format("Next Reaction: %s.", reactionName)); - - /* wait until the next reaction has to run. */ - net.pterodactylus.rhynodge.State lastState = stateManager.loadLastState(reactionName); - long lastStateTime = (lastState != null) ? lastState.time() : 0; - int lastStateFailCount = (lastState != null) ? lastState.failCount() : 0; - long waitTime = (lastStateTime + nextReaction.updateInterval()) - System.currentTimeMillis(); - logger.debug(String.format("Time to wait for next Reaction: %d millseconds.", waitTime)); - if (waitTime > 0) { - synchronized (reactions) { - try { - logger.info(String.format("Waiting until %tc.", lastStateTime + nextReaction.updateInterval())); - reactions.wait(waitTime); - } catch (InterruptedException ie1) { - /* we’re looping! */ - } - } - - /* re-start loop to check for new reactions. */ - continue; - } - - /* run reaction. */ - logger.info(String.format("Running Query for %s...", reactionName)); - Query query = nextReaction.query(); - net.pterodactylus.rhynodge.State state; - try { - logger.debug("Querying system..."); - state = query.state(); - if (state == null) { - state = FailedState.INSTANCE; - } - logger.debug("System queried."); - } catch (Throwable t1) { - logger.warn("Querying system failed!", t1); - state = new AbstractState(t1) { - /* no further state. */ - }; - } - logger.debug(String.format("State is %s.", state)); - - /* convert states. */ - for (Filter filter : nextReaction.filters()) { - if (state.success()) { - net.pterodactylus.rhynodge.State newState = filter.filter(state); - logger.debug(String.format("Old state is %s, new state is %s.", state, newState)); - state = newState; - } - } - if (!state.success()) { - state.setFailCount(lastStateFailCount + 1); - } - net.pterodactylus.rhynodge.State lastSuccessfulState = stateManager.loadLastSuccessfulState(reactionName); - stateManager.saveState(reactionName, state); - - /* only run trigger if we have collected two successful states. */ - Trigger trigger = nextReaction.trigger(); - boolean triggerHit = false; - if ((lastSuccessfulState != null) && lastSuccessfulState.success() && state.success()) { - logger.debug("Checking Trigger for changes..."); - triggerHit = trigger.triggers(state, lastSuccessfulState); - } - - /* run action if trigger was hit. */ - logger.debug(String.format("Trigger was hit: %s.", triggerHit)); - if (triggerHit) { - logger.info("Executing Action..."); - nextReaction.action().execute(trigger.output(nextReaction)); - } - + if (!scheduledFutures.containsKey(name)) { + return; } + scheduledFutures.remove(name).cancel(true); } }