X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Freactor%2Fengine%2FEngine.java;h=0fb3f18f8de24a796dfade813f351b5ecacad598;hb=a736da3d2284792908fbe4d0a860127f862be112;hp=949ce127119f10438465f715c860dec790fbeb85;hpb=92ffd9cd79bb63c41cc81ec580ec331bec345456;p=rhynodge.git diff --git a/src/main/java/net/pterodactylus/reactor/engine/Engine.java b/src/main/java/net/pterodactylus/reactor/engine/Engine.java index 949ce12..0fb3f18 100644 --- a/src/main/java/net/pterodactylus/reactor/engine/Engine.java +++ b/src/main/java/net/pterodactylus/reactor/engine/Engine.java @@ -17,10 +17,9 @@ package net.pterodactylus.reactor.engine; +import java.util.HashMap; import java.util.Map; -import java.util.Set; import java.util.SortedMap; -import java.util.concurrent.TimeUnit; import net.pterodactylus.reactor.Filter; import net.pterodactylus.reactor.Query; @@ -32,9 +31,7 @@ import net.pterodactylus.reactor.states.FailedState; import org.apache.log4j.Logger; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import com.google.common.util.concurrent.AbstractExecutionThreadService; -import com.google.common.util.concurrent.Uninterruptibles; /** * Reactor main engine. @@ -47,9 +44,11 @@ public class Engine extends AbstractExecutionThreadService { private static final Logger logger = Logger.getLogger(Engine.class); /** All defined reactions. */ - private final Set reactions = Sets.newHashSet(); + /* synchronize on itself. */ + private final Map reactions = new HashMap(); /** Reaction states. */ + /* synchronize on reactions. */ private final Map reactionExecutions = Maps.newHashMap(); // @@ -59,13 +58,24 @@ public class Engine extends AbstractExecutionThreadService { /** * Adds the given reaction to this engine. * + * @param name + * The name of the reaction * @param reaction * The reaction to add to this engine + * @throws IllegalStateException + * if the engine already contains a {@link Reaction} with the + * given name */ @SuppressWarnings("synthetic-access") - public void addReaction(Reaction reaction) { - reactions.add(reaction); - reactionExecutions.put(reaction, new ReactionExecution()); + public void addReaction(String name, Reaction reaction) { + synchronized (reactions) { + if (reactions.containsKey(name)) { + throw new IllegalStateException(String.format("Engine already contains a Reaction named “%s!”", name)); + } + reactions.put(name, reaction); + reactionExecutions.put(reaction, new ReactionExecution()); + reactions.notifyAll(); + } } // @@ -80,40 +90,46 @@ public class Engine extends AbstractExecutionThreadService { while (isRunning()) { /* delay if we have no reactions. */ - if (reactions.isEmpty()) { - logger.trace("Sleeping for 1 second while no Reactions available."); - Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - continue; + synchronized (reactions) { + if (reactions.isEmpty()) { + logger.debug("Sleeping while no Reactions available."); + try { + reactions.wait(); + } catch (InterruptedException ie1) { + /* ignore, we’re looping anyway. */ + } + continue; + } } /* find next reaction. */ SortedMap nextReactions = Maps.newTreeMap(); - for (Reaction reaction : reactions) { - ReactionExecution reactionExecution = reactionExecutions.get(reaction); - nextReactions.put(reactionExecution.lastExecutionTime() + reaction.updateInterval(), reaction); + Reaction nextReaction; + ReactionExecution reactionExecution; + synchronized (reactions) { + for (Reaction reaction : reactions.values()) { + nextReactions.put(reactionExecutions.get(reaction).lastExecutionTime() + reaction.updateInterval(), reaction); + } + nextReaction = nextReactions.get(nextReactions.firstKey()); + reactionExecution = reactionExecutions.get(nextReaction); } - Reaction nextReaction = nextReactions.get(nextReactions.firstKey()); - ReactionExecution reactionExecution = reactionExecutions.get(nextReaction); logger.debug(String.format("Next Reaction: %s.", nextReaction)); /* wait until the next reaction has to run. */ - while (isRunning()) { - long waitTime = (reactionExecution.lastExecutionTime() + nextReaction.updateInterval()) - System.currentTimeMillis(); - logger.debug(String.format("Time to wait for next Reaction: %d millseconds.", waitTime)); - if (waitTime <= 0) { - break; + long waitTime = (reactionExecution.lastExecutionTime() + nextReaction.updateInterval()) - System.currentTimeMillis(); + logger.debug(String.format("Time to wait for next Reaction: %d millseconds.", waitTime)); + if (waitTime > 0) { + synchronized (reactions) { + try { + logger.debug(String.format("Waiting for %d milliseconds.", waitTime)); + reactions.wait(waitTime); + } catch (InterruptedException ie1) { + /* we’re looping! */ + } } - try { - logger.debug(String.format("Waiting for %d milliseconds.", waitTime)); - TimeUnit.MILLISECONDS.sleep(waitTime); - } catch (InterruptedException ie1) { - /* we’re looping! */ - } - } - /* are we still running? */ - if (!isRunning()) { - break; + /* re-start loop to check for new reactions. */ + continue; } /* run reaction. */ @@ -137,16 +153,20 @@ public class Engine extends AbstractExecutionThreadService { /* convert states. */ for (Filter filter : nextReaction.filters()) { - net.pterodactylus.reactor.State newState = filter.filter(state); - logger.debug(String.format("Old state is %s, new state is %s.", state, newState)); - state = newState; + if (state.success()) { + net.pterodactylus.reactor.State newState = filter.filter(state); + logger.debug(String.format("Old state is %s, new state is %s.", state, newState)); + state = newState; + } + } + if (state.success()) { + reactionExecution.addState(state); } - reactionExecution.addState(state); /* only run trigger if we have collected two states. */ Trigger trigger = nextReaction.trigger(); boolean triggerHit = false; - if (reactionExecution.previousState() != null) { + if ((reactionExecution.previousState() != null) && state.success()) { logger.debug("Checking Trigger for changes..."); triggerHit = trigger.triggers(reactionExecution.currentState(), reactionExecution.previousState()); } @@ -155,7 +175,7 @@ public class Engine extends AbstractExecutionThreadService { logger.debug(String.format("Trigger was hit: %s.", triggerHit)); if (triggerHit) { logger.info("Executing Action..."); - nextReaction.action().execute(trigger.trigger()); + nextReaction.action().execute(trigger.output()); } }