X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Freactor%2Fengine%2FEngine.java;h=0fb3f18f8de24a796dfade813f351b5ecacad598;hb=a736da3d2284792908fbe4d0a860127f862be112;hp=123781f0d04af1bb7282c043f4e47dfe008d24d0;hpb=631eb5a4e09bea50955554b982c67fc9a0ff3abd;p=rhynodge.git diff --git a/src/main/java/net/pterodactylus/reactor/engine/Engine.java b/src/main/java/net/pterodactylus/reactor/engine/Engine.java index 123781f..0fb3f18 100644 --- a/src/main/java/net/pterodactylus/reactor/engine/Engine.java +++ b/src/main/java/net/pterodactylus/reactor/engine/Engine.java @@ -17,22 +17,21 @@ package net.pterodactylus.reactor.engine; +import java.util.HashMap; import java.util.Map; -import java.util.Set; import java.util.SortedMap; -import java.util.concurrent.TimeUnit; +import net.pterodactylus.reactor.Filter; import net.pterodactylus.reactor.Query; import net.pterodactylus.reactor.Reaction; import net.pterodactylus.reactor.Trigger; import net.pterodactylus.reactor.states.AbstractState; +import net.pterodactylus.reactor.states.FailedState; import org.apache.log4j.Logger; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import com.google.common.util.concurrent.AbstractExecutionThreadService; -import com.google.common.util.concurrent.Uninterruptibles; /** * Reactor main engine. @@ -45,9 +44,11 @@ public class Engine extends AbstractExecutionThreadService { private static final Logger logger = Logger.getLogger(Engine.class); /** All defined reactions. */ - private final Set reactions = Sets.newHashSet(); + /* synchronize on itself. */ + private final Map reactions = new HashMap(); /** Reaction states. */ + /* synchronize on reactions. */ private final Map reactionExecutions = Maps.newHashMap(); // @@ -57,13 +58,24 @@ public class Engine extends AbstractExecutionThreadService { /** * Adds the given reaction to this engine. * + * @param name + * The name of the reaction * @param reaction * The reaction to add to this engine + * @throws IllegalStateException + * if the engine already contains a {@link Reaction} with the + * given name */ @SuppressWarnings("synthetic-access") - public void addReaction(Reaction reaction) { - reactions.add(reaction); - reactionExecutions.put(reaction, new ReactionExecution()); + public void addReaction(String name, Reaction reaction) { + synchronized (reactions) { + if (reactions.containsKey(name)) { + throw new IllegalStateException(String.format("Engine already contains a Reaction named “%s!”", name)); + } + reactions.put(name, reaction); + reactionExecutions.put(reaction, new ReactionExecution()); + reactions.notifyAll(); + } } // @@ -78,40 +90,46 @@ public class Engine extends AbstractExecutionThreadService { while (isRunning()) { /* delay if we have no reactions. */ - if (reactions.isEmpty()) { - logger.trace("Sleeping for 1 second while no Reactions available."); - Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - continue; + synchronized (reactions) { + if (reactions.isEmpty()) { + logger.debug("Sleeping while no Reactions available."); + try { + reactions.wait(); + } catch (InterruptedException ie1) { + /* ignore, we’re looping anyway. */ + } + continue; + } } /* find next reaction. */ SortedMap nextReactions = Maps.newTreeMap(); - for (Reaction reaction : reactions) { - ReactionExecution reactionExecution = reactionExecutions.get(reaction); - nextReactions.put(reactionExecution.lastExecutionTime() + reaction.updateInterval(), reaction); + Reaction nextReaction; + ReactionExecution reactionExecution; + synchronized (reactions) { + for (Reaction reaction : reactions.values()) { + nextReactions.put(reactionExecutions.get(reaction).lastExecutionTime() + reaction.updateInterval(), reaction); + } + nextReaction = nextReactions.get(nextReactions.firstKey()); + reactionExecution = reactionExecutions.get(nextReaction); } - Reaction nextReaction = nextReactions.get(nextReactions.firstKey()); - ReactionExecution reactionExecution = reactionExecutions.get(nextReaction); logger.debug(String.format("Next Reaction: %s.", nextReaction)); /* wait until the next reaction has to run. */ - while (isRunning()) { - long waitTime = (reactionExecution.lastExecutionTime() + nextReaction.updateInterval()) - System.currentTimeMillis(); - logger.debug(String.format("Time to wait for next Reaction: %d millseconds.", waitTime)); - if (waitTime <= 0) { - break; - } - try { - logger.debug(String.format("Waiting for %d milliseconds.", waitTime)); - TimeUnit.MILLISECONDS.sleep(waitTime); - } catch (InterruptedException ie1) { - /* we’re looping! */ + long waitTime = (reactionExecution.lastExecutionTime() + nextReaction.updateInterval()) - System.currentTimeMillis(); + logger.debug(String.format("Time to wait for next Reaction: %d millseconds.", waitTime)); + if (waitTime > 0) { + synchronized (reactions) { + try { + logger.debug(String.format("Waiting for %d milliseconds.", waitTime)); + reactions.wait(waitTime); + } catch (InterruptedException ie1) { + /* we’re looping! */ + } } - } - /* are we still running? */ - if (!isRunning()) { - break; + /* re-start loop to check for new reactions. */ + continue; } /* run reaction. */ @@ -121,6 +139,9 @@ public class Engine extends AbstractExecutionThreadService { try { logger.debug("Querying system..."); state = query.state(); + if (state == null) { + state = FailedState.INSTANCE; + } logger.debug("System queried."); } catch (Throwable t1) { logger.warn("Querying system failed!", t1); @@ -129,12 +150,23 @@ public class Engine extends AbstractExecutionThreadService { }; } logger.debug(String.format("State is %s.", state)); - reactionExecution.addState(state); + + /* convert states. */ + for (Filter filter : nextReaction.filters()) { + if (state.success()) { + net.pterodactylus.reactor.State newState = filter.filter(state); + logger.debug(String.format("Old state is %s, new state is %s.", state, newState)); + state = newState; + } + } + if (state.success()) { + reactionExecution.addState(state); + } /* only run trigger if we have collected two states. */ + Trigger trigger = nextReaction.trigger(); boolean triggerHit = false; - if (reactionExecution.previousState() != null) { - Trigger trigger = nextReaction.trigger(); + if ((reactionExecution.previousState() != null) && state.success()) { logger.debug("Checking Trigger for changes..."); triggerHit = trigger.triggers(reactionExecution.currentState(), reactionExecution.previousState()); } @@ -143,8 +175,9 @@ public class Engine extends AbstractExecutionThreadService { logger.debug(String.format("Trigger was hit: %s.", triggerHit)); if (triggerHit) { logger.info("Executing Action..."); - nextReaction.action().execute(reactionExecution.currentState(), reactionExecution.previousState()); + nextReaction.action().execute(trigger.output()); } + } }