+ private <M extends BaseMessage> void consume(Consumer<M> consumer, M message) {
+ consume(consumer, message, "Identifier");
+ }
+
+ private <M extends BaseMessage> void consume(Consumer<M> consumer, M message,
+ String identifier) {
+ if (Objects.equals(message.getField(identifier), this.identifier.get())) {
+ consumeAlways(consumer, message);
+ }
+ }
+
+ private <M extends BaseMessage> void consumeAlways(Consumer<M> consumer, M message) {