Read directly from the source.
authorDavid ‘Bombe’ Roden <bombe@pterodactylus.net>
Sun, 17 Mar 2013 14:04:36 +0000 (15:04 +0100)
committerDavid ‘Bombe’ Roden <bombe@pterodactylus.net>
Sun, 17 Mar 2013 14:04:36 +0000 (15:04 +0100)
src/main/java/net/pterodactylus/sonitus/data/filter/RateLimitingFilter.java

index e108a3a..7af827c 100644 (file)
 
 package net.pterodactylus.sonitus.data.filter;
 
-import static com.google.common.io.Closeables.close;
-
 import java.io.EOFException;
 import java.io.IOException;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
-import java.util.Arrays;
 import java.util.logging.Logger;
 
 import net.pterodactylus.sonitus.data.ConnectException;
-import net.pterodactylus.sonitus.data.Connection;
 import net.pterodactylus.sonitus.data.Filter;
 import net.pterodactylus.sonitus.data.Metadata;
 import net.pterodactylus.sonitus.data.Source;
@@ -51,9 +45,6 @@ public class RateLimitingFilter implements Filter {
        /** The source. */
        private Source source;
 
-       /** The input stream to read from. */
-       private PipedInputStream pipedInputStream = new PipedInputStream();
-
        /**
         * Creates a new rate limiting filter.
         *
@@ -75,12 +66,20 @@ public class RateLimitingFilter implements Filter {
 
        @Override
        public byte[] get(int bufferSize) throws EOFException, IOException {
-               byte[] buffer = new byte[bufferSize];
-               int read = pipedInputStream.read(buffer);
-               if (read == -1) {
-                       throw new EOFException();
+               long now = System.currentTimeMillis();
+               byte[] buffer = source.get(bufferSize);
+               /* delay. */
+               long waitTime = 1000 * buffer.length / rate;
+               while ((System.currentTimeMillis() - now) < waitTime) {
+                       try {
+                               long limitDelay = waitTime - (System.currentTimeMillis() - now);
+                               logger.finest(String.format("Waiting %d ms...", limitDelay));
+                               Thread.sleep(limitDelay);
+                       } catch (InterruptedException ie1) {
+                               /* ignore, keep looping. */
+                       }
                }
-               return Arrays.copyOf(buffer, read);
+               return buffer;
        }
 
        @Override
@@ -88,43 +87,6 @@ public class RateLimitingFilter implements Filter {
                Preconditions.checkNotNull(source, "source must not be null");
 
                this.source = source;
-               final long start = System.currentTimeMillis();
-               try {
-                       pipedInputStream = new PipedInputStream();
-                       final PipedOutputStream pipedOutputStream = new PipedOutputStream(pipedInputStream);
-                       new Thread(new Connection(source) {
-
-                               @Override
-                               protected int bufferSize() {
-                                       return rate;
-                               }
-
-                               @Override
-                               protected void feed(byte[] buffer) throws IOException {
-                                       long waitTime = 1000 * buffer.length / rate;
-                                       long now = System.currentTimeMillis();
-                                       pipedOutputStream.write(buffer);
-                                       pipedOutputStream.flush();
-                                       while ((System.currentTimeMillis() - now) < waitTime) {
-                                               try {
-                                                       long limitDelay = waitTime - (System.currentTimeMillis() - now);
-                                                       logger.finest(String.format("Waiting %d ms...", limitDelay));
-                                                       Thread.sleep(limitDelay);
-                                               } catch (InterruptedException ie1) {
-                                                       /* ignore, keep looping. */
-                                               }
-                                       }
-                               }
-
-                               @Override
-                               protected void finish() throws IOException {
-                                       close(pipedInputStream, true);
-                                       close(pipedOutputStream, true);
-                               }
-                       }).start();
-               } catch (IOException ioe1) {
-                       throw new ConnectException(ioe1);
-               }
        }
 
        @Override