Pull all interfaces into a single interface: Filter.
[sonitus.git] / src / main / java / net / pterodactylus / sonitus / data / filter / RateLimitingFilter.java
index e108a3a..04ed2d1 100644 (file)
 
 package net.pterodactylus.sonitus.data.filter;
 
-import static com.google.common.io.Closeables.close;
-
-import java.io.EOFException;
 import java.io.IOException;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
-import java.util.Arrays;
 import java.util.logging.Logger;
 
-import net.pterodactylus.sonitus.data.ConnectException;
-import net.pterodactylus.sonitus.data.Connection;
+import net.pterodactylus.sonitus.data.AbstractFilter;
 import net.pterodactylus.sonitus.data.Filter;
 import net.pterodactylus.sonitus.data.Metadata;
-import net.pterodactylus.sonitus.data.Source;
-
-import com.google.common.base.Preconditions;
 
 /**
  * Rate limiting filter that only passes a specified amount of data per second
- * from its {@link Source} to its {@link net.pterodactylus.sonitus.data.Sink}.
+ * from its {@link net.pterodactylus.sonitus.data.Source} to its {@link
+ * net.pterodactylus.sonitus.data.Sink}.
  *
  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
  */
-public class RateLimitingFilter implements Filter {
+public class RateLimitingFilter extends AbstractFilter implements Filter {
 
        /** The logger. */
        private static final Logger logger = Logger.getLogger(RateLimitingFilter.class.getName());
@@ -48,20 +39,37 @@ public class RateLimitingFilter implements Filter {
        /** The limiting rate in bytes/second. */
        private final int rate;
 
-       /** The source. */
-       private Source source;
+       /** The start time. */
+       private long startTime;
+
+       /** The number of bytes. */
+       private long counter;
 
-       /** The input stream to read from. */
-       private PipedInputStream pipedInputStream = new PipedInputStream();
+       /**
+        * Creates a new rate limiting filter.
+        *
+        * @param name
+        *              The name of the filter
+        * @param rate
+        */
+       public RateLimitingFilter(String name, int rate) {
+               this(name, rate, 0);
+       }
 
        /**
         * Creates a new rate limiting filter.
         *
+        * @param name
+        *              The name of the filter
         * @param rate
         *              The limiting rate (in bytes/second)
+        * @param fastStartTime
+        *              The amount of time at the start of the filtering during which no delay
         */
-       public RateLimitingFilter(int rate) {
+       public RateLimitingFilter(String name, int rate, long fastStartTime) {
+               super(name);
                this.rate = rate;
+               this.counter = (long) (-rate * (fastStartTime / 1000.0));
        }
 
        //
@@ -69,67 +77,27 @@ public class RateLimitingFilter implements Filter {
        //
 
        @Override
-       public Metadata metadata() {
-               return source.metadata();
-       }
-
-       @Override
-       public byte[] get(int bufferSize) throws EOFException, IOException {
-               byte[] buffer = new byte[bufferSize];
-               int read = pipedInputStream.read(buffer);
-               if (read == -1) {
-                       throw new EOFException();
-               }
-               return Arrays.copyOf(buffer, read);
+       public void open(Metadata metadata) throws IOException {
+               super.open(metadata);
+               startTime = System.currentTimeMillis();
        }
 
        @Override
-       public void connect(Source source) throws ConnectException {
-               Preconditions.checkNotNull(source, "source must not be null");
-
-               this.source = source;
-               final long start = System.currentTimeMillis();
-               try {
-                       pipedInputStream = new PipedInputStream();
-                       final PipedOutputStream pipedOutputStream = new PipedOutputStream(pipedInputStream);
-                       new Thread(new Connection(source) {
-
-                               @Override
-                               protected int bufferSize() {
-                                       return rate;
-                               }
-
-                               @Override
-                               protected void feed(byte[] buffer) throws IOException {
-                                       long waitTime = 1000 * buffer.length / rate;
-                                       long now = System.currentTimeMillis();
-                                       pipedOutputStream.write(buffer);
-                                       pipedOutputStream.flush();
-                                       while ((System.currentTimeMillis() - now) < waitTime) {
-                                               try {
-                                                       long limitDelay = waitTime - (System.currentTimeMillis() - now);
-                                                       logger.finest(String.format("Waiting %d ms...", limitDelay));
-                                                       Thread.sleep(limitDelay);
-                                               } catch (InterruptedException ie1) {
-                                                       /* ignore, keep looping. */
-                                               }
-                                       }
-                               }
-
-                               @Override
-                               protected void finish() throws IOException {
-                                       close(pipedInputStream, true);
-                                       close(pipedOutputStream, true);
-                               }
-                       }).start();
-               } catch (IOException ioe1) {
-                       throw new ConnectException(ioe1);
+       public void process(byte[] buffer) throws IOException {
+               super.process(buffer);
+               /* delay. */
+               counter += buffer.length;
+               long waitTime = (long) (counter / (rate / 1000.0));
+               while ((System.currentTimeMillis() - startTime) < waitTime) {
+                       try {
+                               long limitDelay = waitTime - (System.currentTimeMillis() - startTime);
+                               logger.finest(String.format("Waiting %d ms...", limitDelay));
+                               Thread.sleep(limitDelay);
+                       } catch (InterruptedException ie1) {
+                               /* ignore, keep looping. */
+                       }
                }
-       }
-
-       @Override
-       public void metadataUpdated() {
-               /* ignore. */
+               logger.finest(String.format("Processed %d Bytes during %d ms, that’s %.1f bytes/s.", counter, System.currentTimeMillis() - startTime, counter / ((System.currentTimeMillis() - startTime) / 1000.0)));
        }
 
 }