import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
+import java.util.logging.Logger;
import net.pterodactylus.sonitus.data.AbstractFilter;
import net.pterodactylus.sonitus.data.DataPacket;
import net.pterodactylus.sonitus.data.Metadata;
import net.pterodactylus.sonitus.data.Pipeline.Connection;
-import com.google.common.base.Function;
-import com.google.common.collect.FluentIterable;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
/**
* {@link Filter} that combines several filters into one.
*/
public class PipelineFilter extends AbstractFilter implements Filter {
+ /** The logger. */
+ private static final Logger logger = Logger.getLogger(PipelineFilter.class.getName());
+
/** The first filter. */
private final Filter source;
/** The last filter (for convenience). */
private final Filter lastFilter;
+ /** The connections for each filter. */
+ private final Map<Filter, Connection> filterConnections = Maps.newHashMap();
+
/**
* Creates a new pipeline filter.
*
filter.open(currentMetadata);
currentMetadata = filter.metadata();
Connection connection = new Connection(currentSource, Arrays.asList(filter));
- String threadName = String.format("%s → %s.", connection.source().name(), FluentIterable.from(connection.sinks()).transform(new Function<Filter, String>() {
-
- @Override
- public String apply(Filter sink) {
- return sink.name();
- }
- }));
+ filterConnections.put(filter, connection);
+ String threadName = String.format("%s → %s", connection.source().name(), filter.name());
+ logger.info(String.format("Starting Thread: %s.", threadName));
new Thread(connection, threadName).start();
currentSource = filter;
}
+ metadataUpdated(currentMetadata);
}
@Override
public DataPacket get(int bufferSize) throws IOException {
+ if (filterConnections.get(lastFilter).ioException().isPresent()) {
+ logger.info(String.format("Rethrowing exception from %s: %s", lastFilter.name(), filterConnections.get(lastFilter).ioException().get().getMessage()));
+ throw filterConnections.get(lastFilter).ioException().get();
+ }
+ logger.info(String.format("Requesting %d bytes from %s...", bufferSize, lastFilter.name()));
return lastFilter.get(bufferSize);
}