From d9a909a0e39b2925aece35c2298cfc3c35d35051 Mon Sep 17 00:00:00 2001 From: =?utf8?q?David=20=E2=80=98Bombe=E2=80=99=20Roden?= Date: Sun, 17 Mar 2013 15:04:36 +0100 Subject: [PATCH] Read directly from the source. --- .../sonitus/data/filter/RateLimitingFilter.java | 64 +++++----------------- 1 file changed, 13 insertions(+), 51 deletions(-) diff --git a/src/main/java/net/pterodactylus/sonitus/data/filter/RateLimitingFilter.java b/src/main/java/net/pterodactylus/sonitus/data/filter/RateLimitingFilter.java index e108a3a..7af827c 100644 --- a/src/main/java/net/pterodactylus/sonitus/data/filter/RateLimitingFilter.java +++ b/src/main/java/net/pterodactylus/sonitus/data/filter/RateLimitingFilter.java @@ -17,17 +17,11 @@ package net.pterodactylus.sonitus.data.filter; -import static com.google.common.io.Closeables.close; - import java.io.EOFException; import java.io.IOException; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; -import java.util.Arrays; import java.util.logging.Logger; import net.pterodactylus.sonitus.data.ConnectException; -import net.pterodactylus.sonitus.data.Connection; import net.pterodactylus.sonitus.data.Filter; import net.pterodactylus.sonitus.data.Metadata; import net.pterodactylus.sonitus.data.Source; @@ -51,9 +45,6 @@ public class RateLimitingFilter implements Filter { /** The source. */ private Source source; - /** The input stream to read from. */ - private PipedInputStream pipedInputStream = new PipedInputStream(); - /** * Creates a new rate limiting filter. * @@ -75,12 +66,20 @@ public class RateLimitingFilter implements Filter { @Override public byte[] get(int bufferSize) throws EOFException, IOException { - byte[] buffer = new byte[bufferSize]; - int read = pipedInputStream.read(buffer); - if (read == -1) { - throw new EOFException(); + long now = System.currentTimeMillis(); + byte[] buffer = source.get(bufferSize); + /* delay. */ + long waitTime = 1000 * buffer.length / rate; + while ((System.currentTimeMillis() - now) < waitTime) { + try { + long limitDelay = waitTime - (System.currentTimeMillis() - now); + logger.finest(String.format("Waiting %d ms...", limitDelay)); + Thread.sleep(limitDelay); + } catch (InterruptedException ie1) { + /* ignore, keep looping. */ + } } - return Arrays.copyOf(buffer, read); + return buffer; } @Override @@ -88,43 +87,6 @@ public class RateLimitingFilter implements Filter { Preconditions.checkNotNull(source, "source must not be null"); this.source = source; - final long start = System.currentTimeMillis(); - try { - pipedInputStream = new PipedInputStream(); - final PipedOutputStream pipedOutputStream = new PipedOutputStream(pipedInputStream); - new Thread(new Connection(source) { - - @Override - protected int bufferSize() { - return rate; - } - - @Override - protected void feed(byte[] buffer) throws IOException { - long waitTime = 1000 * buffer.length / rate; - long now = System.currentTimeMillis(); - pipedOutputStream.write(buffer); - pipedOutputStream.flush(); - while ((System.currentTimeMillis() - now) < waitTime) { - try { - long limitDelay = waitTime - (System.currentTimeMillis() - now); - logger.finest(String.format("Waiting %d ms...", limitDelay)); - Thread.sleep(limitDelay); - } catch (InterruptedException ie1) { - /* ignore, keep looping. */ - } - } - } - - @Override - protected void finish() throws IOException { - close(pipedInputStream, true); - close(pipedOutputStream, true); - } - }).start(); - } catch (IOException ioe1) { - throw new ConnectException(ioe1); - } } @Override -- 2.7.4