From 7188da95cfb6dc2bf140eb8ac7e4dc99a0761a97 Mon Sep 17 00:00:00 2001 From: =?utf8?q?David=20=E2=80=98Bombe=E2=80=99=20Roden?= Date: Tue, 19 Mar 2013 08:21:16 +0100 Subject: [PATCH] =?utf8?q?Don=E2=80=99t=20connect=20sources=20and=20sinks?= =?utf8?q?=20directly,=20use=20a=20pipeline=20to=20move=20data=20around.?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../sonitus/data/ConnectException.java | 65 ----- .../net/pterodactylus/sonitus/data/Connection.java | 111 -------- .../net/pterodactylus/sonitus/data/Filter.java | 3 +- .../net/pterodactylus/sonitus/data/Pipeline.java | 300 +++++++++++++++++++++ .../pterodactylus/sonitus/data/ReusableSink.java | 28 -- .../java/net/pterodactylus/sonitus/data/Sink.java | 61 +++-- .../net/pterodactylus/sonitus/data/Source.java | 41 +-- .../sonitus/data/filter/DummyFilter.java | 126 +++++++++ .../sonitus/data/filter/ExternalFilter.java | 79 ++---- .../sonitus/data/filter/ExternalMp3Decoder.java | 20 +- .../sonitus/data/filter/ExternalMp3Encoder.java | 22 +- .../sonitus/data/filter/MultiSourceFilter.java | 91 ------- .../sonitus/data/filter/OggVorbisDecoder.java | 16 +- .../sonitus/data/filter/PredicateFilter.java | 122 +++++++++ .../sonitus/data/filter/RateLimitingFilter.java | 57 ++-- .../sonitus/data/filter/SoxResampleFilter.java | 15 +- .../pterodactylus/sonitus/data/sink/AudioSink.java | 69 +++-- .../pterodactylus/sonitus/data/sink/FileSink.java | 51 ++-- .../sonitus/data/sink/Icecast2Sink.java | 164 +++++------ .../sonitus/data/source/FileSource.java | 22 +- .../sonitus/data/source/MultiSource.java | 118 ++++++++ 21 files changed, 922 insertions(+), 659 deletions(-) delete mode 100644 src/main/java/net/pterodactylus/sonitus/data/ConnectException.java delete mode 100644 src/main/java/net/pterodactylus/sonitus/data/Connection.java create mode 100644 src/main/java/net/pterodactylus/sonitus/data/Pipeline.java delete mode 100644 src/main/java/net/pterodactylus/sonitus/data/ReusableSink.java create mode 100644 src/main/java/net/pterodactylus/sonitus/data/filter/DummyFilter.java delete mode 100644 src/main/java/net/pterodactylus/sonitus/data/filter/MultiSourceFilter.java create mode 100644 src/main/java/net/pterodactylus/sonitus/data/filter/PredicateFilter.java create mode 100644 src/main/java/net/pterodactylus/sonitus/data/source/MultiSource.java diff --git a/src/main/java/net/pterodactylus/sonitus/data/ConnectException.java b/src/main/java/net/pterodactylus/sonitus/data/ConnectException.java deleted file mode 100644 index 004d88c..0000000 --- a/src/main/java/net/pterodactylus/sonitus/data/ConnectException.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Sonitus - ConnectException.java - Copyright © 2013 David Roden - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.pterodactylus.sonitus.data; - -/** - * Exception that signals an error when {@link Sink#connect(Source) connecting} - * a {@link Source} to a {@link Sink}. - * - * @author David ‘Bombe’ Roden - */ -public class ConnectException extends Exception { - - /** Creates a new connect exception. */ - public ConnectException() { - super(); - } - - /** - * Creates a new connect exception. - * - * @param message - * The message of the exception - */ - public ConnectException(String message) { - super(message); - } - - /** - * Creates a new connect exception. - * - * @param cause - * The cause of the exception - */ - public ConnectException(Throwable cause) { - super(cause); - } - - /** - * Creates a new connect exception. - * - * @param message - * The message of the exception - * @param cause - * The cause of the exception - */ - public ConnectException(String message, Throwable cause) { - super(message, cause); - } - -} diff --git a/src/main/java/net/pterodactylus/sonitus/data/Connection.java b/src/main/java/net/pterodactylus/sonitus/data/Connection.java deleted file mode 100644 index 169bfa5..0000000 --- a/src/main/java/net/pterodactylus/sonitus/data/Connection.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Sonitus - Connection.java - Copyright © 2013 David Roden - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.pterodactylus.sonitus.data; - -import java.io.IOException; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * A connection reads bytes from a {@link Source} and feeds it to a sink. This - * class is meant to be subclassed by each {@link Sink}, overriding the {@link - * #feed(byte[])} method to actually feed the data into the sink. The {@link - * #feed(byte[])} method is also responsible for blocking for an appropriate - * amount of time; this method determines how fast a {@link Source} is - * consumed. - * - * @author David ‘Bombe’ Roden - */ -public abstract class Connection implements Runnable { - - /** The logger. */ - private static final Logger logger = Logger.getLogger(Connection.class.getName()); - - /** The source to consume. */ - private final Source source; - - /** - * Creates a new connection that will read from the given source. - * - * @param source - * The source to read - */ - public Connection(Source source) { - this.source = source; - } - - // - // RUNNABLE METHODS - // - - @Override - public void run() { - while (true) { - byte[] buffer = null; - try { - buffer = source.get(bufferSize()); - } catch (IOException ioe1) { - logger.log(Level.WARNING, "Source died!", ioe1); - break; - } - try { - feed(buffer); - } catch (IOException ioe1) { - logger.log(Level.WARNING, "Sink died!", ioe1); - break; - } - } - try { - logger.info("Connection finished."); - finish(); - } catch (IOException ioe1) { - /* well, what can we do? nothing. */ - } - } - - // - // SUBCLASS METHODS - // - - /** - * Returns the number of bytes that will be requested from the source. - * - * @return The number of bytes that will be requested from the source - */ - protected abstract int bufferSize(); - - /** - * Feeds the read data into the sink. The given buffer is always filled and - * never contains excess elements. - * - * @param buffer - * The data - * @throws IOException - * if an I/O error occurs - */ - protected abstract void feed(byte[] buffer) throws IOException; - - /** - * Notifies the sink that the source does not deliver any more data. - * - * @throws IOException - * if an I/O error occurs - */ - protected abstract void finish() throws IOException; - -} diff --git a/src/main/java/net/pterodactylus/sonitus/data/Filter.java b/src/main/java/net/pterodactylus/sonitus/data/Filter.java index d0bad5d..c6fab7e 100644 --- a/src/main/java/net/pterodactylus/sonitus/data/Filter.java +++ b/src/main/java/net/pterodactylus/sonitus/data/Filter.java @@ -18,7 +18,8 @@ package net.pterodactylus.sonitus.data; /** - * A filter processes an input to produce an output. + * A filter is both a {@link Source} and a {@link Sink}. It is used to process + * the audio date in whatever way seems appropriate. * * @author David ‘Bombe’ Roden */ diff --git a/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java b/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java new file mode 100644 index 0000000..3ee0c9d --- /dev/null +++ b/src/main/java/net/pterodactylus/sonitus/data/Pipeline.java @@ -0,0 +1,300 @@ +/* + * Sonitus - Pipeline.java - Copyright © 2013 David Roden + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package net.pterodactylus.sonitus.data; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Logger; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; + +/** + * A pipeline is responsible for streaming audio data from a {@link Source} to + * an arbitrary number of connected {@link Filter}s and {@link Sink}s. + * + * @author David ‘Bombe’ Roden + */ +public class Pipeline { + + /** The logger. */ + private static final Logger logger = Logger.getLogger(Pipeline.class.getName()); + + /** The source of the audio stream. */ + private final Source source; + + /** The sinks for each source. */ + private final Multimap sinks; + + /** All started feeders. */ + private final List feeders = Lists.newArrayList(); + + /** + * Creates a new pipeline. + * + * @param source + * The source of the audio stream + * @param sinks + * The sinks for each source + */ + private Pipeline(Source source, Multimap sinks) { + this.source = Preconditions.checkNotNull(source, "source must not be null"); + this.sinks = Preconditions.checkNotNull(sinks, "sinks must not be null"); + } + + // + // ACTIONS + // + + /** + * Starts the pipeline. + * + * @throws IOException + * if any of the sinks can not be opened + * @throws IllegalStateException + * if the pipeline is already running + */ + public void start() throws IOException, IllegalStateException { + if (!feeders.isEmpty()) { + throw new IllegalStateException("Pipeline is already running!"); + } + List sources = Lists.newArrayList(); + sources.add(source); + /* collect all source->sink pairs. */ + while (!sources.isEmpty()) { + Source source = sources.remove(0); + Collection sinks = this.sinks.get(source); + feeders.add(new Feeder(source, sinks)); + for (Sink sink : sinks) { + sink.open(source.metadata()); + if (sink instanceof Filter) { + sources.add((Source) sink); + } + } + } + for (Feeder feeder : feeders) { + logger.info(String.format("Starting Feeder from %s to %s.", feeder.source, feeder.sinks)); + new Thread(feeder).start(); + } + } + + public void stop() { + if (!feeders.isEmpty()) { + /* pipeline is not running. */ + return; + } + for (Feeder feeder : feeders) { + feeder.stop(); + } + } + + // + // STATIC METHODS + // + + /** + * Returns a new pipeline builder. + * + * @param source + * The source at which to start + * @return A builder for a new pipeline + */ + public static Builder builder(Source source) { + return new Builder(source); + } + + /** + * A builder for a {@link Pipeline}. + * + * @author David ‘Bombe’ Roden + */ + public static class Builder { + + /** The source of the pipeline. */ + private final Source source; + + /** The sinks to which each source streams. */ + private Multimap nextSinks = ArrayListMultimap.create(); + + /** The last added source. */ + private Source lastSource; + + /** + * Creates a new builder. + * + * @param source + * The source that starts the pipeline + */ + private Builder(Source source) { + this.source = source; + lastSource = source; + } + + /** + * Adds a {@link Sink} (or {@link Filter} as a recipient for the last added + * {@link Source}. + * + * @param sink + * The sink to add + * @return This builder + * @throws IllegalStateException + * if the last added {@link Sink} was not also a {@link Source} + */ + public Builder to(Sink sink) { + Preconditions.checkState(lastSource != null, "last added Sink was not a Source"); + nextSinks.put(lastSource, sink); + lastSource = (sink instanceof Filter) ? (Source) sink : null; + return this; + } + + /** + * Locates the given source and sets it as the last added node so that the + * next invocation of {@link #to(Sink)} can “fork” the pipeline. + * + * @param source + * The source to locate + * @return This builder + * @throws IllegalStateException + * if the given source was not previously added as a sink + */ + public Builder find(Source source) { + Preconditions.checkState(nextSinks.containsValue(source)); + lastSource = source; + return this; + } + + /** + * Builds the pipeline. + * + * @return The created pipeline + */ + public Pipeline build() { + return new Pipeline(source, ImmutableMultimap.copyOf(nextSinks)); + } + + } + + /** + * A feeder is responsible for streaming audio from one {@link Source} to an + * arbitrary number of {@link Sink}s it is connected to. A feeder is started by + * creating a {@link Thread} wrapping it and starting said thread. + * + * @author David ‘Bombe’ Roden + */ + private class Feeder implements Runnable { + + /** The source. */ + private final Source source; + + /** The sinks. */ + private final Collection sinks; + + /** Whether the feeder was stopped. */ + private final AtomicBoolean stopped = new AtomicBoolean(false); + + /** The executor service. */ + private final ExecutorService executorService; + + /** + * Creates a new feeder. + * + * @param source + * The source of the stream + * @param sinks + * The sinks to which to stream + */ + public Feeder(Source source, Collection sinks) { + this.source = source; + this.sinks = sinks; + if (sinks.size() == 1) { + executorService = Executors.newSingleThreadExecutor(); + } else { + executorService = Executors.newCachedThreadPool(); + } + } + + // + // ACTIONS + // + + /** Stops this feeder. */ + public void stop() { + stopped.set(true); + } + + // + // RUNNABLE METHODS + // + + @Override + public void run() { + Metadata firstMetadata = source.metadata(); + while (!stopped.get()) { + try { + final Metadata lastMetadata = firstMetadata; + final Metadata metadata = firstMetadata = source.metadata(); + final byte[] buffer = source.get(4096); + List> futures = executorService.invokeAll(FluentIterable.from(sinks).transform(new Function>() { + + @Override + public Callable apply(final Sink sink) { + return new Callable() { + + @Override + public Void call() throws Exception { + if (!metadata.equals(lastMetadata)) { + sink.metadataUpdated(metadata); + } + sink.process(buffer); + return null; + } + }; + } + }).toList()); + /* check all threads for exceptions. */ + for (Future future : futures) { + future.get(); + } + } catch (IOException e) { + /* TODO */ + e.printStackTrace(); + } catch (InterruptedException e) { + /* TODO */ + e.printStackTrace(); + } catch (ExecutionException e) { + /* TODO */ + e.printStackTrace(); + } + } + } + + } + +} diff --git a/src/main/java/net/pterodactylus/sonitus/data/ReusableSink.java b/src/main/java/net/pterodactylus/sonitus/data/ReusableSink.java deleted file mode 100644 index 2d4f4bb..0000000 --- a/src/main/java/net/pterodactylus/sonitus/data/ReusableSink.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Sonitus - MultiSourceSink.java - Copyright © 2013 David Roden - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.pterodactylus.sonitus.data; - -/** - * Extension of the {@link Sink} interface that supports changing the source - * without causing a reconnection in the sink. - * - * @author David ‘Bombe’ Roden - */ -public interface ReusableSink extends Sink { - -} diff --git a/src/main/java/net/pterodactylus/sonitus/data/Sink.java b/src/main/java/net/pterodactylus/sonitus/data/Sink.java index 0fbfb9e..09679c6 100644 --- a/src/main/java/net/pterodactylus/sonitus/data/Sink.java +++ b/src/main/java/net/pterodactylus/sonitus/data/Sink.java @@ -1,43 +1,46 @@ -/* - * Sonitus - Sink.java - Copyright © 2013 David Roden - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - package net.pterodactylus.sonitus.data; +import java.io.IOException; + /** - * A sink is an endpoint for an audio stream. It might be a file on disk, it can - * be an audio output in the current system, or it can be sent to a remote - * streaming server for broadcasting. + * A sink is a destination for audio data. It can be played on speakers, it can + * be written to a file, or it can be sent to a remote streaming server. * * @author David ‘Bombe’ Roden */ public interface Sink { /** - * Connects this sink to the given source. + * Opens this sink using the format parameters of the given metadata. + * + * @param metadata + * The metadata of the stream + * @throws IOException + * if an I/O error occurs + */ + void open(Metadata metadata) throws IOException; + + /** Closes this sink. */ + void close(); + + /** + * Processes the given buffer of data. * - * @param source - * The source to connect to - * @throws ConnectException - * if the source can not be connected, e.g. due to a {@link Metadata} - * mismatch + * @param buffer + * The data to process + * @throws IOException + * if an I/O error occurs */ - void connect(Source source) throws ConnectException; + void process(byte[] buffer) throws IOException; - /** Notifies the sink that a source has updated its metadata. */ - void metadataUpdated(); + /** + * Notifies the sink that the metadata of the audio stream has changed. This + * method should return as fast as possible, i.e. every heavy lifting should be + * done from another thread. + * + * @param metadata + * The new metadata + */ + void metadataUpdated(Metadata metadata); } diff --git a/src/main/java/net/pterodactylus/sonitus/data/Source.java b/src/main/java/net/pterodactylus/sonitus/data/Source.java index 7bb10bd..4e9895e 100644 --- a/src/main/java/net/pterodactylus/sonitus/data/Source.java +++ b/src/main/java/net/pterodactylus/sonitus/data/Source.java @@ -1,55 +1,32 @@ -/* - * Sonitus - Source.java - Copyright © 2013 David Roden - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - package net.pterodactylus.sonitus.data; -import java.io.EOFException; import java.io.IOException; /** - * Defines an arbitrary media source. This can be almost anything; an MP3 file, - * a FastTracker module, or a decoded WAVE file. + * A source produces an audio stream and accompanying metadata. * * @author David ‘Bombe’ Roden */ public interface Source { /** - * Returns the metadata of this source. + * Returns the metadata of the audio stream. * - * @return The metadata of this source + * @return The metadata of the audio stream */ Metadata metadata(); /** - * Retrieves the given name of bytes from this source. The source should always - * try to read as much data as was requested but is free to return a byte array - * with less elements that requested. However, the byte array will always be - * the same size as the data that was actually read, i.e. there are no excess - * elements in the returned array. + * Retrieves data from the audio stream. * * @param bufferSize - * The size of the buffer - * @return A buffer that contains the read data - * @throws EOFException - * if the end of the source was reached + * The maximum amount of bytes to retrieve from the audio stream + * @return A buffer filled with up to {@code bufferSize} bytes of data; the + * returned buffer may contain less data than requested but will not + * contain excess elements * @throws IOException * if an I/O error occurs */ - byte[] get(int bufferSize) throws EOFException, IOException; + byte[] get(int bufferSize) throws IOException; } diff --git a/src/main/java/net/pterodactylus/sonitus/data/filter/DummyFilter.java b/src/main/java/net/pterodactylus/sonitus/data/filter/DummyFilter.java new file mode 100644 index 0000000..a3ce099 --- /dev/null +++ b/src/main/java/net/pterodactylus/sonitus/data/filter/DummyFilter.java @@ -0,0 +1,126 @@ +/* + * Sonitus - AbstractFilter.java - Copyright © 2013 David Roden + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package net.pterodactylus.sonitus.data.filter; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.util.Arrays; + +import net.pterodactylus.sonitus.data.Filter; +import net.pterodactylus.sonitus.data.Metadata; + +import com.google.common.io.Closeables; + +/** + * Dummy {@link Filter} implementation that pipes its input to its output. + * + * @author David ‘Bombe’ Roden + */ +public class DummyFilter implements Filter { + + /** The input stream from which to read. */ + private InputStream inputStream; + + /** The output stream to which to write. */ + private OutputStream outputStream; + + /** The current metadata. */ + private Metadata metadata; + + // + // FILTER METHODS + // + + @Override + public void open(Metadata metadata) throws IOException { + this.metadata = metadata; + inputStream = createInputStream(); + outputStream = createOutputStream(); + } + + @Override + public void close() { + try { + Closeables.close(outputStream, true); + Closeables.close(inputStream, true); + } catch (IOException e) { + /* won’t throw. */ + } + } + + @Override + public Metadata metadata() { + return metadata; + } + + @Override + public void metadataUpdated(Metadata metadata) { + this.metadata = metadata; + } + + @Override + public void process(byte[] buffer) throws IOException { + outputStream.write(buffer); + outputStream.flush(); + } + + @Override + public byte[] get(int bufferSize) throws IOException { + byte[] buffer = new byte[bufferSize]; + int read = inputStream.read(buffer); + if (read == -1) { + throw new EOFException(); + } + return Arrays.copyOf(buffer, read); + } + + // + // SUBCLASS METHODS + // + + /** + * Creates the input stream from which {@link net.pterodactylus.sonitus.data.Pipeline} + * will read the audio data. If you override this, you have to override {@link + * #createOutputStream()}, too! + * + * @return The input stream to read from + * @throws IOException + * if an I/O error occurs + */ + protected InputStream createInputStream() throws IOException { + return new PipedInputStream(); + } + + /** + * Creates the output stream to which {@link net.pterodactylus.sonitus.data.Pipeline} + * will write the audio data. If you override this, you have to override {@link + * #createInputStream()}, too! + * + * @return The output stream to write to + * @throws IOException + * if an I/O error occurs + */ + protected OutputStream createOutputStream() throws IOException { + return new PipedOutputStream((PipedInputStream) inputStream); + } + +} diff --git a/src/main/java/net/pterodactylus/sonitus/data/filter/ExternalFilter.java b/src/main/java/net/pterodactylus/sonitus/data/filter/ExternalFilter.java index 6b68c28..72c6177 100644 --- a/src/main/java/net/pterodactylus/sonitus/data/filter/ExternalFilter.java +++ b/src/main/java/net/pterodactylus/sonitus/data/filter/ExternalFilter.java @@ -17,99 +17,60 @@ package net.pterodactylus.sonitus.data.filter; -import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.io.PipedOutputStream; -import java.util.Arrays; import java.util.logging.Logger; -import net.pterodactylus.sonitus.data.ConnectException; -import net.pterodactylus.sonitus.data.Connection; -import net.pterodactylus.sonitus.data.Filter; import net.pterodactylus.sonitus.data.Metadata; -import net.pterodactylus.sonitus.data.Source; import net.pterodactylus.sonitus.io.InputStreamDrainer; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; /** - * {@link Filter} implementation that runs its {@link Source} through an - * external program. + * {@link net.pterodactylus.sonitus.data.Filter} implementation that runs its + * {@link net.pterodactylus.sonitus.data.Source} through an external program. * * @author David ‘Bombe’ Roden */ -public abstract class ExternalFilter implements Filter { +public abstract class ExternalFilter extends DummyFilter { /** The logger. */ private final Logger logger = Logger.getLogger(getClass().getName()); - /** The source. */ - private Source source; - - private InputStream processInputStream; + /** The external process. */ + private Process process; // // FILTER METHODS // @Override - public Metadata metadata() { - return source.metadata(); + public void open(Metadata metadata) throws IOException { + process = Runtime.getRuntime().exec(Iterables.toArray(ImmutableList.builder().add(binary(metadata)).addAll(parameters(metadata)).build(), String.class)); + InputStream processError = process.getErrorStream(); + new Thread(new InputStreamDrainer(processError)).start(); + super.open(metadata); } @Override - public byte[] get(int bufferSize) throws EOFException, IOException { - byte[] buffer = new byte[bufferSize]; - int read = processInputStream.read(buffer); - if (read == -1) { - throw new EOFException(); - } - return Arrays.copyOf(buffer, read); + public void close() { + process.destroy(); } + // + // DUMMYFILTER METHODS + // + @Override - public void connect(Source source) throws ConnectException { - Preconditions.checkNotNull(source, "source must not be null"); - - this.source = source; - try { - final Process process = Runtime.getRuntime().exec(Iterables.toArray(ImmutableList.builder().add(binary(source.metadata())).addAll(parameters(source.metadata())).build(), String.class)); - processInputStream = process.getInputStream(); - final OutputStream processInput = process.getOutputStream(); - final InputStream processError = process.getErrorStream(); - final PipedOutputStream pipedOutputStream = new PipedOutputStream(); - new Thread(new InputStreamDrainer(processError)).start(); - new Thread(new Connection(source) { - - @Override - protected int bufferSize() { - return 4096; - } - - @Override - protected void feed(byte[] buffer) throws IOException { - processInput.write(buffer); - processInput.flush(); - } - - @Override - protected void finish() throws IOException { - processInput.close(); - processError.close(); - } - }).start(); - } catch (IOException ioe1) { - - } + protected InputStream createInputStream() throws IOException { + return process.getInputStream(); } @Override - public void metadataUpdated() { - /* ignore. */ + protected OutputStream createOutputStream() throws IOException { + return process.getOutputStream(); } // diff --git a/src/main/java/net/pterodactylus/sonitus/data/filter/ExternalMp3Decoder.java b/src/main/java/net/pterodactylus/sonitus/data/filter/ExternalMp3Decoder.java index cc52e41..2e80ec7 100644 --- a/src/main/java/net/pterodactylus/sonitus/data/filter/ExternalMp3Decoder.java +++ b/src/main/java/net/pterodactylus/sonitus/data/filter/ExternalMp3Decoder.java @@ -17,15 +17,17 @@ package net.pterodactylus.sonitus.data.filter; -import static com.google.common.base.Preconditions.*; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.inject.internal.util.$Preconditions.checkState; + +import java.io.IOException; -import net.pterodactylus.sonitus.data.ConnectException; import net.pterodactylus.sonitus.data.Metadata; -import net.pterodactylus.sonitus.data.Source; /** - * Basic {@link ExternalFilter} implementation that verifies that the connected - * source is MP3-encoded and returns a PCM-encoded stream. + * Basic {@link net.pterodactylus.sonitus.data.filter.ExternalFilter} + * implementation that verifies that the connected source is MP3-encoded and + * returns a PCM-encoded stream. * * @author David ‘Bombe’ Roden */ @@ -37,11 +39,11 @@ public abstract class ExternalMp3Decoder extends ExternalFilter { } @Override - public void connect(Source source) throws ConnectException { - checkNotNull(source, "source must not be null"); - checkState(source.metadata().encoding().equalsIgnoreCase("MP3"), "source must be MP3-encoded"); + public void open(Metadata metadata) throws IOException { + checkNotNull(metadata, "metadata must not be null"); + checkState(metadata.encoding().equalsIgnoreCase("MP3"), "source must be MP3-encoded"); - super.connect(source); + super.open(metadata); } } diff --git a/src/main/java/net/pterodactylus/sonitus/data/filter/ExternalMp3Encoder.java b/src/main/java/net/pterodactylus/sonitus/data/filter/ExternalMp3Encoder.java index 1975e36..391e01b 100644 --- a/src/main/java/net/pterodactylus/sonitus/data/filter/ExternalMp3Encoder.java +++ b/src/main/java/net/pterodactylus/sonitus/data/filter/ExternalMp3Encoder.java @@ -17,15 +17,17 @@ package net.pterodactylus.sonitus.data.filter; -import net.pterodactylus.sonitus.data.ConnectException; -import net.pterodactylus.sonitus.data.Metadata; -import net.pterodactylus.sonitus.data.Source; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import java.io.IOException; -import com.google.common.base.Preconditions; +import net.pterodactylus.sonitus.data.Metadata; /** - * Basic {@link ExternalFilter} implementation that verifies that the connected - * source is PCM-encoded and that returns an MP3-encoded metadata. + * Basic {@link net.pterodactylus.sonitus.data.filter.ExternalFilter} + * implementation that verifies that the connected source is PCM-encoded and + * that returns an MP3-encoded metadata. * * @author David ‘Bombe’ Roden */ @@ -37,11 +39,11 @@ public abstract class ExternalMp3Encoder extends ExternalFilter { } @Override - public void connect(Source source) throws ConnectException { - Preconditions.checkNotNull(source, "source must not be null"); - Preconditions.checkState(source.metadata().encoding().equalsIgnoreCase("PCM"), "source must be PCM-encoded"); + public void open(Metadata metadata) throws IOException { + checkNotNull(metadata, "metadata must not be null"); + checkState(metadata.encoding().equalsIgnoreCase("PCM"), "source must be PCM-encoded"); - super.connect(source); + super.open(metadata); } } diff --git a/src/main/java/net/pterodactylus/sonitus/data/filter/MultiSourceFilter.java b/src/main/java/net/pterodactylus/sonitus/data/filter/MultiSourceFilter.java deleted file mode 100644 index d62ba97..0000000 --- a/src/main/java/net/pterodactylus/sonitus/data/filter/MultiSourceFilter.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Sonitus - MultiSource.java - Copyright © 2013 David Roden - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -package net.pterodactylus.sonitus.data.filter; - -import static com.google.common.base.Preconditions.*; - -import java.io.EOFException; -import java.io.IOException; -import java.util.concurrent.atomic.AtomicReference; -import java.util.logging.Logger; - -import net.pterodactylus.sonitus.data.ConnectException; -import net.pterodactylus.sonitus.data.Filter; -import net.pterodactylus.sonitus.data.Metadata; -import net.pterodactylus.sonitus.data.ReusableSink; -import net.pterodactylus.sonitus.data.Source; -import net.pterodactylus.sonitus.data.event.SourceFinishedEvent; - -import com.google.common.eventbus.EventBus; -import com.google.inject.Inject; - -/** - * {@link ReusableSink} implementation that supports changing the source without - * letting the {@link net.pterodactylus.sonitus.data.Sink} know. - * - * @author David ‘Bombe’ Roden - */ -public class MultiSourceFilter implements Filter, ReusableSink { - - /** The logger. */ - private static final Logger logger = Logger.getLogger(MultiSourceFilter.class.getName()); - - /** The event bus. */ - private final EventBus eventBus; - - /** The current source. */ - private final AtomicReference source = new AtomicReference(); - - @Inject - public MultiSourceFilter(EventBus eventBus) { - this.eventBus = eventBus; - } - - @Override - public Metadata metadata() { - return source.get().metadata(); - } - - @Override - public byte[] get(int bufferSize) throws EOFException, IOException { - try { - return source.get().get(bufferSize); - } catch (EOFException eofe1) { - eventBus.post(new SourceFinishedEvent(source.get())); - throw eofe1; - } - } - - @Override - public void connect(Source source) throws ConnectException { - checkNotNull(source, "source must not be null"); - - Source oldSource = this.source.getAndSet(source); - if (oldSource != null) { - checkArgument(oldSource.metadata().channels() == source.metadata().channels(), "source’s channel count must equal existing source’s channel count"); - checkArgument(oldSource.metadata().frequency() == source.metadata().frequency(), "source’s frequency must equal existing source’s frequency"); - checkArgument(oldSource.metadata().encoding().equalsIgnoreCase(source.metadata().encoding()), "source’s encoding must equal existing source’s encoding"); - } - } - - @Override - public void metadataUpdated() { - /* ignore. */ - } - -} diff --git a/src/main/java/net/pterodactylus/sonitus/data/filter/OggVorbisDecoder.java b/src/main/java/net/pterodactylus/sonitus/data/filter/OggVorbisDecoder.java index 2cd9035..5ea2d48 100644 --- a/src/main/java/net/pterodactylus/sonitus/data/filter/OggVorbisDecoder.java +++ b/src/main/java/net/pterodactylus/sonitus/data/filter/OggVorbisDecoder.java @@ -17,11 +17,13 @@ package net.pterodactylus.sonitus.data.filter; -import net.pterodactylus.sonitus.data.ConnectException; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.IOException; + import net.pterodactylus.sonitus.data.Metadata; -import net.pterodactylus.sonitus.data.Source; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; /** @@ -70,11 +72,11 @@ public class OggVorbisDecoder extends ExternalFilter { } @Override - public void connect(Source source) throws ConnectException { - Preconditions.checkNotNull(source, "source must not be null"); - Preconditions.checkArgument(source.metadata().encoding().equalsIgnoreCase("Vorbis"), "source must be Vorbis-encoded"); + public void open(Metadata metadata) throws IOException { + checkNotNull(metadata, "metadata must not be null"); + checkArgument(metadata.encoding().equalsIgnoreCase("Vorbis"), "source must be Vorbis-encoded"); - super.connect(source); + super.open(metadata); } // diff --git a/src/main/java/net/pterodactylus/sonitus/data/filter/PredicateFilter.java b/src/main/java/net/pterodactylus/sonitus/data/filter/PredicateFilter.java new file mode 100644 index 0000000..1e8c7fa --- /dev/null +++ b/src/main/java/net/pterodactylus/sonitus/data/filter/PredicateFilter.java @@ -0,0 +1,122 @@ +/* + * Sonitus - PredicateFilter.java - Copyright © 2013 David Roden + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package net.pterodactylus.sonitus.data.filter; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +import net.pterodactylus.sonitus.data.Filter; +import net.pterodactylus.sonitus.data.Metadata; + +import com.google.common.base.Predicate; + +/** + * {@link Filter} implementation that uses a {@link Predicate} to determine + * whether a filter will be used or the data will only be passed through. + * + * @author David ‘Bombe’ Roden + */ +public class PredicateFilter extends DummyFilter { + + /** The predicate. */ + private final Predicate metadataPredicate; + + /** The filter to use if the predicate matches. */ + private final Filter filter; + + /** Whether the predicate currently matches. */ + private final AtomicBoolean metadataMatches = new AtomicBoolean(false); + + /** + * Creates a new predicate filter. + * + * @param metadataPredicate + * The predicate to evaluate every time the metadata changes + * @param filter + * The filter to use if the predicate matches the metadata + */ + public PredicateFilter(Predicate metadataPredicate, Filter filter) { + this.metadataPredicate = metadataPredicate; + this.filter = filter; + } + + // + // FILTER METHODS + // + + @Override + public void open(Metadata metadata) throws IOException { + checkNotNull(metadata, "metadata must not be null"); + + metadataMatches.set(metadataPredicate.apply(metadata)); + if (metadataMatches.get()) { + filter.open(metadata); + } else { + super.open(metadata); + } + } + + @Override + public void close() { + if (metadataMatches.get()) { + filter.close(); + } else { + super.close(); + } + } + + @Override + public void metadataUpdated(Metadata metadata) { + metadataMatches.set(metadataPredicate.apply(metadata)); + if (metadataMatches.get()) { + filter.metadataUpdated(metadata); + } else { + super.metadataUpdated(metadata); + } + } + + @Override + public void process(byte[] buffer) throws IOException { + if (metadataMatches.get()) { + filter.process(buffer); + } else { + super.process(buffer); + } + } + + @Override + public Metadata metadata() { + if (metadataMatches.get()) { + return filter.metadata(); + } else { + return super.metadata(); + } + } + + @Override + public byte[] get(int bufferSize) throws IOException { + if (metadataMatches.get()) { + return filter.get(bufferSize); + } else { + return super.get(bufferSize); + } + } + +} diff --git a/src/main/java/net/pterodactylus/sonitus/data/filter/RateLimitingFilter.java b/src/main/java/net/pterodactylus/sonitus/data/filter/RateLimitingFilter.java index 7ee4116..d513df6 100644 --- a/src/main/java/net/pterodactylus/sonitus/data/filter/RateLimitingFilter.java +++ b/src/main/java/net/pterodactylus/sonitus/data/filter/RateLimitingFilter.java @@ -17,24 +17,19 @@ package net.pterodactylus.sonitus.data.filter; -import java.io.EOFException; import java.io.IOException; import java.util.logging.Logger; -import net.pterodactylus.sonitus.data.ConnectException; -import net.pterodactylus.sonitus.data.Filter; import net.pterodactylus.sonitus.data.Metadata; -import net.pterodactylus.sonitus.data.Source; - -import com.google.common.base.Preconditions; /** * Rate limiting filter that only passes a specified amount of data per second - * from its {@link Source} to its {@link net.pterodactylus.sonitus.data.Sink}. + * from its {@link net.pterodactylus.sonitus.data.Source} to its {@link + * net.pterodactylus.sonitus.data.Sink}. * * @author David ‘Bombe’ Roden */ -public class RateLimitingFilter implements Filter { +public class RateLimitingFilter extends DummyFilter { /** The logger. */ private static final Logger logger = Logger.getLogger(RateLimitingFilter.class.getName()); @@ -42,14 +37,11 @@ public class RateLimitingFilter implements Filter { /** The limiting rate in bytes/second. */ private final int rate; - /** The fast start time. */ - private final long fastStartTime; - - /** The source. */ - private Source source; + /** The start time. */ + private long startTime; - /** The remaining fast start time. */ - private long remainingFastStartTime; + /** The number of bytes. */ + private long counter; /** * Creates a new rate limiting filter. @@ -72,8 +64,7 @@ public class RateLimitingFilter implements Filter { */ public RateLimitingFilter(int rate, long fastStartTime) { this.rate = rate; - this.fastStartTime = fastStartTime; - remainingFastStartTime = fastStartTime; + this.counter = (long) (-rate * (fastStartTime / 1000.0)); } // @@ -81,39 +72,27 @@ public class RateLimitingFilter implements Filter { // @Override - public Metadata metadata() { - return source.metadata(); + public void open(Metadata metadata) throws IOException { + super.open(metadata); + startTime = System.currentTimeMillis(); } @Override - public byte[] get(int bufferSize) throws EOFException, IOException { - long now = System.currentTimeMillis(); - byte[] buffer = source.get(bufferSize); + public void process(byte[] buffer) throws IOException { + super.process(buffer); /* delay. */ - long waitTime = 1000 * buffer.length / rate; - remainingFastStartTime = Math.max(remainingFastStartTime - waitTime, 0); - while ((remainingFastStartTime == 0) && (System.currentTimeMillis() - now) < waitTime) { + counter += buffer.length; + long waitTime = (long) (counter / (rate / 1000.0)); + while ((System.currentTimeMillis() - startTime) < waitTime) { try { - long limitDelay = waitTime - (System.currentTimeMillis() - now); + long limitDelay = waitTime - (System.currentTimeMillis() - startTime); logger.finest(String.format("Waiting %d ms...", limitDelay)); Thread.sleep(limitDelay); } catch (InterruptedException ie1) { /* ignore, keep looping. */ } } - return buffer; - } - - @Override - public void connect(Source source) throws ConnectException { - Preconditions.checkNotNull(source, "source must not be null"); - - this.source = source; - } - - @Override - public void metadataUpdated() { - /* ignore. */ + logger.finest(String.format("Processed %d Bytes during %d ms, that’s %.1f bytes/s.", counter, System.currentTimeMillis() - startTime, counter / ((System.currentTimeMillis() - startTime) / 1000.0))); } } diff --git a/src/main/java/net/pterodactylus/sonitus/data/filter/SoxResampleFilter.java b/src/main/java/net/pterodactylus/sonitus/data/filter/SoxResampleFilter.java index 69daa99..a5061fb 100644 --- a/src/main/java/net/pterodactylus/sonitus/data/filter/SoxResampleFilter.java +++ b/src/main/java/net/pterodactylus/sonitus/data/filter/SoxResampleFilter.java @@ -17,11 +17,12 @@ package net.pterodactylus.sonitus.data.filter; -import static com.google.common.base.Preconditions.*; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.IOException; -import net.pterodactylus.sonitus.data.ConnectException; import net.pterodactylus.sonitus.data.Metadata; -import net.pterodactylus.sonitus.data.Source; import com.google.common.collect.ImmutableList; @@ -62,11 +63,11 @@ public class SoxResampleFilter extends ExternalFilter { } @Override - public void connect(Source source) throws ConnectException { - checkNotNull(source, "source must not be null"); - checkArgument(source.metadata().encoding().equalsIgnoreCase("PCM"), "source must be PCM-encoded"); + public void open(Metadata metadata) throws IOException { + checkNotNull(metadata, "metadata must not be null"); + checkArgument(metadata.encoding().equalsIgnoreCase("PCM"), "source must be PCM-encoded"); - super.connect(source); + super.open(metadata); } // diff --git a/src/main/java/net/pterodactylus/sonitus/data/sink/AudioSink.java b/src/main/java/net/pterodactylus/sonitus/data/sink/AudioSink.java index f29bde8..689c23d 100644 --- a/src/main/java/net/pterodactylus/sonitus/data/sink/AudioSink.java +++ b/src/main/java/net/pterodactylus/sonitus/data/sink/AudioSink.java @@ -17,23 +17,22 @@ package net.pterodactylus.sonitus.data.sink; -import static com.google.common.base.Preconditions.*; - +import java.io.IOException; import java.util.logging.Logger; import javax.sound.sampled.AudioFormat; import javax.sound.sampled.AudioSystem; import javax.sound.sampled.LineUnavailableException; import javax.sound.sampled.SourceDataLine; -import net.pterodactylus.sonitus.data.ConnectException; -import net.pterodactylus.sonitus.data.Connection; import net.pterodactylus.sonitus.data.Metadata; import net.pterodactylus.sonitus.data.Sink; -import net.pterodactylus.sonitus.data.Source; + +import com.google.common.base.Preconditions; /** - * {@link Sink} implementation that uses the JDK’s {@link - * javax.sound.sampled.AudioSystem} to play all {@link net.pterodactylus.sonitus.data.Source}s. + * {@link net.pterodactylus.sonitus.data.Sink} implementation that uses the + * JDK’s {@link javax.sound.sampled.AudioSystem} to play all {@link + * net.pterodactylus.sonitus.data.Source}s. * * @author David ‘Bombe’ Roden */ @@ -42,47 +41,41 @@ public class AudioSink implements Sink { /** The logger. */ private static final Logger logger = Logger.getLogger(AudioSink.class.getName()); - /** The current source. */ - private Source source; + /** The current metadata. */ + private Metadata metadata; - @Override - public void connect(Source source) throws ConnectException { - this.source = checkNotNull(source, "source must not be null"); - checkState(source.metadata().encoding().equalsIgnoreCase("PCM"), "source must be PCM-encoded"); + /** The audio output. */ + private SourceDataLine sourceDataLine; - final Metadata sourceMetadata = source.metadata(); - AudioFormat audioFormat = new AudioFormat(sourceMetadata.frequency(), 16, sourceMetadata.channels(), true, false); + @Override + public void open(Metadata metadata) throws IOException { + Preconditions.checkArgument(metadata.encoding().equalsIgnoreCase("PCM"), "source must be PCM-encoded"); + AudioFormat audioFormat = new AudioFormat(metadata.frequency(), 16, metadata.channels(), true, false); try { - final SourceDataLine sourceDataLine = AudioSystem.getSourceDataLine(audioFormat); + sourceDataLine = AudioSystem.getSourceDataLine(audioFormat); sourceDataLine.open(audioFormat); sourceDataLine.start(); - new Thread(new Connection(source) { - - @Override - protected int bufferSize() { - return sourceMetadata.channels() * sourceMetadata.frequency() * 2; - } - - @Override - protected void feed(byte[] buffer) { - sourceDataLine.write(buffer, 0, buffer.length); - logger.finest(String.format("AudioSink: Wrote %d Bytes.", buffer.length)); - } - - @Override - protected void finish() { - sourceDataLine.stop(); - } - }).start(); - metadataUpdated(); - } catch (LineUnavailableException lue1) { - throw new ConnectException(lue1); + } catch (LineUnavailableException e) { + /* TODO */ + throw new IOException(e); } } @Override - public void metadataUpdated() { + public void close() { + sourceDataLine.stop(); + sourceDataLine.close(); + } + + @Override + public void metadataUpdated(Metadata metadata) { /* ignore. */ } + @Override + public void process(byte[] buffer) { + sourceDataLine.write(buffer, 0, buffer.length); + logger.finest(String.format("AudioSink: Wrote %d Bytes.", buffer.length)); + } + } diff --git a/src/main/java/net/pterodactylus/sonitus/data/sink/FileSink.java b/src/main/java/net/pterodactylus/sonitus/data/sink/FileSink.java index 61e0a27..7210f71 100644 --- a/src/main/java/net/pterodactylus/sonitus/data/sink/FileSink.java +++ b/src/main/java/net/pterodactylus/sonitus/data/sink/FileSink.java @@ -17,20 +17,16 @@ package net.pterodactylus.sonitus.data.sink; -import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.util.logging.Logger; -import net.pterodactylus.sonitus.data.ConnectException; -import net.pterodactylus.sonitus.data.Connection; +import net.pterodactylus.sonitus.data.Metadata; import net.pterodactylus.sonitus.data.Sink; -import net.pterodactylus.sonitus.data.Source; - -import com.google.common.base.Preconditions; /** - * {@link Sink} that writes all received data into a file. + * {@link net.pterodactylus.sonitus.data.Sink} that writes all received data + * into a file. * * @author David ‘Bombe’ Roden */ @@ -42,6 +38,8 @@ public class FileSink implements Sink { /** The path of the file to write to. */ private final String path; + private FileOutputStream fileOutputStream; + /** * Creates a new file sink that will write to the given path. * @@ -53,37 +51,28 @@ public class FileSink implements Sink { } @Override - public void connect(Source source) throws ConnectException { - Preconditions.checkNotNull(source, "source must not be null"); + public void open(Metadata metadata) throws IOException { + fileOutputStream = new FileOutputStream(path); + } + @Override + public void close() { try { - final FileOutputStream fileOutputStream = new FileOutputStream(path); - new Thread(new Connection(source) { - - @Override - protected int bufferSize() { - return 65536; - } - - @Override - protected void feed(byte[] buffer) throws IOException { - fileOutputStream.write(buffer); - logger.finest(String.format("FileSink: Wrote %d Bytes.", buffer.length)); - } - - @Override - protected void finish() throws IOException { - fileOutputStream.close(); - } - }).start(); - } catch (FileNotFoundException fnfe1) { - throw new ConnectException(fnfe1); + fileOutputStream.close(); + } catch (IOException e) { + /* ignore. */ } } @Override - public void metadataUpdated() { + public void metadataUpdated(Metadata metadata) { /* ignore. */ } + @Override + public void process(byte[] buffer) throws IOException { + fileOutputStream.write(buffer); + logger.finest(String.format("FileSink: Wrote %d Bytes.", buffer.length)); + } + } diff --git a/src/main/java/net/pterodactylus/sonitus/data/sink/Icecast2Sink.java b/src/main/java/net/pterodactylus/sonitus/data/sink/Icecast2Sink.java index 0b55049..0a5d22d 100644 --- a/src/main/java/net/pterodactylus/sonitus/data/sink/Icecast2Sink.java +++ b/src/main/java/net/pterodactylus/sonitus/data/sink/Icecast2Sink.java @@ -17,8 +17,6 @@ package net.pterodactylus.sonitus.data.sink; -import static com.google.common.base.Preconditions.*; - import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -29,11 +27,8 @@ import java.util.Arrays; import java.util.logging.Level; import java.util.logging.Logger; -import net.pterodactylus.sonitus.data.ConnectException; -import net.pterodactylus.sonitus.data.Connection; import net.pterodactylus.sonitus.data.Metadata; import net.pterodactylus.sonitus.data.Sink; -import net.pterodactylus.sonitus.data.Source; import net.pterodactylus.sonitus.io.InputStreamDrainer; import com.google.common.base.Function; @@ -44,8 +39,8 @@ import com.google.common.io.BaseEncoding; import com.google.common.io.Closeables; /** - * {@link Sink} implementation that delivers all incoming data to an Icecast2 - * server. + * {@link net.pterodactylus.sonitus.data.Sink} implementation that delivers all + * incoming data to an Icecast2 server. * * @author David ‘Bombe’ Roden */ @@ -78,8 +73,7 @@ public class Icecast2Sink implements Sink { /** Whether to publish the server. */ private final boolean publishServer; - /** The connected source. */ - private Source source; + private OutputStream socketOutputStream; /** * Creates a new Icecast2 sink. @@ -118,97 +112,85 @@ public class Icecast2Sink implements Sink { // @Override - public void connect(Source source) throws ConnectException { - checkNotNull(source, "source must not be null"); + public void open(Metadata metadata) throws IOException { + logger.info(String.format("Connecting to %s:%d...", server, port)); + Socket socket = new Socket(server, port); + logger.info("Connected."); + socketOutputStream = socket.getOutputStream(); + InputStream socketInputStream = socket.getInputStream(); + + sendLine(socketOutputStream, String.format("SOURCE /%s ICE/1.0", mountPoint)); + sendLine(socketOutputStream, String.format("Authorization: Basic %s", generatePassword(password))); + sendLine(socketOutputStream, String.format("Content-Type: %s", getContentType(metadata))); + sendLine(socketOutputStream, String.format("ICE-Name: %s", serverName)); + sendLine(socketOutputStream, String.format("ICE-Description: %s", serverDescription)); + sendLine(socketOutputStream, String.format("ICE-Genre: %s", genre)); + sendLine(socketOutputStream, String.format("ICE-Public: %d", publishServer ? 1 : 0)); + sendLine(socketOutputStream, ""); + socketOutputStream.flush(); + + new Thread(new InputStreamDrainer(socketInputStream)).start(); + + metadataUpdated(metadata); + } - this.source = source; + @Override + public void close() { try { - logger.info(String.format("Connecting to %s:%d...", server, port)); - final Socket socket = new Socket(server, port); - logger.info("Connected."); - final OutputStream socketOutputStream = socket.getOutputStream(); - final InputStream socketInputStream = socket.getInputStream(); - - sendLine(socketOutputStream, String.format("SOURCE /%s ICE/1.0", mountPoint)); - sendLine(socketOutputStream, String.format("Authorization: Basic %s", generatePassword(password))); - sendLine(socketOutputStream, String.format("Content-Type: %s", getContentType(source.metadata()))); - sendLine(socketOutputStream, String.format("ICE-Name: %s", serverName)); - sendLine(socketOutputStream, String.format("ICE-Description: %s", serverDescription)); - sendLine(socketOutputStream, String.format("ICE-Genre: %s", genre)); - sendLine(socketOutputStream, String.format("ICE-Public: %d", publishServer ? 1 : 0)); - sendLine(socketOutputStream, ""); - socketOutputStream.flush(); - - new Thread(new InputStreamDrainer(socketInputStream)).start(); - new Thread(new Connection(source) { - - private long counter; - - @Override - protected int bufferSize() { - return 4096; - } - - @Override - protected void feed(byte[] buffer) throws IOException { - socketOutputStream.write(buffer); - socketOutputStream.flush(); - counter += buffer.length; - logger.finest(String.format("Wrote %d Bytes.", counter)); - } - - @Override - protected void finish() throws IOException { - Closeables.close(socketOutputStream, true); - if (socket != null) { - socket.close(); - } - } - }).start(); - - metadataUpdated(); - } catch (IOException ioe1) { - throw new ConnectException(ioe1); + Closeables.close(socketOutputStream, true); + } catch (IOException e) { + /* will never throw. */ } } @Override - public void metadataUpdated() { - Metadata metadata = source.metadata(); - String metadataString = String.format("%s (%s)", Joiner.on(" - ").skipNulls().join(FluentIterable.from(Arrays.asList(metadata.artist(), metadata.name())).transform(new Function, Object>() { + public void metadataUpdated(final Metadata metadata) { + new Thread(new Runnable() { @Override - public Object apply(Optional input) { - return input.orNull(); - } - })), "Sonitus"); - logger.info(String.format("Updating metadata to %s", metadataString)); + public void run() { + String metadataString = String.format("%s (%s)", Joiner.on(" - ").skipNulls().join(FluentIterable.from(Arrays.asList(metadata.artist(), metadata.name())).transform(new Function, Object>() { - Socket socket = null; - OutputStream socketOutputStream = null; - try { - socket = new Socket(server, port); - socketOutputStream = socket.getOutputStream(); - - sendLine(socketOutputStream, String.format("GET /admin/metadata?pass=%s&mode=updinfo&mount=/%s&song=%s HTTP/1.0", password, mountPoint, URLEncoder.encode(metadataString, "UTF-8"))); - sendLine(socketOutputStream, String.format("Authorization: Basic %s", generatePassword(password))); - sendLine(socketOutputStream, String.format("User-Agent: Mozilla/Sonitus")); - sendLine(socketOutputStream, ""); - socketOutputStream.flush(); - - new InputStreamDrainer(socket.getInputStream()).run(); - } catch (IOException ioe1) { - logger.log(Level.WARNING, "Could not update metadata!", ioe1); - } finally { - try { - Closeables.close(socketOutputStream, true); - if (socket != null) { - socket.close(); + @Override + public Object apply(Optional input) { + return input.orNull(); + } + })), "Sonitus"); + logger.info(String.format("Updating metadata to %s", metadataString)); + + Socket socket = null; + OutputStream socketOutputStream = null; + try { + socket = new Socket(server, port); + socketOutputStream = socket.getOutputStream(); + + sendLine(socketOutputStream, String.format("GET /admin/metadata?pass=%s&mode=updinfo&mount=/%s&song=%s HTTP/1.0", password, mountPoint, URLEncoder.encode(metadataString, "UTF-8"))); + sendLine(socketOutputStream, String.format("Authorization: Basic %s", generatePassword(password))); + sendLine(socketOutputStream, String.format("User-Agent: Mozilla/Sonitus")); + sendLine(socketOutputStream, ""); + socketOutputStream.flush(); + + new InputStreamDrainer(socket.getInputStream()).run(); + } catch (IOException ioe1) { + logger.log(Level.WARNING, "Could not update metadata!", ioe1); + } finally { + try { + Closeables.close(socketOutputStream, true); + if (socket != null) { + socket.close(); + } + } catch (IOException ioe1) { + /* ignore. */ + } } - } catch (IOException ioe1) { - /* ignore. */ } - } + }).start(); + } + + @Override + public void process(byte[] buffer) throws IOException { + socketOutputStream.write(buffer); + socketOutputStream.flush(); } // @@ -223,7 +205,7 @@ public class Icecast2Sink implements Sink { * The output stream to send the line to * @param line * The line to send - * @throws IOException + * @throws java.io.IOException * if an I/O error occurs */ private static void sendLine(OutputStream outputStream, String line) throws IOException { @@ -237,7 +219,7 @@ public class Icecast2Sink implements Sink { * @param password * The password to encode * @return The encoded password - * @throws UnsupportedEncodingException + * @throws java.io.UnsupportedEncodingException * if the UTF-8 encoding is not supported (which can never happen) */ private static String generatePassword(String password) throws UnsupportedEncodingException { diff --git a/src/main/java/net/pterodactylus/sonitus/data/source/FileSource.java b/src/main/java/net/pterodactylus/sonitus/data/source/FileSource.java index 010eeb9..3d6e871 100644 --- a/src/main/java/net/pterodactylus/sonitus/data/source/FileSource.java +++ b/src/main/java/net/pterodactylus/sonitus/data/source/FileSource.java @@ -17,7 +17,7 @@ package net.pterodactylus.sonitus.data.source; -import static com.google.common.base.Preconditions.*; +import static com.google.common.base.Preconditions.checkNotNull; import static net.pterodactylus.sonitus.data.Metadata.UNKNOWN_CHANNELS; import static net.pterodactylus.sonitus.data.Metadata.UNKNOWN_ENCODING; import static net.pterodactylus.sonitus.data.Metadata.UNKNOWN_FREQUENCY; @@ -33,10 +33,10 @@ import net.pterodactylus.sonitus.data.Source; import net.pterodactylus.sonitus.io.IdentifyingInputStream; import com.google.common.base.Optional; -import com.google.common.io.ByteStreams; /** - * A {@link Source} that is read from the local file system. + * A {@link net.pterodactylus.sonitus.data.Source} that is read from the local + * file system. * * @author David ‘Bombe’ Roden */ @@ -56,7 +56,7 @@ public class FileSource implements Source { * * @param path * The path of the file - * @throws IOException + * @throws java.io.IOException * if the file can not be found, or an I/O error occurs */ public FileSource(String path) throws IOException { @@ -78,20 +78,20 @@ public class FileSource implements Source { // @Override - public Metadata metadata() { - return metadata; - } - - @Override public byte[] get(int bufferSize) throws IOException { byte[] buffer = new byte[bufferSize]; - int read = ByteStreams.read(fileInputStream, buffer, 0, bufferSize); - if (read == 0) { + int read = fileInputStream.read(buffer); + if (read == -1) { throw new EOFException(); } return Arrays.copyOf(buffer, read); } + @Override + public Metadata metadata() { + return metadata; + } + // // OBJECT METHODS // diff --git a/src/main/java/net/pterodactylus/sonitus/data/source/MultiSource.java b/src/main/java/net/pterodactylus/sonitus/data/source/MultiSource.java new file mode 100644 index 0000000..b5705d5 --- /dev/null +++ b/src/main/java/net/pterodactylus/sonitus/data/source/MultiSource.java @@ -0,0 +1,118 @@ +/* + * Sonitus - MultiSource.java - Copyright © 2013 David Roden + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +package net.pterodactylus.sonitus.data.source; + +import static com.google.common.base.Preconditions.checkNotNull; + +import java.io.EOFException; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Logger; + +import net.pterodactylus.sonitus.data.Metadata; +import net.pterodactylus.sonitus.data.Source; +import net.pterodactylus.sonitus.data.event.SourceFinishedEvent; + +import com.google.common.eventbus.EventBus; +import com.google.inject.Inject; + +/** + * {@link Source} implementation that simply forwards another source and + * supports changing the source without letting the {@link + * net.pterodactylus.sonitus.data.Sink} know. + * + * @author David ‘Bombe’ Roden + */ +public class MultiSource implements Source { + + /** The logger. */ + private static final Logger logger = Logger.getLogger(MultiSource.class.getName()); + + /** The event bus. */ + private final EventBus eventBus; + + /** The current source. */ + private final AtomicReference source = new AtomicReference(); + + /** Whether the source was changed. */ + private boolean sourceChanged; + + @Inject + public MultiSource(EventBus eventBus) { + this.eventBus = eventBus; + } + + // + // ACTIONS + // + + /** + * Sets the new source to use. + * + * @param source + * The new source to use + */ + public void setSource(Source source) { + checkNotNull(source, "source must not be null"); + + Source oldSource = this.source.getAndSet(source); + if (oldSource != null) { + synchronized (this.source) { + sourceChanged = true; + this.source.notifyAll(); + } + logger.info(String.format("Next Source set: %s", source)); + } + } + + // + // SOURCE METHODS + // + + @Override + public Metadata metadata() { + return source.get().metadata(); + } + + @Override + public byte[] get(int bufferSize) throws EOFException, IOException { + while (true) { + try { + return source.get().get(bufferSize); + } catch (EOFException eofe1) { + eventBus.post(new SourceFinishedEvent(source.get())); + synchronized (source) { + while (!sourceChanged) { + try { + logger.info("Waiting for next Source..."); + source.wait(); + logger.info("Was notified."); + } catch (InterruptedException ioe1) { + /* ignore: we’ll end up here again if we were interrupted. */ + } + } + } + } finally { + synchronized (source) { + sourceChanged = false; + } + } + } + } + +} -- 2.7.4