From aeca75a399a76358c7474168197911ed39ab5e56 Mon Sep 17 00:00:00 2001 From: =?utf8?q?David=20=E2=80=98Bombe=E2=80=99=20Roden?= Date: Sun, 17 Mar 2013 15:00:51 +0100 Subject: [PATCH] Forward get() calls directly to the current source. --- .../sonitus/data/filter/MultiSourceFilter.java | 141 ++------------------- 1 file changed, 12 insertions(+), 129 deletions(-) diff --git a/src/main/java/net/pterodactylus/sonitus/data/filter/MultiSourceFilter.java b/src/main/java/net/pterodactylus/sonitus/data/filter/MultiSourceFilter.java index 1dc3a8c..6092a4e 100644 --- a/src/main/java/net/pterodactylus/sonitus/data/filter/MultiSourceFilter.java +++ b/src/main/java/net/pterodactylus/sonitus/data/filter/MultiSourceFilter.java @@ -21,12 +21,7 @@ import static com.google.common.base.Preconditions.*; import java.io.EOFException; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.PipedInputStream; -import java.io.PipedOutputStream; -import java.util.Arrays; -import java.util.logging.Level; +import java.util.concurrent.atomic.AtomicReference; import java.util.logging.Logger; import net.pterodactylus.sonitus.data.ConnectException; @@ -50,14 +45,11 @@ public class MultiSourceFilter implements Filter, ReusableSink { /** The logger. */ private static final Logger logger = Logger.getLogger(MultiSourceFilter.class.getName()); - /** Object used for synchronization. */ - private final Object syncObject = new Object(); - /** The event bus. */ private final EventBus eventBus; - /** The connection. */ - private Connection connection; + /** The current source. */ + private final AtomicReference source = new AtomicReference(); @Inject public MultiSourceFilter(EventBus eventBus) { @@ -66,39 +58,25 @@ public class MultiSourceFilter implements Filter, ReusableSink { @Override public Metadata metadata() { - synchronized (syncObject) { - return connection.source.metadata(); - } + return source.get().metadata(); } @Override public byte[] get(int bufferSize) throws EOFException, IOException { - byte[] buffer = new byte[bufferSize]; - InputStream inputStream; - synchronized (syncObject) { - inputStream = connection.pipedInputStream; - } - int read = inputStream.read(buffer); - return Arrays.copyOf(buffer, read); + return source.get().get(bufferSize); } @Override public void connect(Source source) throws ConnectException { checkNotNull(source, "source must not be null"); - if ((connection != null) && (connection.source != null)) { - checkArgument(connection.source.metadata().channels() == source.metadata().channels(), "source’s channel count must equal existing source’s channel count"); - checkArgument(connection.source.metadata().frequency() == source.metadata().frequency(), "source’s frequency must equal existing source’s frequency"); - checkArgument(connection.source.metadata().encoding().equalsIgnoreCase(source.metadata().encoding()), "source’s encoding must equal existing source’s encoding"); - } - if (connection == null) { - connection = new Connection(); - new Thread(connection).start(); - } - try { - connection.source(source); - } catch (IOException ioe1) { - throw new ConnectException(ioe1); + Source oldSource = this.source.getAndSet(source); + if (oldSource != null) { + checkArgument(oldSource.metadata().channels() == source.metadata().channels(), "source’s channel count must equal existing source’s channel count"); + checkArgument(oldSource.metadata().frequency() == source.metadata().frequency(), "source’s frequency must equal existing source’s frequency"); + checkArgument(oldSource.metadata().encoding().equalsIgnoreCase(source.metadata().encoding()), "source’s encoding must equal existing source’s encoding"); + + eventBus.post(new SourceFinishedEvent(oldSource)); } } @@ -107,99 +85,4 @@ public class MultiSourceFilter implements Filter, ReusableSink { /* ignore. */ } - /** - * The connection feeds the input from the currently connected source to the - * input stream that {@link #get(int)} will get its data from. - * - * @author David ‘Bombe’ Roden - */ - private class Connection implements Runnable { - - /** The currently connected source. */ - /* synchronized by syncObject. */ - private Source source; - - /** The input stream that {@link #get(int)} will read from. */ - /* synchronized by syncObject. */ - private PipedInputStream pipedInputStream; - - /** The output stream that the source will be fed into. */ - /* synchronized by syncObject. */ - private PipedOutputStream pipedOutputStream; - - /** - * Changes the source of the connection. - * - * @param source - * The new source of the connection - * @return This connection - * @throws IOException - * if an I/O error occurs - */ - public Connection source(Source source) throws IOException { - synchronized (syncObject) { - Source oldSource = this.source; - this.source = source; - if (oldSource != null) { - eventBus.post(new SourceFinishedEvent(oldSource)); - } - pipedInputStream = new PipedInputStream(); - pipedOutputStream = new PipedOutputStream(pipedInputStream); - syncObject.notifyAll(); - } - return this; - } - - @Override - public void run() { - while (true) { - /* wait for source to be set. */ - OutputStream outputStream; - Source source; - logger.finest("Entering synchronized block..."); - synchronized (syncObject) { - logger.finest("Entered synchronized block."); - source = this.source; - while (source == null) { - try { - logger.finest("Waiting for source to connect..."); - syncObject.wait(); - } catch (InterruptedException ie1) { - /* ignore, keep waiting. */ - } - source = this.source; - } - outputStream = pipedOutputStream; - } - logger.finest("Exited synchronized block."); - - byte[] buffer = null; - boolean readSuccessful = false; - while (!readSuccessful) { - try { - buffer = source.get(4096); - logger.finest(String.format("Read %d Bytes.", buffer.length)); - if (buffer.length > 0) { - readSuccessful = true; - } - } catch (IOException e) { - /* TODO - notify & wait */ - } - } - - try { - outputStream.write(buffer); - logger.finest(String.format("Wrote %d Bytes.", buffer.length)); - } catch (IOException ioe1) { - /* okay, the sink has died, exit. */ - logger.log(Level.WARNING, "Could not write to pipe!", ioe1); - break; - } - } - - logger.info("Exiting."); - } - - } - } -- 2.7.4