Store reactions with their name, change synchronization.
authorDavid ‘Bombe’ Roden <bombe@pterodactylus.net>
Fri, 4 Jan 2013 16:53:53 +0000 (17:53 +0100)
committerDavid ‘Bombe’ Roden <bombe@pterodactylus.net>
Fri, 4 Jan 2013 16:53:53 +0000 (17:53 +0100)
src/main/java/net/pterodactylus/reactor/engine/Engine.java

index 9edbdb4..0fb3f18 100644 (file)
 
 package net.pterodactylus.reactor.engine;
 
+import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
 import java.util.SortedMap;
-import java.util.concurrent.TimeUnit;
 
 import net.pterodactylus.reactor.Filter;
 import net.pterodactylus.reactor.Query;
@@ -32,9 +31,7 @@ import net.pterodactylus.reactor.states.FailedState;
 import org.apache.log4j.Logger;
 
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.AbstractExecutionThreadService;
-import com.google.common.util.concurrent.Uninterruptibles;
 
 /**
  * Reactor main engine.
@@ -47,9 +44,11 @@ public class Engine extends AbstractExecutionThreadService {
        private static final Logger logger = Logger.getLogger(Engine.class);
 
        /** All defined reactions. */
-       private final Set<Reaction> reactions = Sets.newHashSet();
+       /* synchronize on itself. */
+       private final Map<String, Reaction> reactions = new HashMap<String, Reaction>();
 
        /** Reaction states. */
+       /* synchronize on reactions. */
        private final Map<Reaction, ReactionExecution> reactionExecutions = Maps.newHashMap();
 
        //
@@ -59,13 +58,24 @@ public class Engine extends AbstractExecutionThreadService {
        /**
         * Adds the given reaction to this engine.
         *
+        * @param name
+        *            The name of the reaction
         * @param reaction
         *            The reaction to add to this engine
+        * @throws IllegalStateException
+        *             if the engine already contains a {@link Reaction} with the
+        *             given name
         */
        @SuppressWarnings("synthetic-access")
-       public void addReaction(Reaction reaction) {
-               reactions.add(reaction);
-               reactionExecutions.put(reaction, new ReactionExecution());
+       public void addReaction(String name, Reaction reaction) {
+               synchronized (reactions) {
+                       if (reactions.containsKey(name)) {
+                               throw new IllegalStateException(String.format("Engine already contains a Reaction named “%s!”", name));
+                       }
+                       reactions.put(name, reaction);
+                       reactionExecutions.put(reaction, new ReactionExecution());
+                       reactions.notifyAll();
+               }
        }
 
        //
@@ -80,40 +90,46 @@ public class Engine extends AbstractExecutionThreadService {
                while (isRunning()) {
 
                        /* delay if we have no reactions. */
-                       if (reactions.isEmpty()) {
-                               logger.trace("Sleeping for 1 second while no Reactions available.");
-                               Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
-                               continue;
+                       synchronized (reactions) {
+                               if (reactions.isEmpty()) {
+                                       logger.debug("Sleeping while no Reactions available.");
+                                       try {
+                                               reactions.wait();
+                                       } catch (InterruptedException ie1) {
+                                               /* ignore, we’re looping anyway. */
+                                       }
+                                       continue;
+                               }
                        }
 
                        /* find next reaction. */
                        SortedMap<Long, Reaction> nextReactions = Maps.newTreeMap();
-                       for (Reaction reaction : reactions) {
-                               ReactionExecution reactionExecution = reactionExecutions.get(reaction);
-                               nextReactions.put(reactionExecution.lastExecutionTime() + reaction.updateInterval(), reaction);
+                       Reaction nextReaction;
+                       ReactionExecution reactionExecution;
+                       synchronized (reactions) {
+                               for (Reaction reaction : reactions.values()) {
+                                       nextReactions.put(reactionExecutions.get(reaction).lastExecutionTime() + reaction.updateInterval(), reaction);
+                               }
+                               nextReaction = nextReactions.get(nextReactions.firstKey());
+                               reactionExecution = reactionExecutions.get(nextReaction);
                        }
-                       Reaction nextReaction = nextReactions.get(nextReactions.firstKey());
-                       ReactionExecution reactionExecution = reactionExecutions.get(nextReaction);
                        logger.debug(String.format("Next Reaction: %s.", nextReaction));
 
                        /* wait until the next reaction has to run. */
-                       while (isRunning()) {
-                               long waitTime = (reactionExecution.lastExecutionTime() + nextReaction.updateInterval()) - System.currentTimeMillis();
-                               logger.debug(String.format("Time to wait for next Reaction: %d millseconds.", waitTime));
-                               if (waitTime <= 0) {
-                                       break;
-                               }
-                               try {
-                                       logger.debug(String.format("Waiting for %d milliseconds.", waitTime));
-                                       TimeUnit.MILLISECONDS.sleep(waitTime);
-                               } catch (InterruptedException ie1) {
-                                       /* we’re looping! */
+                       long waitTime = (reactionExecution.lastExecutionTime() + nextReaction.updateInterval()) - System.currentTimeMillis();
+                       logger.debug(String.format("Time to wait for next Reaction: %d millseconds.", waitTime));
+                       if (waitTime > 0) {
+                               synchronized (reactions) {
+                                       try {
+                                               logger.debug(String.format("Waiting for %d milliseconds.", waitTime));
+                                               reactions.wait(waitTime);
+                                       } catch (InterruptedException ie1) {
+                                               /* we’re looping! */
+                                       }
                                }
-                       }
 
-                       /* are we still running? */
-                       if (!isRunning()) {
-                               break;
+                               /* re-start loop to check for new reactions. */
+                               continue;
                        }
 
                        /* run reaction. */