Don’t connect sources and sinks directly, use a pipeline to move data around.
authorDavid ‘Bombe’ Roden <bombe@pterodactylus.net>
Tue, 19 Mar 2013 07:21:16 +0000 (08:21 +0100)
committerDavid ‘Bombe’ Roden <bombe@pterodactylus.net>
Mon, 27 May 2013 20:54:28 +0000 (22:54 +0200)
21 files changed:
src/main/java/net/pterodactylus/sonitus/data/ConnectException.java [deleted file]
src/main/java/net/pterodactylus/sonitus/data/Connection.java [deleted file]
src/main/java/net/pterodactylus/sonitus/data/Filter.java
src/main/java/net/pterodactylus/sonitus/data/Pipeline.java [new file with mode: 0644]
src/main/java/net/pterodactylus/sonitus/data/ReusableSink.java [deleted file]
src/main/java/net/pterodactylus/sonitus/data/Sink.java
src/main/java/net/pterodactylus/sonitus/data/Source.java
src/main/java/net/pterodactylus/sonitus/data/filter/DummyFilter.java [new file with mode: 0644]
src/main/java/net/pterodactylus/sonitus/data/filter/ExternalFilter.java
src/main/java/net/pterodactylus/sonitus/data/filter/ExternalMp3Decoder.java
src/main/java/net/pterodactylus/sonitus/data/filter/ExternalMp3Encoder.java
src/main/java/net/pterodactylus/sonitus/data/filter/MultiSourceFilter.java [deleted file]
src/main/java/net/pterodactylus/sonitus/data/filter/OggVorbisDecoder.java
src/main/java/net/pterodactylus/sonitus/data/filter/PredicateFilter.java [new file with mode: 0644]
src/main/java/net/pterodactylus/sonitus/data/filter/RateLimitingFilter.java
src/main/java/net/pterodactylus/sonitus/data/filter/SoxResampleFilter.java
src/main/java/net/pterodactylus/sonitus/data/sink/AudioSink.java
src/main/java/net/pterodactylus/sonitus/data/sink/FileSink.java
src/main/java/net/pterodactylus/sonitus/data/sink/Icecast2Sink.java
src/main/java/net/pterodactylus/sonitus/data/source/FileSource.java
src/main/java/net/pterodactylus/sonitus/data/source/MultiSource.java [new file with mode: 0644]

diff --git a/src/main/java/net/pterodactylus/sonitus/data/ConnectException.java b/src/main/java/net/pterodactylus/sonitus/data/ConnectException.java
deleted file mode 100644 (file)
index 004d88c..0000000
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Sonitus - ConnectException.java - Copyright © 2013 David Roden
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program.  If not, see <http://www.gnu.org/licenses/>.
- */
-
-package net.pterodactylus.sonitus.data;
-
-/**
- * Exception that signals an error when {@link Sink#connect(Source) connecting}
- * a {@link Source} to a {@link Sink}.
- *
- * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
- */
-public class ConnectException extends Exception {
-
-       /** Creates a new connect exception. */
-       public ConnectException() {
-               super();
-       }
-
-       /**
-        * Creates a new connect exception.
-        *
-        * @param message
-        *              The message of the exception
-        */
-       public ConnectException(String message) {
-               super(message);
-       }
-
-       /**
-        * Creates a new connect exception.
-        *
-        * @param cause
-        *              The cause of the exception
-        */
-       public ConnectException(Throwable cause) {
-               super(cause);
-       }
-
-       /**
-        * Creates a new connect exception.
-        *
-        * @param message
-        *              The message of the exception
-        * @param cause
-        *              The cause of the exception
-        */
-       public ConnectException(String message, Throwable cause) {
-               super(message, cause);
-       }
-
-}
diff --git a/src/main/java/net/pterodactylus/sonitus/data/Connection.java b/src/main/java/net/pterodactylus/sonitus/data/Connection.java
deleted file mode 100644 (file)
index 169bfa5..0000000
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Sonitus - Connection.java - Copyright © 2013 David Roden
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program.  If not, see <http://www.gnu.org/licenses/>.
- */
-
-package net.pterodactylus.sonitus.data;
-
-import java.io.IOException;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-/**
- * A connection reads bytes from a {@link Source} and feeds it to a sink. This
- * class is meant to be subclassed by each {@link Sink}, overriding the {@link
- * #feed(byte[])} method to actually feed the data into the sink. The {@link
- * #feed(byte[])} method is also responsible for blocking for an appropriate
- * amount of time; this method determines how fast a {@link Source} is
- * consumed.
- *
- * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
- */
-public abstract class Connection implements Runnable {
-
-       /** The logger. */
-       private static final Logger logger = Logger.getLogger(Connection.class.getName());
-
-       /** The source to consume. */
-       private final Source source;
-
-       /**
-        * Creates a new connection that will read from the given source.
-        *
-        * @param source
-        *              The source to read
-        */
-       public Connection(Source source) {
-               this.source = source;
-       }
-
-       //
-       // RUNNABLE METHODS
-       //
-
-       @Override
-       public void run() {
-               while (true) {
-                       byte[] buffer = null;
-                       try {
-                               buffer = source.get(bufferSize());
-                       } catch (IOException ioe1) {
-                               logger.log(Level.WARNING, "Source died!", ioe1);
-                               break;
-                       }
-                       try {
-                               feed(buffer);
-                       } catch (IOException ioe1) {
-                               logger.log(Level.WARNING, "Sink died!", ioe1);
-                               break;
-                       }
-               }
-               try {
-                       logger.info("Connection finished.");
-                       finish();
-               } catch (IOException ioe1) {
-                       /* well, what can we do? nothing. */
-               }
-       }
-
-       //
-       // SUBCLASS METHODS
-       //
-
-       /**
-        * Returns the number of bytes that will be requested from the source.
-        *
-        * @return The number of bytes that will be requested from the source
-        */
-       protected abstract int bufferSize();
-
-       /**
-        * Feeds the read data into the sink. The given buffer is always filled and
-        * never contains excess elements.
-        *
-        * @param buffer
-        *              The data
-        * @throws IOException
-        *              if an I/O error occurs
-        */
-       protected abstract void feed(byte[] buffer) throws IOException;
-
-       /**
-        * Notifies the sink that the source does not deliver any more data.
-        *
-        * @throws IOException
-        *              if an I/O error occurs
-        */
-       protected abstract void finish() throws IOException;
-
-}
index d0bad5d..c6fab7e 100644 (file)
@@ -18,7 +18,8 @@
 package net.pterodactylus.sonitus.data;
 
 /**
- * A filter processes an input to produce an output.
+ * A filter is both a {@link Source} and a {@link Sink}. It is used to process
+ * the audio date in whatever way seems appropriate.
  *
  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
  */
