Pull all interfaces into a single interface: Filter.
[sonitus.git] / src / main / java / net / pterodactylus / sonitus / data / filter / RateLimitingFilter.java
index 7af827c..04ed2d1 100644 (file)
 
 package net.pterodactylus.sonitus.data.filter;
 
-import java.io.EOFException;
 import java.io.IOException;
 import java.util.logging.Logger;
 
-import net.pterodactylus.sonitus.data.ConnectException;
+import net.pterodactylus.sonitus.data.AbstractFilter;
 import net.pterodactylus.sonitus.data.Filter;
 import net.pterodactylus.sonitus.data.Metadata;
-import net.pterodactylus.sonitus.data.Source;
-
-import com.google.common.base.Preconditions;
 
 /**
  * Rate limiting filter that only passes a specified amount of data per second
- * from its {@link Source} to its {@link net.pterodactylus.sonitus.data.Sink}.
+ * from its {@link net.pterodactylus.sonitus.data.Source} to its {@link
+ * net.pterodactylus.sonitus.data.Sink}.
  *
  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
  */
-public class RateLimitingFilter implements Filter {
+public class RateLimitingFilter extends AbstractFilter implements Filter {
 
        /** The logger. */
        private static final Logger logger = Logger.getLogger(RateLimitingFilter.class.getName());
@@ -42,17 +39,37 @@ public class RateLimitingFilter implements Filter {
        /** The limiting rate in bytes/second. */
        private final int rate;
 
-       /** The source. */
-       private Source source;
+       /** The start time. */
+       private long startTime;
+
+       /** The number of bytes. */
+       private long counter;
+
+       /**
+        * Creates a new rate limiting filter.
+        *
+        * @param name
+        *              The name of the filter
+        * @param rate
+        */
+       public RateLimitingFilter(String name, int rate) {
+               this(name, rate, 0);
+       }
 
        /**
         * Creates a new rate limiting filter.
         *
+        * @param name
+        *              The name of the filter
         * @param rate
         *              The limiting rate (in bytes/second)
+        * @param fastStartTime
+        *              The amount of time at the start of the filtering during which no delay
         */
-       public RateLimitingFilter(int rate) {
+       public RateLimitingFilter(String name, int rate, long fastStartTime) {
+               super(name);
                this.rate = rate;
+               this.counter = (long) (-rate * (fastStartTime / 1000.0));
        }
 
        //
@@ -60,38 +77,27 @@ public class RateLimitingFilter implements Filter {
        //
 
        @Override
-       public Metadata metadata() {
-               return source.metadata();
+       public void open(Metadata metadata) throws IOException {
+               super.open(metadata);
+               startTime = System.currentTimeMillis();
        }
 
        @Override
-       public byte[] get(int bufferSize) throws EOFException, IOException {
-               long now = System.currentTimeMillis();
-               byte[] buffer = source.get(bufferSize);
+       public void process(byte[] buffer) throws IOException {
+               super.process(buffer);
                /* delay. */
-               long waitTime = 1000 * buffer.length / rate;
-               while ((System.currentTimeMillis() - now) < waitTime) {
+               counter += buffer.length;
+               long waitTime = (long) (counter / (rate / 1000.0));
+               while ((System.currentTimeMillis() - startTime) < waitTime) {
                        try {
-                               long limitDelay = waitTime - (System.currentTimeMillis() - now);
+                               long limitDelay = waitTime - (System.currentTimeMillis() - startTime);
                                logger.finest(String.format("Waiting %d ms...", limitDelay));
                                Thread.sleep(limitDelay);
                        } catch (InterruptedException ie1) {
                                /* ignore, keep looping. */
                        }
                }
-               return buffer;
-       }
-
-       @Override
-       public void connect(Source source) throws ConnectException {
-               Preconditions.checkNotNull(source, "source must not be null");
-
-               this.source = source;
-       }
-
-       @Override
-       public void metadataUpdated() {
-               /* ignore. */
+               logger.finest(String.format("Processed %d Bytes during %d ms, that’s %.1f bytes/s.", counter, System.currentTimeMillis() - startTime, counter / ((System.currentTimeMillis() - startTime) / 1000.0)));
        }
 
 }