X-Git-Url: https://git.pterodactylus.net/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fnet%2Fpterodactylus%2Fsonitus%2Fdata%2Ffilter%2FRateLimitingFilter.java;h=04ed2d18635b908dee782578ea06055228c739d3;hb=633a841142f978235ed9f745b6ba16c278963e62;hp=a5732955696e4df5bb8c33e387558bcf4216fe5e;hpb=6e88b4e3439165dbd6584c79d3f909e10af95f49;p=sonitus.git diff --git a/src/main/java/net/pterodactylus/sonitus/data/filter/RateLimitingFilter.java b/src/main/java/net/pterodactylus/sonitus/data/filter/RateLimitingFilter.java index a573295..04ed2d1 100644 --- a/src/main/java/net/pterodactylus/sonitus/data/filter/RateLimitingFilter.java +++ b/src/main/java/net/pterodactylus/sonitus/data/filter/RateLimitingFilter.java @@ -17,31 +17,21 @@ package net.pterodactylus.sonitus.data.filter; -import static com.google.common.io.Closeables.close; - -import java.io.EOFException; import java.io.IOException; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; -import java.util.Arrays; import java.util.logging.Logger; -import net.pterodactylus.sonitus.data.ConnectException; -import net.pterodactylus.sonitus.data.Connection; +import net.pterodactylus.sonitus.data.AbstractFilter; import net.pterodactylus.sonitus.data.Filter; -import net.pterodactylus.sonitus.data.Format; import net.pterodactylus.sonitus.data.Metadata; -import net.pterodactylus.sonitus.data.Source; - -import com.google.common.base.Preconditions; /** * Rate limiting filter that only passes a specified amount of data per second - * from its {@link Source} to its {@link net.pterodactylus.sonitus.data.Sink}. + * from its {@link net.pterodactylus.sonitus.data.Source} to its {@link + * net.pterodactylus.sonitus.data.Sink}. * * @author David ‘Bombe’ Roden */ -public class RateLimitingFilter implements Filter { +public class RateLimitingFilter extends AbstractFilter implements Filter { /** The logger. */ private static final Logger logger = Logger.getLogger(RateLimitingFilter.class.getName()); @@ -49,20 +39,37 @@ public class RateLimitingFilter implements Filter { /** The limiting rate in bytes/second. */ private final int rate; - /** The source’s format. */ - private Source source; + /** The start time. */ + private long startTime; + + /** The number of bytes. */ + private long counter; - /** The input stream to read from. */ - private PipedInputStream pipedInputStream = new PipedInputStream(); + /** + * Creates a new rate limiting filter. + * + * @param name + * The name of the filter + * @param rate + */ + public RateLimitingFilter(String name, int rate) { + this(name, rate, 0); + } /** * Creates a new rate limiting filter. * + * @param name + * The name of the filter * @param rate * The limiting rate (in bytes/second) + * @param fastStartTime + * The amount of time at the start of the filtering during which no delay */ - public RateLimitingFilter(int rate) { + public RateLimitingFilter(String name, int rate, long fastStartTime) { + super(name); this.rate = rate; + this.counter = (long) (-rate * (fastStartTime / 1000.0)); } // @@ -70,67 +77,27 @@ public class RateLimitingFilter implements Filter { // @Override - public Format format() { - return source.format(); - } - - @Override - public Metadata metadata() { - return source.metadata(); - } - - @Override - public byte[] get(int bufferSize) throws EOFException, IOException { - byte[] buffer = new byte[bufferSize]; - int read = pipedInputStream.read(buffer); - if (read == -1) { - throw new EOFException(); - } - return Arrays.copyOf(buffer, read); + public void open(Metadata metadata) throws IOException { + super.open(metadata); + startTime = System.currentTimeMillis(); } @Override - public void connect(Source source) throws ConnectException { - Preconditions.checkNotNull(source, "source must not be null"); - - this.source = source; - final long start = System.currentTimeMillis(); - try { - pipedInputStream = new PipedInputStream(); - final PipedOutputStream pipedOutputStream = new PipedOutputStream(pipedInputStream); - new Thread(new Connection(source) { - - @Override - protected int bufferSize() { - return rate; - } - - @Override - protected void feed(byte[] buffer) throws IOException { - long waitTime = 1000 * buffer.length / rate; - long now = System.currentTimeMillis(); - pipedOutputStream.write(buffer); - pipedOutputStream.flush(); - while ((System.currentTimeMillis() - now) < waitTime) { - try { - long limitDelay = waitTime - (System.currentTimeMillis() - now); - logger.finest(String.format("Waiting %d ms...", limitDelay)); - Thread.sleep(limitDelay); - } catch (InterruptedException ie1) { - /* ignore, keep looping. */ - } - } - } - - @Override - protected void finish() throws IOException { - close(pipedInputStream, true); - close(pipedOutputStream, true); - } - }).start(); - } catch (IOException ioe1) { - throw new ConnectException(ioe1); + public void process(byte[] buffer) throws IOException { + super.process(buffer); + /* delay. */ + counter += buffer.length; + long waitTime = (long) (counter / (rate / 1000.0)); + while ((System.currentTimeMillis() - startTime) < waitTime) { + try { + long limitDelay = waitTime - (System.currentTimeMillis() - startTime); + logger.finest(String.format("Waiting %d ms...", limitDelay)); + Thread.sleep(limitDelay); + } catch (InterruptedException ie1) { + /* ignore, keep looping. */ + } } + logger.finest(String.format("Processed %d Bytes during %d ms, that’s %.1f bytes/s.", counter, System.currentTimeMillis() - startTime, counter / ((System.currentTimeMillis() - startTime) / 1000.0))); } }