diff --git a/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java b/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java
new file mode 100644 (file)
index 0000000..3ee0c9d
--- /dev/null
@@ -0,0 +1,300 @@
+/*
+ * Sonitus - Pipeline.java - Copyright © 2013 David Roden
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package net.pterodactylus.sonitus.data;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+
+/**
+ * A pipeline is responsible for streaming audio data from a {@link Source} to
+ * an arbitrary number of connected {@link Filter}s and {@link Sink}s.
+ *
+ * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
+ */
+public class Pipeline {
+
+       /** The logger. */
+       private static final Logger logger = Logger.getLogger(Pipeline.class.getName());
+
+       /** The source of the audio stream. */
+       private final Source source;
+
+       /** The sinks for each source. */
+       private final Multimap<Source, Sink> sinks;
+
+       /** All started feeders. */
+       private final List<Feeder> feeders = Lists.newArrayList();
+
+       /**
+        * Creates a new pipeline.
+        *
+        * @param source
+        *              The source of the audio stream
+        * @param sinks
+        *              The sinks for each source
+        */
+       private Pipeline(Source source, Multimap<Source, Sink> sinks) {
+               this.source = Preconditions.checkNotNull(source, "source must not be null");
+               this.sinks = Preconditions.checkNotNull(sinks, "sinks must not be null");
+       }
+
+       //
+       // ACTIONS
+       //
+
+       /**
+        * Starts the pipeline.
+        *
+        * @throws IOException
+        *              if any of the sinks can not be opened
+        * @throws IllegalStateException
+        *              if the pipeline is already running
+        */
+       public void start() throws IOException, IllegalStateException {
+               if (!feeders.isEmpty()) {
+                       throw new IllegalStateException("Pipeline is already running!");
+               }
+               List<Source> sources = Lists.newArrayList();
+               sources.add(source);
+               /* collect all source->sink pairs. */
+               while (!sources.isEmpty()) {
+                       Source source = sources.remove(0);
+                       Collection<Sink> sinks = this.sinks.get(source);
+                       feeders.add(new Feeder(source, sinks));
+                       for (Sink sink : sinks) {
+                               sink.open(source.metadata());
+                               if (sink instanceof Filter) {
+                                       sources.add((Source) sink);
+                               }
+                       }
+               }
+               for (Feeder feeder : feeders) {
+                       logger.info(String.format("Starting Feeder from %s to %s.", feeder.source, feeder.sinks));
+                       new Thread(feeder).start();
+               }
+       }
+
+       public void stop() {
+               if (!feeders.isEmpty()) {
+                       /* pipeline is not running. */
+                       return;
+               }
+               for (Feeder feeder : feeders) {
+                       feeder.stop();
+               }
+       }
+
+       //
+       // STATIC METHODS
+       //
+
+       /**
+        * Returns a new pipeline builder.
+        *
+        * @param source
+        *              The source at which to start
+        * @return A builder for a new pipeline
+        */
+       public static Builder builder(Source source) {
+               return new Builder(source);
+       }
+
+       /**
+        * A builder for a {@link Pipeline}.
+        *
+        * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
+        */
+       public static class Builder {
+
+               /** The source of the pipeline. */
+               private final Source source;
+
+               /** The sinks to which each source streams. */
+               private Multimap<Source, Sink> nextSinks = ArrayListMultimap.create();
+
+               /** The last added source. */
+               private Source lastSource;
+
+               /**
+                * Creates a new builder.
+                *
+                * @param source
+                *              The source that starts the pipeline
+                */
+               private Builder(Source source) {
+                       this.source = source;
+                       lastSource = source;
+               }
+
+               /**
+                * Adds a {@link Sink} (or {@link Filter} as a recipient for the last added
+                * {@link Source}.
+                *
+                * @param sink
+                *              The sink to add
+                * @return This builder
+                * @throws IllegalStateException
+                *              if the last added {@link Sink} was not also a {@link Source}
+                */
+               public Builder to(Sink sink) {
+                       Preconditions.checkState(lastSource != null, "last added Sink was not a Source");
+                       nextSinks.put(lastSource, sink);
+                       lastSource = (sink instanceof Filter) ? (Source) sink : null;
+                       return this;
+               }
+
+               /**
+                * Locates the given source and sets it as the last added node so that the
+                * next invocation of {@link #to(Sink)} can “fork” the pipeline.
+                *
+                * @param source
+                *              The source to locate
+                * @return This builder
+                * @throws IllegalStateException
+                *              if the given source was not previously added as a sink
+                */
+               public Builder find(Source source) {
+                       Preconditions.checkState(nextSinks.containsValue(source));
+                       lastSource = source;
+                       return this;
+               }
+
+               /**
+                * Builds the pipeline.
+                *
+                * @return The created pipeline
+                */
+               public Pipeline build() {
+                       return new Pipeline(source, ImmutableMultimap.copyOf(nextSinks));
+               }
+
+       }
+
+       /**
+        * A feeder is responsible for streaming audio from one {@link Source} to an
+        * arbitrary number of {@link Sink}s it is connected to. A feeder is started by
+        * creating a {@link Thread} wrapping it and starting said thread.
+        *
+        * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
+        */
+       private class Feeder implements Runnable {
+
+               /** The source. */
+               private final Source source;
+
+               /** The sinks. */
+               private final Collection<Sink> sinks;
+
+               /** Whether the feeder was stopped. */
+               private final AtomicBoolean stopped = new AtomicBoolean(false);
+
+               /** The executor service. */
+               private final ExecutorService executorService;
+
+               /**
+                * Creates a new feeder.
+                *
+                * @param source
+                *              The source of the stream
+                * @param sinks
+                *              The sinks to which to stream
+                */
+               public Feeder(Source source, Collection<Sink> sinks) {
+                       this.source = source;
+                       this.sinks = sinks;
+                       if (sinks.size() == 1) {
+                               executorService = Executors.newSingleThreadExecutor();
+                       } else {
+                               executorService = Executors.newCachedThreadPool();
+                       }
+               }
+
+               //
+               // ACTIONS
+               //
+
+               /** Stops this feeder. */
+               public void stop() {
+                       stopped.set(true);
+               }
+
+               //
+               // RUNNABLE METHODS
+               //
+
+               @Override
+               public void run() {
+                       Metadata firstMetadata = source.metadata();
+                       while (!stopped.get()) {
+                               try {
+                                       final Metadata lastMetadata = firstMetadata;
+                                       final Metadata metadata = firstMetadata = source.metadata();
+                                       final byte[] buffer = source.get(4096);
+                                       List<Future<Void>> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function<Sink, Callable<Void>>() {
+
+                                               @Override
+                                               public Callable<Void> apply(final Sink sink) {
+                                                       return new Callable<Void>() {
+
+                                                               @Override
+                                                               public Void call() throws Exception {
+                                                                       if (!metadata.equals(lastMetadata)) {
+                                                                               sink.metadataUpdated(metadata);
+                                                                       }
+                                                                       sink.process(buffer);
+                                                                       return null;
+                                                               }
+                                                       };
+                                               }
+                                       }).toList());
+                                       /* check all threads for exceptions. */
+                                       for (Future<Void> future : futures) {
+                                               future.get();
+                                       }
+                               } catch (IOException e) {
+                                       /* TODO */
+                                       e.printStackTrace();
+                               } catch (InterruptedException e) {
+                                       /* TODO */
+                                       e.printStackTrace();
+                               } catch (ExecutionException e) {
+                                       /* TODO */
+                                       e.printStackTrace();
+                               }
+                       }
+               }
+
+       }
+
+}
diff --git a/src/main/java/net/pterodactylus/sonitus/data/ReusableSink.java b/src/main/java/net/pterodactylus/sonitus/data/ReusableSink.java
deleted file mode 100644 (file)
index 2d4f4bb..0000000
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Sonitus - MultiSourceSink.java - Copyright © 2013 David Roden
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program.  If not, see <http://www.gnu.org/licenses/>.
- */
-
-package net.pterodactylus.sonitus.data;
-
-/**
- * Extension of the {@link Sink} interface that supports changing the source
- * without causing a reconnection in the sink.
- *
- * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
- */
-public interface ReusableSink extends Sink {
-
-}
index 0fbfb9e..09679c6 100644 (file)
@@ -1,43 +1,46 @@
-/*
- * Sonitus - Sink.java - Copyright © 2013 David Roden
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program.  If not, see <http://www.gnu.org/licenses/>.
- */
-
 package net.pterodactylus.sonitus.data;
 
