package net.pterodactylus.sonitus.data.filter;
-import java.io.EOFException;
import java.io.IOException;
import java.util.logging.Logger;
-import net.pterodactylus.sonitus.data.ConnectException;
+import net.pterodactylus.sonitus.data.AbstractFilter;
+import net.pterodactylus.sonitus.data.DataPacket;
import net.pterodactylus.sonitus.data.Filter;
import net.pterodactylus.sonitus.data.Metadata;
-import net.pterodactylus.sonitus.data.Source;
-
-import com.google.common.base.Preconditions;
/**
* Rate limiting filter that only passes a specified amount of data per second
- * from its {@link Source} to its {@link net.pterodactylus.sonitus.data.Sink}.
+ * from its source to its sinks.
*
* @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
*/
-public class RateLimitingFilter implements Filter {
+public class RateLimitingFilter extends AbstractFilter implements Filter {
/** The logger. */
private static final Logger logger = Logger.getLogger(RateLimitingFilter.class.getName());
/** The limiting rate in bytes/second. */
private final int rate;
- /** The fast start time. */
- private final long fastStartTime;
-
- /** The source. */
- private Source source;
+ /** The start time. */
+ private long startTime;
- /** The remaining fast start time. */
- private long remainingFastStartTime;
+ /** The number of bytes. */
+ private long counter;
/**
* Creates a new rate limiting filter.
*
+ * @param name
+ * The name of the filter
* @param rate
- * The limiting rate (in bytes/second)
*/
- public RateLimitingFilter(int rate) {
- this(rate, 0);
+ public RateLimitingFilter(String name, int rate) {
+ this(name, rate, 0);
}
/**
* Creates a new rate limiting filter.
*
+ * @param name
+ * The name of the filter
* @param rate
* The limiting rate (in bytes/second)
* @param fastStartTime
* The amount of time at the start of the filtering during which no delay
- * will occur (in milliseconds)
*/
- public RateLimitingFilter(int rate, long fastStartTime) {
+ public RateLimitingFilter(String name, int rate, long fastStartTime) {
+ super(name);
this.rate = rate;
- this.fastStartTime = fastStartTime;
- remainingFastStartTime = fastStartTime;
+ this.counter = (long) (-rate * (fastStartTime / 1000.0));
}
//
//
@Override
- public Metadata metadata() {
- return source.metadata();
+ public void open(Metadata metadata) throws IOException {
+ super.open(metadata);
+ startTime = System.currentTimeMillis();
}
@Override
- public byte[] get(int bufferSize) throws EOFException, IOException {
- long now = System.currentTimeMillis();
- byte[] buffer = source.get(bufferSize);
+ public void process(DataPacket dataPacket) throws IOException {
+ super.process(dataPacket);
/* delay. */
- long waitTime = 1000 * buffer.length / rate;
- remainingFastStartTime = Math.max(remainingFastStartTime - waitTime, 0);
- while ((remainingFastStartTime == 0) && (System.currentTimeMillis() - now) < waitTime) {
+ counter += dataPacket.buffer().length;
+ long waitTime = (long) (counter / (rate / 1000.0));
+ while ((System.currentTimeMillis() - startTime) < waitTime) {
try {
- long limitDelay = waitTime - (System.currentTimeMillis() - now);
+ long limitDelay = waitTime - (System.currentTimeMillis() - startTime);
logger.finest(String.format("Waiting %d ms...", limitDelay));
Thread.sleep(limitDelay);
} catch (InterruptedException ie1) {
/* ignore, keep looping. */
}
}
- return buffer;
- }
-
- @Override
- public void connect(Source source) throws ConnectException {
- Preconditions.checkNotNull(source, "source must not be null");
-
- this.source = source;
- }
-
- @Override
- public void metadataUpdated() {
- /* ignore. */
+ logger.finest(String.format("Processed %d Bytes during %d ms, that’s %.1f bytes/s.", counter, System.currentTimeMillis() - startTime, counter / ((System.currentTimeMillis() - startTime) / 1000.0)));
}
}