From a736da3d2284792908fbe4d0a860127f862be112 Mon Sep 17 00:00:00 2001 From: =?utf8?q?David=20=E2=80=98Bombe=E2=80=99=20Roden?= Date: Fri, 4 Jan 2013 17:53:53 +0100 Subject: [PATCH] Store reactions with their name, change synchronization. --- .../net/pterodactylus/reactor/engine/Engine.java | 80 +++++++++++++--------- 1 file changed, 48 insertions(+), 32 deletions(-) diff --git a/src/main/java/net/pterodactylus/reactor/engine/Engine.java b/src/main/java/net/pterodactylus/reactor/engine/Engine.java index 9edbdb4..0fb3f18 100644 --- a/src/main/java/net/pterodactylus/reactor/engine/Engine.java +++ b/src/main/java/net/pterodactylus/reactor/engine/Engine.java @@ -17,10 +17,9 @@ package net.pterodactylus.reactor.engine; +import java.util.HashMap; import java.util.Map; -import java.util.Set; import java.util.SortedMap; -import java.util.concurrent.TimeUnit; import net.pterodactylus.reactor.Filter; import net.pterodactylus.reactor.Query; @@ -32,9 +31,7 @@ import net.pterodactylus.reactor.states.FailedState; import org.apache.log4j.Logger; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import com.google.common.util.concurrent.AbstractExecutionThreadService; -import com.google.common.util.concurrent.Uninterruptibles; /** * Reactor main engine. @@ -47,9 +44,11 @@ public class Engine extends AbstractExecutionThreadService { private static final Logger logger = Logger.getLogger(Engine.class); /** All defined reactions. */ - private final Set reactions = Sets.newHashSet(); + /* synchronize on itself. */ + private final Map reactions = new HashMap(); /** Reaction states. */ + /* synchronize on reactions. */ private final Map reactionExecutions = Maps.newHashMap(); // @@ -59,13 +58,24 @@ public class Engine extends AbstractExecutionThreadService { /** * Adds the given reaction to this engine. * + * @param name + * The name of the reaction * @param reaction * The reaction to add to this engine + * @throws IllegalStateException + * if the engine already contains a {@link Reaction} with the + * given name */ @SuppressWarnings("synthetic-access") - public void addReaction(Reaction reaction) { - reactions.add(reaction); - reactionExecutions.put(reaction, new ReactionExecution()); + public void addReaction(String name, Reaction reaction) { + synchronized (reactions) { + if (reactions.containsKey(name)) { + throw new IllegalStateException(String.format("Engine already contains a Reaction named “%s!”", name)); + } + reactions.put(name, reaction); + reactionExecutions.put(reaction, new ReactionExecution()); + reactions.notifyAll(); + } } // @@ -80,40 +90,46 @@ public class Engine extends AbstractExecutionThreadService { while (isRunning()) { /* delay if we have no reactions. */ - if (reactions.isEmpty()) { - logger.trace("Sleeping for 1 second while no Reactions available."); - Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); - continue; + synchronized (reactions) { + if (reactions.isEmpty()) { + logger.debug("Sleeping while no Reactions available."); + try { + reactions.wait(); + } catch (InterruptedException ie1) { + /* ignore, we’re looping anyway. */ + } + continue; + } } /* find next reaction. */ SortedMap nextReactions = Maps.newTreeMap(); - for (Reaction reaction : reactions) { - ReactionExecution reactionExecution = reactionExecutions.get(reaction); - nextReactions.put(reactionExecution.lastExecutionTime() + reaction.updateInterval(), reaction); + Reaction nextReaction; + ReactionExecution reactionExecution; + synchronized (reactions) { + for (Reaction reaction : reactions.values()) { + nextReactions.put(reactionExecutions.get(reaction).lastExecutionTime() + reaction.updateInterval(), reaction); + } + nextReaction = nextReactions.get(nextReactions.firstKey()); + reactionExecution = reactionExecutions.get(nextReaction); } - Reaction nextReaction = nextReactions.get(nextReactions.firstKey()); - ReactionExecution reactionExecution = reactionExecutions.get(nextReaction); logger.debug(String.format("Next Reaction: %s.", nextReaction)); /* wait until the next reaction has to run. */ - while (isRunning()) { - long waitTime = (reactionExecution.lastExecutionTime() + nextReaction.updateInterval()) - System.currentTimeMillis(); - logger.debug(String.format("Time to wait for next Reaction: %d millseconds.", waitTime)); - if (waitTime <= 0) { - break; - } - try { - logger.debug(String.format("Waiting for %d milliseconds.", waitTime)); - TimeUnit.MILLISECONDS.sleep(waitTime); - } catch (InterruptedException ie1) { - /* we’re looping! */ + long waitTime = (reactionExecution.lastExecutionTime() + nextReaction.updateInterval()) - System.currentTimeMillis(); + logger.debug(String.format("Time to wait for next Reaction: %d millseconds.", waitTime)); + if (waitTime > 0) { + synchronized (reactions) { + try { + logger.debug(String.format("Waiting for %d milliseconds.", waitTime)); + reactions.wait(waitTime); + } catch (InterruptedException ie1) { + /* we’re looping! */ + } } - } - /* are we still running? */ - if (!isRunning()) { - break; + /* re-start loop to check for new reactions. */ + continue; } /* run reaction. */ -- 2.7.4