+import java.io.IOException;
+
 /**
- * A sink is an endpoint for an audio stream. It might be a file on disk, it can
- * be an audio output in the current system, or it can be sent to a remote
- * streaming server for broadcasting.
+ * A sink is a destination for audio data. It can be played on speakers, it can
+ * be written to a file, or it can be sent to a remote streaming server.
  *
  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
  */
 public interface Sink {
 
        /**
-        * Connects this sink to the given source.
+        * Opens this sink using the format parameters of the given metadata.
+        *
+        * @param metadata
+        *              The metadata of the stream
+        * @throws IOException
+        *              if an I/O error occurs
+        */
+       void open(Metadata metadata) throws IOException;
+
+       /** Closes this sink. */
+       void close();
+
+       /**
+        * Processes the given buffer of data.
         *
-        * @param source
-        *              The source to connect to
-        * @throws ConnectException
-        *              if the source can not be connected, e.g. due to a {@link Metadata}
-        *              mismatch
+        * @param buffer
+        *              The data to process
+        * @throws IOException
+        *              if an I/O error occurs
         */
-       void connect(Source source) throws ConnectException;
+       void process(byte[] buffer) throws IOException;
 
-       /** Notifies the sink that a source has updated its metadata. */
-       void metadataUpdated();
+       /**
+        * Notifies the sink that the metadata of the audio stream has changed. This
+        * method should return as fast as possible, i.e. every heavy lifting should be
+        * done from another thread.
+        *
+        * @param metadata
+        *              The new metadata
+        */
+       void metadataUpdated(Metadata metadata);
 
 }
index 7bb10bd..4e9895e 100644 (file)
@@ -1,55 +1,32 @@
-/*
- * Sonitus - Source.java - Copyright © 2013 David Roden
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program.  If not, see <http://www.gnu.org/licenses/>.
- */
-
 package net.pterodactylus.sonitus.data;
 
-import java.io.EOFException;
 import java.io.IOException;
 
 /**
- * Defines an arbitrary media source. This can be almost anything; an MP3 file,
- * a FastTracker module, or a decoded WAVE file.
+ * A source produces an audio stream and accompanying metadata.
  *
  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
  */
 public interface Source {
 
        /**
-        * Returns the metadata of this source.
+        * Returns the metadata of the audio stream.
         *
-        * @return The metadata of this source
+        * @return The metadata of the audio stream
         */
        Metadata metadata();
 
        /**
-        * Retrieves the given name of bytes from this source. The source should always
-        * try to read as much data as was requested but is free to return a byte array
-        * with less elements that requested. However, the byte array will always be
-        * the same size as the data that was actually read, i.e. there are no excess
-        * elements in the returned array.
+        * Retrieves data from the audio stream.
         *
         * @param bufferSize
-        *              The size of the buffer
-        * @return A buffer that contains the read data
-        * @throws EOFException
-        *              if the end of the source was reached
+        *              The maximum amount of bytes to retrieve from the audio stream
+        * @return A buffer filled with up to {@code bufferSize} bytes of data; the
+        *         returned buffer may contain less data than requested but will not
+        *         contain excess elements
         * @throws IOException
         *              if an I/O error occurs
         */
-       byte[] get(int bufferSize) throws EOFException, IOException;
+       byte[] get(int bufferSize) throws IOException;
 
 }
diff --git a/src/main/java/net/pterodactylus/sonitus/data/filter/DummyFilter.java b/src/main/java/net/pterodactylus/sonitus/data/filter/DummyFilter.java
new file mode 100644 (file)
index 0000000..a3ce099
--- /dev/null
@@ -0,0 +1,126 @@
+/*
+ * Sonitus - AbstractFilter.java - Copyright © 2013 David Roden
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package net.pterodactylus.sonitus.data.filter;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Arrays;
+
+import net.pterodactylus.sonitus.data.Filter;
+import net.pterodactylus.sonitus.data.Metadata;
+
+import com.google.common.io.Closeables;
+
+/**
+ * Dummy {@link Filter} implementation that pipes its input to its output.
+ *
+ * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
+ */
+public class DummyFilter implements Filter {
+
+       /** The input stream from which to read. */
+       private InputStream inputStream;
+
+       /** The output stream to which to write. */
+       private OutputStream outputStream;
+
+       /** The current metadata. */
+       private Metadata metadata;
+
+       //
+       // FILTER METHODS
+       //
+
+       @Override
+       public void open(Metadata metadata) throws IOException {
+               this.metadata = metadata;
+               inputStream = createInputStream();
+               outputStream = createOutputStream();
+       }
+
+       @Override
+       public void close() {
+               try {
+                       Closeables.close(outputStream, true);
+                       Closeables.close(inputStream, true);
+               } catch (IOException e) {
+                       /* won’t throw. */
+               }
+       }
+
+       @Override
+       public Metadata metadata() {
+               return metadata;
+       }
+
+       @Override
+       public void metadataUpdated(Metadata metadata) {
+               this.metadata = metadata;
+       }
+
+       @Override
+       public void process(byte[] buffer) throws IOException {
+               outputStream.write(buffer);
+               outputStream.flush();
+       }
+
+       @Override
+       public byte[] get(int bufferSize) throws IOException {
+               byte[] buffer = new byte[bufferSize];
+               int read = inputStream.read(buffer);
+               if (read == -1) {
+                       throw new EOFException();
+               }
+               return Arrays.copyOf(buffer, read);
+       }
+
+       //
+       // SUBCLASS METHODS
+       //
+
+       /**
+        * Creates the input stream from which {@link net.pterodactylus.sonitus.data.Pipeline}
+        * will read the audio data. If you override this, you have to override {@link
+        * #createOutputStream()}, too!
+        *
+        * @return The input stream to read from
+        * @throws IOException
+        *              if an I/O error occurs
+        */
+       protected InputStream createInputStream() throws IOException {
+               return new PipedInputStream();
+       }
+
+       /**
+        * Creates the output stream to which {@link net.pterodactylus.sonitus.data.Pipeline}
+        * will write the audio data. If you override this, you have to override {@link
+        * #createInputStream()}, too!
+        *
+        * @return The output stream to write to
+        * @throws IOException
+        *              if an I/O error occurs
+        */
+       protected OutputStream createOutputStream() throws IOException {
+               return new PipedOutputStream((PipedInputStream) inputStream);
+       }
+
+}
index 6b68c28..72c6177 100644 (file)
 
 package net.pterodactylus.sonitus.data.filter;
 
-import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.io.PipedOutputStream;
-import java.util.Arrays;
 import java.util.logging.Logger;
 
-import net.pterodactylus.sonitus.data.ConnectException;
-import net.pterodactylus.sonitus.data.Connection;
-import net.pterodactylus.sonitus.data.Filter;
 import net.pterodactylus.sonitus.data.Metadata;
-import net.pterodactylus.sonitus.data.Source;
 import net.pterodactylus.sonitus.io.InputStreamDrainer;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 
 /**
- * {@link Filter} implementation that runs its {@link Source} through an
- * external program.
+ * {@link net.pterodactylus.sonitus.data.Filter} implementation that runs its
+ * {@link net.pterodactylus.sonitus.data.Source} through an external program.
  *
  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
  */
-public abstract class ExternalFilter implements Filter {
+public abstract class ExternalFilter extends DummyFilter {
 
        /** The logger. */
        private final Logger logger = Logger.getLogger(getClass().getName());
 
-       /** The source. */
-       private Source source;
-
-       private InputStream processInputStream;
+       /** The external process. */
+       private Process process;
 
        //
        // FILTER METHODS
        //
 
        @Override
-       public Metadata metadata() {
-               return source.metadata();
+       public void open(Metadata metadata) throws IOException {
+               process = Runtime.getRuntime().exec(Iterables.toArray(ImmutableList.<String>builder().add(binary(metadata)).addAll(parameters(metadata)).build(), String.class));
+               InputStream processError = process.getErrorStream();
+               new Thread(new InputStreamDrainer(processError)).start();
+               super.open(metadata);
        }
 
        @Override
-       public byte[] get(int bufferSize) throws EOFException, IOException {
-               byte[] buffer = new byte[bufferSize];
-               int read = processInputStream.read(buffer);
-               if (read == -1) {
-                       throw new EOFException();
-               }
-               return Arrays.copyOf(buffer, read);
+       public void close() {
+               process.destroy();
        }
 
+       //
+       // DUMMYFILTER METHODS
+       //
+
        @Override
-       public void connect(Source source) throws ConnectException {
-               Preconditions.checkNotNull(source, "source must not be null");
-
-               this.source = source;
-               try {
-                       final Process process = Runtime.getRuntime().exec(Iterables.toArray(ImmutableList.<String>builder().add(binary(source.metadata())).addAll(parameters(source.metadata())).build(), String.class));
-                       processInputStream = process.getInputStream();
-                       final OutputStream processInput = process.getOutputStream();
-                       final InputStream processError = process.getErrorStream();
-                       final PipedOutputStream pipedOutputStream = new PipedOutputStream();
-                       new Thread(new InputStreamDrainer(processError)).start();
-                       new Thread(new Connection(source) {
-
-                               @Override
-                               protected int bufferSize() {
-                                       return 4096;
-                               }
-
-                               @Override
-                               protected void feed(byte[] buffer) throws IOException {
-                                       processInput.write(buffer);
-                                       processInput.flush();
-                               }
-
-                               @Override
-                               protected void finish() throws IOException {
-                                       processInput.close();
-                                       processError.close();
-                               }
-                       }).start();
-               } catch (IOException ioe1) {
-
-               }
+       protected InputStream createInputStream() throws IOException {
+               return process.getInputStream();
        }
 
        @Override
-       public void metadataUpdated() {
-               /* ignore. */
+       protected OutputStream createOutputStream() throws IOException {
+               return process.getOutputStream();
        }
 
        //
index cc52e41..2e80ec7 100644 (file)
 
 package net.pterodactylus.sonitus.data.filter;
 
-import static com.google.common.base.Preconditions.*;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.inject.internal.util.$Preconditions.checkState;
+
+import java.io.IOException;
 
-import net.pterodactylus.sonitus.data.ConnectException;
 import net.pterodactylus.sonitus.data.Metadata;
-import net.pterodactylus.sonitus.data.Source;
 
 /**
- * Basic {@link ExternalFilter} implementation that verifies that the connected
- * source is MP3-encoded and returns a PCM-encoded stream.
+ * Basic {@link net.pterodactylus.sonitus.data.filter.ExternalFilter}
+ * implementation that verifies that the connected source is MP3-encoded and
+ * returns a PCM-encoded stream.
  *
  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
  */
@@ -37,11 +39,11 @@ public abstract class ExternalMp3Decoder extends ExternalFilter {
        }
 
        @Override
-       public void connect(Source source) throws ConnectException {
-               checkNotNull(source, "source must not be null");
-               checkState(source.metadata().encoding().equalsIgnoreCase("MP3"), "source must be MP3-encoded");
+       public void open(Metadata metadata) throws IOException {
+               checkNotNull(metadata, "metadata must not be null");
+               checkState(metadata.encoding().equalsIgnoreCase("MP3"), "source must be MP3-encoded");
 
-               super.connect(source);
+               super.open(metadata);
        }
 
 }
index 1975e36..391e01b 100644 (file)
 
 package net.pterodactylus.sonitus.data.filter;
 
-import net.pterodactylus.sonitus.data.ConnectException;
-import net.pterodactylus.sonitus.data.Metadata;
-import net.pterodactylus.sonitus.data.Source;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.io.IOException;
 
-import com.google.common.base.Preconditions;
+import net.pterodactylus.sonitus.data.Metadata;
 
 /**
- * Basic {@link ExternalFilter} implementation that verifies that the connected
- * source is PCM-encoded and that returns an MP3-encoded metadata.
+ * Basic {@link net.pterodactylus.sonitus.data.filter.ExternalFilter}
+ * implementation that verifies that the connected source is PCM-encoded and
+ * that returns an MP3-encoded metadata.
  *
  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
  */
@@ -37,11 +39,11 @@ public abstract class ExternalMp3Encoder extends ExternalFilter {
        }
 
        @Override
-       public void connect(Source source) throws ConnectException {
-               Preconditions.checkNotNull(source, "source must not be null");
-               Preconditions.checkState(source.metadata().encoding().equalsIgnoreCase("PCM"), "source must be PCM-encoded");
+       public void open(Metadata metadata) throws IOException {
+               checkNotNull(metadata, "metadata must not be null");
+               checkState(metadata.encoding().equalsIgnoreCase("PCM"), "source must be PCM-encoded");
 
-               super.connect(source);
+               super.open(metadata);
        }
 
 }
diff --git a/src/main/java/net/pterodactylus/sonitus/data/filter/MultiSourceFilter.java b/src/main/java/net/pterodactylus/sonitus/data/filter/MultiSourceFilter.java
deleted file mode 100644 (file)
index d62ba97..0000000
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Sonitus - MultiSource.java - Copyright © 2013 David Roden
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program.  If not, see <http://www.gnu.org/licenses/>.
- */
-
-package net.pterodactylus.sonitus.data.filter;
-
-import static com.google.common.base.Preconditions.*;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.logging.Logger;
-
-import net.pterodactylus.sonitus.data.ConnectException;
-import net.pterodactylus.sonitus.data.Filter;
-import net.pterodactylus.sonitus.data.Metadata;
-import net.pterodactylus.sonitus.data.ReusableSink;
-import net.pterodactylus.sonitus.data.Source;
-import net.pterodactylus.sonitus.data.event.SourceFinishedEvent;
-
-import com.google.common.eventbus.EventBus;
-import com.google.inject.Inject;
-
-/**
- * {@link ReusableSink} implementation that supports changing the source without
- * letting the {@link net.pterodactylus.sonitus.data.Sink} know.
- *
- * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
- */
-public class MultiSourceFilter implements Filter, ReusableSink {
-
-       /** The logger. */
-       private static final Logger logger = Logger.getLogger(MultiSourceFilter.class.getName());
-
-       /** The event bus. */
-       private final EventBus eventBus;
-
-       /** The current source. */
-       private final AtomicReference<Source> source = new AtomicReference<Source>();
-
-       @Inject
-       public MultiSourceFilter(EventBus eventBus) {
-               this.eventBus = eventBus;
-       }
-
-       @Override
-       public Metadata metadata() {
-               return source.get().metadata();
-       }
-
-       @Override
-       public byte[] get(int bufferSize) throws EOFException, IOException {
-               try {
-                       return source.get().get(bufferSize);
-               } catch (EOFException eofe1) {
-                       eventBus.post(new SourceFinishedEvent(source.get()));
-                       throw eofe1;
-               }
-       }
-
-       @Override
-       public void connect(Source source) throws ConnectException {
-               checkNotNull(source, "source must not be null");
-
-               Source oldSource = this.source.getAndSet(source);
-               if (oldSource != null) {
-                       checkArgument(oldSource.metadata().channels() == source.metadata().channels(), "source’s channel count must equal existing source’s channel count");
-                       checkArgument(oldSource.metadata().frequency() == source.metadata().frequency(), "source’s frequency must equal existing source’s frequency");
-                       checkArgument(oldSource.metadata().encoding().equalsIgnoreCase(source.metadata().encoding()), "source’s encoding must equal existing source’s encoding");
-               }
-       }
-
-       @Override
-       public void metadataUpdated() {
-               /* ignore. */
-       }
-
-}
index 2cd9035..5ea2d48 100644 (file)
 
 package net.pterodactylus.sonitus.data.filter;
 
-import net.pterodactylus.sonitus.data.ConnectException;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+
 import net.pterodactylus.sonitus.data.Metadata;
-import net.pterodactylus.sonitus.data.Source;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
 /**
@@ -70,11 +72,11 @@ public class OggVorbisDecoder extends ExternalFilter {
        }
 
        @Override
-       public void connect(Source source) throws ConnectException {
-               Preconditions.checkNotNull(source, "source must not be null");
-               Preconditions.checkArgument(source.metadata().encoding().equalsIgnoreCase("Vorbis"), "source must be Vorbis-encoded");
+       public void open(Metadata metadata) throws IOException {
+               checkNotNull(metadata, "metadata must not be null");
+               checkArgument(metadata.encoding().equalsIgnoreCase("Vorbis"), "source must be Vorbis-encoded");
 
-               super.connect(source);
+               super.open(metadata);
        }
 
        //
diff --git a/src/main/java/net/pterodactylus/sonitus/data/filter/PredicateFilter.java b/src/main/java/net/pterodactylus/sonitus/data/filter/PredicateFilter.java
new file mode 100644 (file)
index 0000000..1e8c7fa
--- /dev/null
@@ -0,0 +1,122 @@
+/*
+ * Sonitus - PredicateFilter.java - Copyright © 2013 David Roden
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package net.pterodactylus.sonitus.data.filter;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import net.pterodactylus.sonitus.data.Filter;
+import net.pterodactylus.sonitus.data.Metadata;
+
+import com.google.common.base.Predicate;
+
+/**
+ * {@link Filter} implementation that uses a {@link Predicate} to determine
+ * whether a filter will be used or the data will only be passed through.
+ *
+ * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
+ */
+public class PredicateFilter extends DummyFilter {
+
+       /** The predicate. */
+       private final Predicate<Metadata> metadataPredicate;
+
+       /** The filter to use if the predicate matches. */
+       private final Filter filter;
+
+       /** Whether the predicate currently matches. */
+       private final AtomicBoolean metadataMatches = new AtomicBoolean(false);
+
+       /**
+        * Creates a new predicate filter.
+        *
+        * @param metadataPredicate
+        *              The predicate to evaluate every time the metadata changes
+        * @param filter
+        *              The filter to use if the predicate matches the metadata
+        */
+       public PredicateFilter(Predicate<Metadata> metadataPredicate, Filter filter) {
+               this.metadataPredicate = metadataPredicate;
+               this.filter = filter;
+       }
+
+       //
+       // FILTER METHODS
+       //
+
+       @Override
+       public void open(Metadata metadata) throws IOException {
+               checkNotNull(metadata, "metadata must not be null");
+
+               metadataMatches.set(metadataPredicate.apply(metadata));
+               if (metadataMatches.get()) {
+                       filter.open(metadata);
+               } else {
+                       super.open(metadata);
+               }
+       }
+
+       @Override
+       public void close() {
+               if (metadataMatches.get()) {
+                       filter.close();
+               } else {
+                       super.close();
+               }
+       }
+
+       @Override
+       public void metadataUpdated(Metadata metadata) {
+               metadataMatches.set(metadataPredicate.apply(metadata));
+               if (metadataMatches.get()) {
+                       filter.metadataUpdated(metadata);
+               } else {
+                       super.metadataUpdated(metadata);
+               }
+       }
+
+       @Override
+       public void process(byte[] buffer) throws IOException {
+               if (metadataMatches.get()) {
+                       filter.process(buffer);
+               } else {
+                       super.process(buffer);
+               }
+       }
+
+       @Override
+       public Metadata metadata() {
+               if (metadataMatches.get()) {
+                       return filter.metadata();
+               } else {
+                       return super.metadata();
+               }
+       }
+
+       @Override
+       public byte[] get(int bufferSize) throws IOException {
+               if (metadataMatches.get()) {
+                       return filter.get(bufferSize);
+               } else {
+                       return super.get(bufferSize);
+               }
+       }
+
+}
index 7ee4116..d513df6 100644 (file)
 
 package net.pterodactylus.sonitus.data.filter;
 
-import java.io.EOFException;
 import java.io.IOException;
 import java.util.logging.Logger;
 
-import net.pterodactylus.sonitus.data.ConnectException;
-import net.pterodactylus.sonitus.data.Filter;
 import net.pterodactylus.sonitus.data.Metadata;
-import net.pterodactylus.sonitus.data.Source;
-
-import com.google.common.base.Preconditions;
 
 /**
  * Rate limiting filter that only passes a specified amount of data per second
- * from its {@link Source} to its {@link net.pterodactylus.sonitus.data.Sink}.
+ * from its {@link net.pterodactylus.sonitus.data.Source} to its {@link
+ * net.pterodactylus.sonitus.data.Sink}.
  *
  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
  */
-public class RateLimitingFilter implements Filter {
+public class RateLimitingFilter extends DummyFilter {
 
        /** The logger. */
        private static final Logger logger = Logger.getLogger(RateLimitingFilter.class.getName());
@@ -42,14 +37,11 @@ public class RateLimitingFilter implements Filter {
        /** The limiting rate in bytes/second. */
        private final int rate;
 
-       /** The fast start time. */
-       private final long fastStartTime;
-
-       /** The source. */
-       private Source source;
+       /** The start time. */
+       private long startTime;
 
-       /** The remaining fast start time. */
-       private long remainingFastStartTime;
+       /** The number of bytes. */
+       private long counter;
 
        /**
         * Creates a new rate limiting filter.
@@ -72,8 +64,7 @@ public class RateLimitingFilter implements Filter {
         */
        public RateLimitingFilter(int rate, long fastStartTime) {
                this.rate = rate;
-               this.fastStartTime = fastStartTime;
-               remainingFastStartTime = fastStartTime;
+               this.counter = (long) (-rate * (fastStartTime / 1000.0));
        }
 
        //
@@ -81,39 +72,27 @@ public class RateLimitingFilter implements Filter {
        //
 
        @Override
-       public Metadata metadata() {
-               return source.metadata();
+       public void open(Metadata metadata) throws IOException {
+               super.open(metadata);
+               startTime = System.currentTimeMillis();
        }
 
        @Override
-       public byte[] get(int bufferSize) throws EOFException, IOException {
-               long now = System.currentTimeMillis();
-               byte[] buffer = source.get(bufferSize);
+       public void process(byte[] buffer) throws IOException {
+               super.process(buffer);
                /* delay. */
-               long waitTime = 1000 * buffer.length / rate;
-               remainingFastStartTime = Math.max(remainingFastStartTime - waitTime, 0);
-               while ((remainingFastStartTime == 0) && (System.currentTimeMillis() - now) < waitTime) {
+               counter += buffer.length;
+               long waitTime = (long) (counter / (rate / 1000.0));
+               while ((System.currentTimeMillis() - startTime) < waitTime) {
                        try {
-                               long limitDelay = waitTime - (System.currentTimeMillis() - now);
+                               long limitDelay = waitTime - (System.currentTimeMillis() - startTime);
                                logger.finest(String.format("Waiting %d ms...", limitDelay));
                                Thread.sleep(limitDelay);
                        } catch (InterruptedException ie1) {
                                /* ignore, keep looping. */
                        }
                }
-               return buffer;
-       }
-
-       @Override
-       public void connect(Source source) throws ConnectException {
-               Preconditions.checkNotNull(source, "source must not be null");
-
-               this.source = source;
-       }
-
-       @Override
-       public void metadataUpdated() {
-               /* ignore. */
+               logger.finest(String.format("Processed %d Bytes during %d ms, that’s %.1f bytes/s.", counter, System.currentTimeMillis() - startTime, counter / ((System.currentTimeMillis() - startTime) / 1000.0)));
        }
 
 }
index 69daa99..a5061fb 100644 (file)
 
 package net.pterodactylus.sonitus.data.filter;
 
-import static com.google.common.base.Preconditions.*;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.IOException;
 
-import net.pterodactylus.sonitus.data.ConnectException;
 import net.pterodactylus.sonitus.data.Metadata;
-import net.pterodactylus.sonitus.data.Source;
 
 import com.google.common.collect.ImmutableList;
 
@@ -62,11 +63,11 @@ public class SoxResampleFilter extends ExternalFilter {
        }
 
        @Override
-       public void connect(Source source) throws ConnectException {
-               checkNotNull(source, "source must not be null");
-               checkArgument(source.metadata().encoding().equalsIgnoreCase("PCM"), "source must be PCM-encoded");
+       public void open(Metadata metadata) throws IOException {
+               checkNotNull(metadata, "metadata must not be null");
+               checkArgument(metadata.encoding().equalsIgnoreCase("PCM"), "source must be PCM-encoded");
 
-               super.connect(source);
+               super.open(metadata);
        }
 
        //
index f29bde8..689c23d 100644 (file)
 
 package net.pterodactylus.sonitus.data.sink;
 
-import static com.google.common.base.Preconditions.*;
-
+import java.io.IOException;
 import java.util.logging.Logger;
 import javax.sound.sampled.AudioFormat;
 import javax.sound.sampled.AudioSystem;
 import javax.sound.sampled.LineUnavailableException;
 import javax.sound.sampled.SourceDataLine;
 
-import net.pterodactylus.sonitus.data.ConnectException;
-import net.pterodactylus.sonitus.data.Connection;
 import net.pterodactylus.sonitus.data.Metadata;
 import net.pterodactylus.sonitus.data.Sink;
-import net.pterodactylus.sonitus.data.Source;
+
+import com.google.common.base.Preconditions;
 
 /**
- * {@link Sink} implementation that uses the JDK’s {@link
- * javax.sound.sampled.AudioSystem} to play all {@link net.pterodactylus.sonitus.data.Source}s.
+ * {@link net.pterodactylus.sonitus.data.Sink} implementation that uses the
+ * JDK’s {@link javax.sound.sampled.AudioSystem} to play all {@link
+ * net.pterodactylus.sonitus.data.Source}s.
  *
  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
  */
@@ -42,47 +41,41 @@ public class AudioSink implements Sink {
        /** The logger. */
        private static final Logger logger = Logger.getLogger(AudioSink.class.getName());
 
-       /** The current source. */
-       private Source source;
+       /** The current metadata. */
+       private Metadata metadata;
 
-       @Override
-       public void connect(Source source) throws ConnectException {
-               this.source = checkNotNull(source, "source must not be null");
-               checkState(source.metadata().encoding().equalsIgnoreCase("PCM"), "source must be PCM-encoded");
+       /** The audio output. */
+       private SourceDataLine sourceDataLine;
 
-               final Metadata sourceMetadata = source.metadata();
-               AudioFormat audioFormat = new AudioFormat(sourceMetadata.frequency(), 16, sourceMetadata.channels(), true, false);
+       @Override
+       public void open(Metadata metadata) throws IOException {
+               Preconditions.checkArgument(metadata.encoding().equalsIgnoreCase("PCM"), "source must be PCM-encoded");
+               AudioFormat audioFormat = new AudioFormat(metadata.frequency(), 16, metadata.channels(), true, false);
                try {
-                       final SourceDataLine sourceDataLine = AudioSystem.getSourceDataLine(audioFormat);
+                       sourceDataLine = AudioSystem.getSourceDataLine(audioFormat);
                        sourceDataLine.open(audioFormat);
                        sourceDataLine.start();
-                       new Thread(new Connection(source) {
-
-                               @Override
-                               protected int bufferSize() {
-                                       return sourceMetadata.channels() * sourceMetadata.frequency() * 2;
-                               }
-
-                               @Override
-                               protected void feed(byte[] buffer) {
-                                       sourceDataLine.write(buffer, 0, buffer.length);
-                                       logger.finest(String.format("AudioSink: Wrote %d Bytes.", buffer.length));
-                               }
-
-                               @Override
-                               protected void finish() {
-                                       sourceDataLine.stop();
-                               }
-                       }).start();
-                       metadataUpdated();
-               } catch (LineUnavailableException lue1) {
-                       throw new ConnectException(lue1);
+               } catch (LineUnavailableException e) {
+                       /* TODO */
+                       throw new IOException(e);
                }
        }
 
        @Override
-       public void metadataUpdated() {
+       public void close() {
+               sourceDataLine.stop();
+               sourceDataLine.close();
+       }
+
+       @Override
+       public void metadataUpdated(Metadata metadata) {
                /* ignore. */
        }
 
+       @Override
+       public void process(byte[] buffer) {
+               sourceDataLine.write(buffer, 0, buffer.length);
+               logger.finest(String.format("AudioSink: Wrote %d Bytes.", buffer.length));
+       }
+
 }
index 61e0a27..7210f71 100644 (file)
 
 package net.pterodactylus.sonitus.data.sink;
 
-import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.logging.Logger;
 
-import net.pterodactylus.sonitus.data.ConnectException;
-import net.pterodactylus.sonitus.data.Connection;
+import net.pterodactylus.sonitus.data.Metadata;
 import net.pterodactylus.sonitus.data.Sink;
-import net.pterodactylus.sonitus.data.Source;
-
-import com.google.common.base.Preconditions;
 
 /**
- * {@link Sink} that writes all received data into a file.
+ * {@link net.pterodactylus.sonitus.data.Sink} that writes all received data
+ * into a file.
  *
  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
  */
@@ -42,6 +38,8 @@ public class FileSink implements Sink {
        /** The path of the file to write to. */
        private final String path;
 
+       private FileOutputStream fileOutputStream;
+
        /**
         * Creates a new file sink that will write to the given path.
         *
@@ -53,37 +51,28 @@ public class FileSink implements Sink {
        }
 
        @Override
-       public void connect(Source source) throws ConnectException {
-               Preconditions.checkNotNull(source, "source must not be null");
+       public void open(Metadata metadata) throws IOException {
+               fileOutputStream = new FileOutputStream(path);
+       }
 
+       @Override
+       public void close() {
                try {
-                       final FileOutputStream fileOutputStream = new FileOutputStream(path);
-                       new Thread(new Connection(source) {
-
-                               @Override
-                               protected int bufferSize() {
-                                       return 65536;
-                               }
-
-                               @Override
-                               protected void feed(byte[] buffer) throws IOException {
-                                       fileOutputStream.write(buffer);
-                                       logger.finest(String.format("FileSink: Wrote %d Bytes.", buffer.length));
-                               }
-
-                               @Override
-                               protected void finish() throws IOException {
-                                       fileOutputStream.close();
-                               }
-                       }).start();
-               } catch (FileNotFoundException fnfe1) {
-                       throw new ConnectException(fnfe1);
+                       fileOutputStream.close();
+               } catch (IOException e) {
+                       /* ignore. */
                }
        }
 
        @Override
-       public void metadataUpdated() {
+       public void metadataUpdated(Metadata metadata) {
                /* ignore. */
        }
 
+       @Override
+       public void process(byte[] buffer) throws IOException {
+               fileOutputStream.write(buffer);
+               logger.finest(String.format("FileSink: Wrote %d Bytes.", buffer.length));
+       }
+
 }
index 0b55049..0a5d22d 100644 (file)
@@ -17,8 +17,6 @@
 
 package net.pterodactylus.sonitus.data.sink;
 
-import static com.google.common.base.Preconditions.*;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -29,11 +27,8 @@ import java.util.Arrays;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import net.pterodactylus.sonitus.data.ConnectException;
-import net.pterodactylus.sonitus.data.Connection;
 import net.pterodactylus.sonitus.data.Metadata;
 import net.pterodactylus.sonitus.data.Sink;
-import net.pterodactylus.sonitus.data.Source;
 import net.pterodactylus.sonitus.io.InputStreamDrainer;
 
 import com.google.common.base.Function;
@@ -44,8 +39,8 @@ import com.google.common.io.BaseEncoding;
 import com.google.common.io.Closeables;
 
 /**
- * {@link Sink} implementation that delivers all incoming data to an Icecast2
- * server.
+ * {@link net.pterodactylus.sonitus.data.Sink} implementation that delivers all
+ * incoming data to an Icecast2 server.
  *
  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
  */
@@ -78,8 +73,7 @@ public class Icecast2Sink implements Sink {
        /** Whether to publish the server. */
        private final boolean publishServer;
 
-       /** The connected source. */
-       private Source source;
+       private OutputStream socketOutputStream;
 
        /**
         * Creates a new Icecast2 sink.
@@ -118,97 +112,85 @@ public class Icecast2Sink implements Sink {
        //
 
        @Override
-       public void connect(Source source) throws ConnectException {
-               checkNotNull(source, "source must not be null");
+       public void open(Metadata metadata) throws IOException {
+               logger.info(String.format("Connecting to %s:%d...", server, port));
+               Socket socket = new Socket(server, port);
+               logger.info("Connected.");
+               socketOutputStream = socket.getOutputStream();
+               InputStream socketInputStream = socket.getInputStream();
+
+               sendLine(socketOutputStream, String.format("SOURCE /%s ICE/1.0", mountPoint));
+               sendLine(socketOutputStream, String.format("Authorization: Basic %s", generatePassword(password)));
+               sendLine(socketOutputStream, String.format("Content-Type: %s", getContentType(metadata)));
+               sendLine(socketOutputStream, String.format("ICE-Name: %s", serverName));
+               sendLine(socketOutputStream, String.format("ICE-Description: %s", serverDescription));
+               sendLine(socketOutputStream, String.format("ICE-Genre: %s", genre));
+               sendLine(socketOutputStream, String.format("ICE-Public: %d", publishServer ? 1 : 0));
+               sendLine(socketOutputStream, "");
+               socketOutputStream.flush();
+
+               new Thread(new InputStreamDrainer(socketInputStream)).start();
+
+               metadataUpdated(metadata);
+       }
 
-               this.source = source;
+       @Override
+       public void close() {
                try {
-                       logger.info(String.format("Connecting to %s:%d...", server, port));
-                       final Socket socket = new Socket(server, port);
-                       logger.info("Connected.");
-                       final OutputStream socketOutputStream = socket.getOutputStream();
-                       final InputStream socketInputStream = socket.getInputStream();
-
-                       sendLine(socketOutputStream, String.format("SOURCE /%s ICE/1.0", mountPoint));
-                       sendLine(socketOutputStream, String.format("Authorization: Basic %s", generatePassword(password)));
-                       sendLine(socketOutputStream, String.format("Content-Type: %s", getContentType(source.metadata())));
-                       sendLine(socketOutputStream, String.format("ICE-Name: %s", serverName));
-                       sendLine(socketOutputStream, String.format("ICE-Description: %s", serverDescription));
-                       sendLine(socketOutputStream, String.format("ICE-Genre: %s", genre));
-                       sendLine(socketOutputStream, String.format("ICE-Public: %d", publishServer ? 1 : 0));
-                       sendLine(socketOutputStream, "");
-                       socketOutputStream.flush();
-
-                       new Thread(new InputStreamDrainer(socketInputStream)).start();
-                       new Thread(new Connection(source) {
-
-                               private long counter;
-
-                               @Override
-                               protected int bufferSize() {
-                                       return 4096;
-                               }
-
-                               @Override
-                               protected void feed(byte[] buffer) throws IOException {
-                                       socketOutputStream.write(buffer);
-                                       socketOutputStream.flush();
-                                       counter += buffer.length;
-                                       logger.finest(String.format("Wrote %d Bytes.", counter));
-                               }
-
-                               @Override
-                               protected void finish() throws IOException {
-                                       Closeables.close(socketOutputStream, true);
-                                       if (socket != null) {
-                                               socket.close();
-                                       }
-                               }
-                       }).start();
-
-                       metadataUpdated();
-               } catch (IOException ioe1) {
-                       throw new ConnectException(ioe1);
+                       Closeables.close(socketOutputStream, true);
+               } catch (IOException e) {
+                       /* will never throw. */
                }
        }
 
        @Override
-       public void metadataUpdated() {
-               Metadata metadata = source.metadata();
-               String metadataString = String.format("%s (%s)", Joiner.on(" - ").skipNulls().join(FluentIterable.from(Arrays.asList(metadata.artist(), metadata.name())).transform(new Function<Optional<String>, Object>() {
+       public void metadataUpdated(final Metadata metadata) {
+               new Thread(new Runnable() {
 
                        @Override
-                       public Object apply(Optional<String> input) {
-                               return input.orNull();
-                       }
-               })), "Sonitus");
-               logger.info(String.format("Updating metadata to %s", metadataString));
+                       public void run() {
+                               String metadataString = String.format("%s (%s)", Joiner.on(" - ").skipNulls().join(FluentIterable.from(Arrays.asList(metadata.artist(), metadata.name())).transform(new Function<Optional<String>, Object>() {
 
-               Socket socket = null;
-               OutputStream socketOutputStream = null;
-               try {
-                       socket = new Socket(server, port);
-                       socketOutputStream = socket.getOutputStream();
-
-                       sendLine(socketOutputStream, String.format("GET /admin/metadata?pass=%s&mode=updinfo&mount=/%s&song=%s HTTP/1.0", password, mountPoint, URLEncoder.encode(metadataString, "UTF-8")));
-                       sendLine(socketOutputStream, String.format("Authorization: Basic %s", generatePassword(password)));
-                       sendLine(socketOutputStream, String.format("User-Agent: Mozilla/Sonitus"));
-                       sendLine(socketOutputStream, "");
-                       socketOutputStream.flush();
-
-                       new InputStreamDrainer(socket.getInputStream()).run();
-               } catch (IOException ioe1) {
-                       logger.log(Level.WARNING, "Could not update metadata!", ioe1);
-               } finally {
-                       try {
-                               Closeables.close(socketOutputStream, true);
-                               if (socket != null) {
-                                       socket.close();
+                                       @Override
+                                       public Object apply(Optional<String> input) {
+                                               return input.orNull();
+                                       }
+                               })), "Sonitus");
+                               logger.info(String.format("Updating metadata to %s", metadataString));
+
+                               Socket socket = null;
+                               OutputStream socketOutputStream = null;
+                               try {
+                                       socket = new Socket(server, port);
+                                       socketOutputStream = socket.getOutputStream();
+
+                                       sendLine(socketOutputStream, String.format("GET /admin/metadata?pass=%s&mode=updinfo&mount=/%s&song=%s HTTP/1.0", password, mountPoint, URLEncoder.encode(metadataString, "UTF-8")));
+                                       sendLine(socketOutputStream, String.format("Authorization: Basic %s", generatePassword(password)));
+                                       sendLine(socketOutputStream, String.format("User-Agent: Mozilla/Sonitus"));
+                                       sendLine(socketOutputStream, "");
+                                       socketOutputStream.flush();
+
+                                       new InputStreamDrainer(socket.getInputStream()).run();
+                               } catch (IOException ioe1) {
+                                       logger.log(Level.WARNING, "Could not update metadata!", ioe1);
+                               } finally {
+                                       try {
+                                               Closeables.close(socketOutputStream, true);
+                                               if (socket != null) {
+                                                       socket.close();
+                                               }
+                                       } catch (IOException ioe1) {
+                                               /* ignore. */
+                                       }
                                }
-                       } catch (IOException ioe1) {
-                               /* ignore. */
                        }
-               }
+               }).start();
+       }
+
+       @Override
+       public void process(byte[] buffer) throws IOException {
+               socketOutputStream.write(buffer);
+               socketOutputStream.flush();
        }
 
        //
@@ -223,7 +205,7 @@ public class Icecast2Sink implements Sink {
         *              The output stream to send the line to
         * @param line
         *              The line to send
-        * @throws IOException
+        * @throws java.io.IOException
         *              if an I/O error occurs
         */
        private static void sendLine(OutputStream outputStream, String line) throws IOException {
@@ -237,7 +219,7 @@ public class Icecast2Sink implements Sink {
         * @param password
         *              The password to encode
         * @return The encoded password
-        * @throws UnsupportedEncodingException
+        * @throws java.io.UnsupportedEncodingException
         *              if the UTF-8 encoding is not supported (which can never happen)
         */
        private static String generatePassword(String password) throws UnsupportedEncodingException {
index 010eeb9..3d6e871 100644 (file)
@@ -17,7 +17,7 @@
 
 package net.pterodactylus.sonitus.data.source;
 
-import static com.google.common.base.Preconditions.*;
+import static com.google.common.base.Preconditions.checkNotNull;
 import static net.pterodactylus.sonitus.data.Metadata.UNKNOWN_CHANNELS;
 import static net.pterodactylus.sonitus.data.Metadata.UNKNOWN_ENCODING;
 import static net.pterodactylus.sonitus.data.Metadata.UNKNOWN_FREQUENCY;
@@ -33,10 +33,10 @@ import net.pterodactylus.sonitus.data.Source;
 import net.pterodactylus.sonitus.io.IdentifyingInputStream;
 
 import com.google.common.base.Optional;
-import com.google.common.io.ByteStreams;
 
 /**
- * A {@link Source} that is read from the local file system.
+ * A {@link net.pterodactylus.sonitus.data.Source} that is read from the local
+ * file system.
  *
  * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
  */
@@ -56,7 +56,7 @@ public class FileSource implements Source {
         *
         * @param path
         *              The path of the file
-        * @throws IOException
+        * @throws java.io.IOException
         *              if the file can not be found, or an I/O error occurs
         */
        public FileSource(String path) throws IOException {
@@ -78,20 +78,20 @@ public class FileSource implements Source {
        //
 
        @Override
-       public Metadata metadata() {
-               return metadata;
-       }
-
-       @Override
        public byte[] get(int bufferSize) throws IOException {
                byte[] buffer = new byte[bufferSize];
-               int read = ByteStreams.read(fileInputStream, buffer, 0, bufferSize);
-               if (read == 0) {
+               int read = fileInputStream.read(buffer);
+               if (read == -1) {
                        throw new EOFException();
                }
                return Arrays.copyOf(buffer, read);
        }
 
+       @Override
+       public Metadata metadata() {
+               return metadata;
+       }
+
        //
        // OBJECT METHODS
        //
diff --git a/src/main/java/net/pterodactylus/sonitus/data/source/MultiSource.java b/src/main/java/net/pterodactylus/sonitus/data/source/MultiSource.java
new file mode 100644 (file)
index 0000000..b5705d5
--- /dev/null
@@ -0,0 +1,118 @@
+/*
+ * Sonitus - MultiSource.java - Copyright © 2013 David Roden
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+package net.pterodactylus.sonitus.data.source;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Logger;
+
+import net.pterodactylus.sonitus.data.Metadata;
+import net.pterodactylus.sonitus.data.Source;
+import net.pterodactylus.sonitus.data.event.SourceFinishedEvent;
+
+import com.google.common.eventbus.EventBus;
+import com.google.inject.Inject;
+
+/**
+ * {@link Source} implementation that simply forwards another source and
+ * supports changing the source without letting the {@link
+ * net.pterodactylus.sonitus.data.Sink} know.
+ *
+ * @author <a href="mailto:bombe@pterodactylus.net">David ‘Bombe’ Roden</a>
+ */
+public class MultiSource implements Source {
+
+       /** The logger. */
+       private static final Logger logger = Logger.getLogger(MultiSource.class.getName());
+
+       /** The event bus. */
+       private final EventBus eventBus;
+
+       /** The current source. */
+       private final AtomicReference<Source> source = new AtomicReference<Source>();
+
+       /** Whether the source was changed. */
+       private boolean sourceChanged;
+
+       @Inject
+       public MultiSource(EventBus eventBus) {
+               this.eventBus = eventBus;
+       }
+
+       //
+       // ACTIONS
+       //
+
+       /**
+        * Sets the new source to use.
+        *
+        * @param source
+        *              The new source to use
+        */
+       public void setSource(Source source) {
+               checkNotNull(source, "source must not be null");
+
+               Source oldSource = this.source.getAndSet(source);
+               if (oldSource != null) {
+                       synchronized (this.source) {
+                               sourceChanged = true;
+                               this.source.notifyAll();
+                       }
+                       logger.info(String.format("Next Source set: %s", source));
+               }
+       }
+
+       //
+       // SOURCE METHODS
+       //
+
+       @Override
+       public Metadata metadata() {
+               return source.get().metadata();
+       }
+
+       @Override
+       public byte[] get(int bufferSize) throws EOFException, IOException {
+               while (true) {
+                       try {
+                               return source.get().get(bufferSize);
+                       } catch (EOFException eofe1) {
+                               eventBus.post(new SourceFinishedEvent(source.get()));
+                               synchronized (source) {
+                                       while (!sourceChanged) {
+                                               try {
+                                                       logger.info("Waiting for next Source...");
+                                                       source.wait();
+                                                       logger.info("Was notified.");
+                                               } catch (InterruptedException ioe1) {
+                                                       /* ignore: we’ll end up here again if we were interrupted. */
+                                               }
+                                       }
+                               }
+                       } finally {
+                               synchronized (source) {
+                                       sourceChanged = false;
+                               }
+                       }
+               }
+       }
+
+}