GrabDuck

Реактивная обработка стрима логов с RxJava — Часть 1

:

image
Reactive log stream processing with RxJava — Part l

В предыдущем посте автор рассматривал случаи использования ELK стека и сбора логов.
С учетом движения в сторону микросервисов и контейнеризации приложений, централизованная обработка логов и их хранение становится де-факто стандартом.

Может быть, нам стоит попробовать сделать следующий шаг и более активно использовать полученную информацию для того, чтобы находить причины ряда проблем задолго до их появления.*

Сноска — стримы и потоки данных в данном переводе являются взаимозаменяемыми словами. Также слово лог может означать журнал, хотя в большинстве случаев в тексте мы используем иное значение

Если бы мы рассматривали лог событий как поток данных того, что происходит в реальном времени в вашей системе, было бы очень интересно анализировать в реальном времени данные и все возможные варианты их использования, к примеру, обнаружить мошенническое поведение, с помощью агрегирования различных потоков информации непосредственно во время "атаки", и сразу же заблокировать атакующего вместо того, чтобы "традиционно" собирать лог данных и заниматься расследованием уже после возникновения инцидента.

Или другой пример, мы можем отфильтровать (filter) только те события, которые соответствуют определенному типу событий, сгруппировать (group by) их по общему ключу как userID и вычислить общее количество во временном окне, получив количество событий данного типа, которые пользователь совершает в определенный период времени.

 failedLogStream()
     .window(5,TimeUnit.SECONDS)
     .flatMap(window ->
                window
                .groupBy(propertyStringValue("remoteIP"))
                .flatMap(grouped -> grouped
                    .count()
                    .map( failedLoginsCount -> {
                          final String remoteIp = grouped.getKey();
                          return new Pair<>(remoteIp, failedLoginsCount);
                    }))
     )
     .filter(pair -> pair.get > 10)
     .forEach(System.out::println);           

Мы можем инициировать запросы и в других системах и работать с их ответами как с потоками данных, на которые мы можем подписаться и применить несколько привычных операторов для работы со стримами (потоками данных), которые представлены во фреймворках reactive streams (реактивных стримов).


Учим новую парадигму разработки

Хорошо бы разобрать, что такое реактивное программирование стримов, для этого нам нет необходимости разворачивать что-то большое, такое как Kafka Streams Spark or Flink.

Реактивное программирование — это неблокирующие приложения, управляемые событиями, которые масштабируются даже при небольшом числе потоков с противодействием нагрузке (механизм обратной связи, при котором количество данных от производителей не превышает число данных принимаемых потребителями).

Самой большой темой, которую принесет Spring5, будет поддержка Реактивного программирования. Новый модуль spring-web-reactive — фреймворк, похожий на spring-web-mvc, который позволит отдавать асинхронные (неблокирующие) ответы для REST сервисов и реактивный веб-клиент, из чего следует возможность применения данного решение для микросервисной архитектуры. Концепция реактивных стримов не специфична для Spring, поскольку существует общая спецификация reactive-streams-jvm, согласованная большинством реактивных фреймворков (пока для нее, может быть, и не существует идентичного наименования, но концепция должна быть достаточно простой, чтобы стать заменой фреймворкам).

Исторически модель реактивные потоки была представлена Rx.NET, а затем при помощи Netflix портирована на java, получив название RxJava. В то же время концепция был также успешно реализована и в других языках, под названием Реактивные расширения (Reactive EXtensions). С тех пор компании движутся в том же направлении, что и спецификация реактивных стримов. Сейчас RxJava, поскольку он был первопроходцем, нуждается в значительном рефакторинге(переписании кода) — соответственно, версия 2.х лучше соответствует спецификации, и, пока Spring reactor еще новичок, компании не составит труда переписать реализацию согласно спецификации. Рекомендуем прочитать больше о том как они взаимосвязаны.

Doug Lea сообщил, что хочет включить реактивные потоки в состав объекта java.util.concurrent.Flow, а это значит, что реактивные стримы будут поставляться в составе Java 9.


Преимущества с точки зрения производительности

Также другим модным словом сейчас является микросервисная архитектура с обязательной возможностью делать запросы на множество разных сервисов. В идеале, лучше всего выполнять неблокирующие запросы, не ожидая для выполнения следующего запроса получение ответа целиком. Подумайте, вместо того чтобы ждать момента, когда какой-нибудь сервис вернет вам большой список результатов, возможно, стоит в то же время при получении первого фрагмента отправлять новый запрос в другую систему.

Never block

Если рассматривать ответ от удаленного запрос как Stream (Стрим-поток данных), подписка на который запускает action (действие) при получении ответа, то вместо блокировки потока, ожидающего свой ответ, мы можем воспользоваться меньшим числом потоков в целом, что, в свою очередь, уменьшит затраты ресурсов (к примеру, процессорное время для переключения контекста между потоками и памяти для каждого стека потоков).

Таким образом, использование реактивного программирования позволит нам на стандартном железе обработать больше логов событий, чем обычно.

Пример: сервису, такому как Gmail, нужно отображать пользовательские почтовые сообщения (emails). Тем не менее, emails, в свою очередь, могут иметь множество людей в копии (CC). Было бы прикольно отобразить фотографию для тех пользователей, которые находятся в твоих контактах, что означает вызов REST — ContactService.

Получится вот так:

Future<List<Mail>> emailsFuture = mailstoreService.getUnreadEmails();  
List<Mail> emails = emailsFuture.get(); //блокировка текущего потока  
//возможно долгое ожидание, пока не будет получен полный список данных
//можно ли запустить следующий процесс, как только первый фрагмент будет получен?

Future<List<Contacts>> contacts = getContactsForEmails(emails);  
for(Mail mail : emails) {  
  streamRenderEmails(mail, contacts); //push(отправить) emails клиенту
}

Частично проблема была решена с приходом поддержки реактивного программирования в Java 8 с CompletableFuture (со своими thenCompose, thenCombine, thenAccept и еще 50 методами, хотя это не отменяет того факта, что нужно помнить все, что они делают, а это никак не помогает в прочтении кода).

CompletableFuture<List<Mail>> emailsFuture = mailstoreService.getUnreadEmails();

CompletableFuture<List<Contact>> emailsFuture  
  .thenCompose(emails -> getContactsForEmails(emails)) //нам все еще нужно ожидать List<Mail> 
  .thenAccept(emailsContactsPair -> streamRenderEmails(emailsContactsPair.getKey(), emailsContactsPair.getValue()))

Мы можем переключиться на Iterator вместо List-а, и в тоже время не существует методов, говорящих выполнить какое-либо действие при появлении новых значений. В SQL есть такая возможность, к примеру, ResultSet (в котором вы можете выполнить rs.next()) вместо загрузки всех данных в память.

public interface Iterator<E> {  
    /**
     * Возвращает {@code true}, если в итерации больше элементов.
     */
    boolean hasNext();

    /**
     * Возвращает следующий элемент итерации.
     */
    E next();
}

Но нам все еще нужно постоянно спрашивать "а у тебя есть новое значение?"

Iterable<Mail> emails = mailstoreService.getUnreadEmails();  
Iterator<Mail> emailsIt = emails.iterator();

while(emailsIt.hasNext()) {  
  Mail mail = emailsIt.next(); //неблокирующее действие все равно тратит много процессорного времени на получение новых значений
  if(mail != null) {
      ....
  }
}

Что нам нужно, так это реактивный итератор, такой тип данных, который сможет подписаться и выполнить действие, как только получено новое значение. Именно здесь начинается reactive stream programming (реактивное программирование стримов).


Так что же такое Stream?

Everything is a stream

Stream по-простому это последовательность упорядоченных во времени событий (событиеX происходит после событияY, так что события не конкурируют между собой).

Stream смоделирован так, что выпускает 0..N событий и одну из двух терминальных операций:


  • событие завершения, через которое подписчикам сообщается, что выпуск данных окончен
  • событие об ошибке, информирующее об окончании стрима с ошибкой (исключением)

Мы можем описать это визуально с помощью 'marble diagrams'.

Marble diagram for Observable

Таким образом мы можем представить, что Stream-ом является все, а не только лог событий. Даже единичное значение может быть выражено как Стрим, выпускающий значение, за которым следует событие о завершении.

Бесконечный стрим — стрим, который выпускает события, но без единого терминального события(завершения | ошибки).

RxJava определяет Observable (Наблюдаемый) тип данных для моделирования Stream-а событий типа . В Spring Reactor-е он равен типу Flux.

Observable представляет собой stream температур, взятых с различными интервалами.

Observable представляет собой stream продуктов, купленных в нашем веб магазине.

Observable представляет собой одного пользователя (User), вернувшегося по запросу к БД.

    public Observable<User> findByUserId(String userId) {...}
    //пусть будет Single для большей наглядности 
    public Single<User> findByUserId(String userId) {...}

Но Observable это просто тип данных, поэтому, как и в случае с шаблоном проектирования Publish/Subscriber (Публикации/Подписчика), нам нужен Subscriber(Подписчик) чтобы обработать 3 типа событий

        Observable<CartItem> cartItemsStream = ...;

        Subscriber<CartItem> subscriber = new Subscriber<CartItem>() {
            @Override
            public void onNext(CartItem cartItem) {
                System.out.println("Cart Item added " + cartItem);
            }

            @Override
            public void onCompleted() {
            }

            @Override
            public void onError(Throwable e) {
                e.printStackTrace();
            }
        };

        cartItemsStream.subscribe(subscriber);

Реактивные операторы

Но это просто часть Stream-a, а до сих пор мы не применяли ничего необычного, только классический шаблон проектирования Observer (Наблюдатель).

Reactive (Реактивная) же часть означает, что мы можем определить некоторые Function (операторы — функции), которые будут выполнены, когда stream запустит событие.

Это значит, что будет создан другой stream (стримы immutable), на который мы можем подписать другой оператор и т.д.

Observable<CartItem> filteredCartStream = cartStream.filter(new Func1<CartItem, Boolean>() {  
            @Override
            public Boolean call(CartItem cartItem) {
                return cartItem.isLaptop();
            }
        });

Observable<Long> laptopCartItemsPriceStream = filteredCartStream.map(new Func1<CartItem, Long>() {  
            @Override
            public Long call(CartItem cartItem) {
                try {
                    return priceService.getPrice(cartItem.getId());
                } catch(PriceServiceException e) {
                    thrown new RuntimeException(e);
                }
            }
        });

Поскольку операторы (методы) класса Observable (filter, map, groupBy,...) возвращают Observable, это значит, что мы можем использовать цепочку из операторов, чтобы объединить их с лямбда синтаксисом и написать что-нибудь красивое.

Observable<BigDecimal> priceStream = cartStream  
                        .filter((cartItem) -> cartItem.isLaptop()).
                        .map((laptop) -> {
                             try {
                                  return priceService.getPrice(cartItem.getId());
                            } catch(PriceServiceException e) {
                                 thrown new RuntimeException(e);
                            }
                        });

Обратите внимание, что выше, когда создается priceStream, ничего не происходит — priceService.getPrice() не вызывается, пока нет элемента, проходящего по цепочке операторов. Это означает, что мы создали через rx-оператор подобие плана, как управляемые данные будут спускаться вниз по цепочке(подписание регистрируется).

Когда просят объяснить реактивное программирование, обычно в шутку дают пример с Excel листами, где в колонках написаны формулы, вызываемые при обновлении ячейки, что, в свою очередь, обновляет другую ячейку, которая, в свою очередь, обновляет другую и так далее по цепочке.

Прямо как rx-operator, который ничего не делает, эти формулы просто управляют данными и каждая из них получает свой шанс сделать что-то до того момента, как данные спустились вниз по цепочке.

Чтобы лучше понимать, как события путешествуют вместе с цепочкой операторов, я нашел полезную аналогию, в примере с переездом из одного дома в другой, грузчики выступают в роли операторов, вместе с которым перемещаются вещи из вашего дома — так это изобразил Томас Нильд.

Его пример с кодом:

Observable<Item> mover1 = Observable.create(s -> {  
   while (house.hasItems()) {
    s.onNext(house.getItem());
   }
   s.onCompleted();
});

Observable<Item> mover2 = mover1.map(item -> putInBox(item));

Subscription mover3 = mover2.subscribe(box -> putInTruck(box),  
   () -> closeTruck()); //это сработает на событие OnCompleted()

"Грузчик 1 с одной стороны источник Observable. Он создает выбросы за счет того, что выносит вещи из дома. Он вызывает Грузчика 2 с методом onNext(), который выполняет map() операцию. Когда его метод onNext() вызван, он берет вещь и перекладывает в коробку. Затем он вызывает Грузчика 3, конечного Subscriber(подписчика), с методом onNext(), который грузит коробку в машину."

Магия RxJava — это большой набор доступных операторов, ваша же работа заключается в комбинировании их всех вместе для управления течением данных.

Множество Stream операторов помогают составить словарь терминов для обозначения действий, производимых со стримами, которые могут быть реализованы в популярных языках (RxJava, RxJS, Rx.NET, etc) из числа ReactiveX framework (Реактивных расширений).

Эти понятия надо знать даже при использовании различных фреймворков для работы с реактивными стримами, таких как Spring Reactor (в надежде на наличие неких операторов, общих для этих фреймворков).

Пока что мы видели только простые операторы, такие как фильтрация:

**Filter**

Которые только пропускают элементы, попадающие под условие фильтра (один грузчик перенесет только те вещи, которые стоят меньше 100$, вместо того, чтобы передать сразу все другому грузчику)

Однако есть операторы, которые могут разбить стрим на множество отдельных стримов — Observable<Observable<T>>(Стрим стримов) — это такие операторы, как groupBy

> **group by**

        Observable<Integer> values = Observable.just(1,4,5,7,8,9,10);
        Observable<GroupedObservable<String, Integer>> oddEvenStream = values.groupBy((number) -> number % 2 == 0 ? "odd":"even");
        Observable<Integer> remergedStream = Observable.concat(oddEvenStream);
        remergedStream.subscribe(number -> System.out.print(number +" "));
//Выводит
//1 5 7 9 4 8 10 

и достаточно простой оператор concat, который снова из четных и нечетных стримов создает один-единственный стрим, и устанавливает на него подписку.
> **Concat**

Мы видим, что оператор concat ожидает завершения стрима перед тем, как добавить еще один, снова создавая один стрим. Таким образом, нечетные числа отображаются первыми.

Также у нас есть возможность скомбинировать вместе множество stream-ов, как, например, это делает zip оператор
> **Zip operator**

Zip назван так не потому, что он работает как архиватор, но скорее из-за то, что он, как молния (на куртке), комбинирует события из двух stream-ов.

> Молния (защелка)

Он берет одно событие из одного stream-а и соединяет с событием из другого (делая пару). Как только это выполнено, он применяет оператор склеивания, перед тем, как спуститься дальше по цепочке.

PS: это работает и для большего количества stream-ов.

Так что, даже если один stream выпускает события быстрее, то дальше слушатель увидит только скомбинированное событие, которое выпустится из более медленного stream-а.

Иметь возможность "ждать" ответа от множества удаленных вызовов, которые мы получаем от stream-ов, на самом деле очень полезно.

С другой стороны, оператор combineLatest не ожидает выпуска пары событий, но вместо этого, использует последние выпущенные события из более медленного stream-а, до применения функции склейки и передачи его дальше по цепочки.

> Combine latest


Движемся к мышлению на основе Push подхода

Давайте рассмотрим несколько примеров, как на самом деле создают Observables. Наиболее длинный вариант создания:

        log("Before create Observable");
        Observable<Integer> someIntStream = Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        log("Create");
                        emitter.onNext(3);
                        emitter.onNext(4);
                        emitter.onNext(5);
                        emitter.onComplete();
                        log("Completed");
                    }
                });
        log("After create Observable");

        log("Subscribing 1st");
        someIntStream.subscribe((val) -> LOGGER.info("received " + val));
        //мы можем опустить реализацию
        //другие методы(for onError and onComplete) если мы ее хотим сделать, что-то особенное

        log("Subscribing 2nd");
        someIntStream.subscribe((val) -> LOGGER.info("received " + val));

События посылаются подписчику, как только тот подписался.
Не то, что мы пользуемся такой конструкцией, просто мы передали новый объект ObservableOnSubscribe, который демонстрирует, что делать, когда на него кто-нибудь подписывается.

Пока мы не подписались на Observable, выходных данных нет и ничего не происходит, данные не движутся.

Когда кто-то подписывается, вызывается метод call() и 3 сообщения проталкиваются вниз по цепочке, следом за ними идет сигнал, что stream завершился.

Выше мы дважды подписались, код внутри метода call(...) также будет вызван дважды. Так что он эффективно переотправляет те же значения, как только кто-нибудь еще подпишется и тогда получит следующие значения на вывод:

mainThread: Before create Observable  
mainThread: After create Observable  
mainThread: Subscribing 1st  
mainThread: Create  
mainThread: received 3  
mainThread: received 4  
mainThread: received 5  
mainThread: Completed  
mainThread: Subscribing 2nd  
mainThread: Create  
mainThread: received 3  
mainThread: received 4  
mainThread: received 5  
mainThread: Completed  

Важно заметить, что rx операторы не обязательно означают многопоточность. RxJava не внедряет конкурентность по умолчанию между Observable и Subscriber. Поэтому все вызовы происходят на "главном" потоке.

Такой тип Observable, которые начинают распространение, когда кто-нибудь подписан называются cold observables(холодные наблюдатели). Другой вид — hot observables(горячие наблюдатели), они могут выпускать события, даже когда никто не подписан на них.


  • Cold Observables начинают распространять события только когда кто-то подписывается. Каждый подписчик получает одни и те же события. Например, как CD на котором играют одни и те же песни для того, кто включил cd в плеер послушать.
  • Hot Observables события распространяются даже когда на них еще никто не подписался. Как радиостанция которая проигрывает песни через радиовещание, даже когда никто его не включал. И так же, как при включении радиоприемника, ты пропускаешь предшествующие события. Hot observable моделирует события, распространением которых вы не можете управлять. Примерно как в случае записи событий в журнал (лог событий).

Subjects это такой специальный вид Observable, который также является Observer (как Subscriber — который решает, что он может протолкнуть данные (вызовом onNext()) до них) и сделает реализацию горячих Observables проще. Также существует множество реализаций, подобных ReplaySubject, которые сохраняют выделенные события в буфере и воспроизводят их по подписке (конечно, можно указать размер буфера, чтобы предотвратить ошибку OutOfMemory), пока PublishSubject только пропускает события, которые случились после подписания.
И конечно, существует множество статических методов для создания Observables и из других источников.

Observable.just("This", "is", "something")  
Observable.from(Iterable<T> collection)  
Observable.from(Future<T> future) - передает значение после того, как `future` выполнится 

Добавление в наш ELK-стек RabbitMQ эмиттера данных, отправляемых посредством push

По традиции, работая со стеком ELK, мы используем ElasticSearch чтобы запросить данные по логу событий, так что можно сказать, что они в опрашивающем стиле 'pull-based'.

Можем ли вместо этого иметь push-based, где мы собираемся информировать 'незамедлительно' при появлении события в журнале, чтобы дальше уменьшить время реагирования на событие, с того момента как оно произошло и до того как мы начнем его обрабатывать.

Одним из множества возможных решение может стать RabbitMq, как бывалое в сражениях решение с очень хорошей репутацией за его производительность, за его возможность обработки огромного числа сообщений. Несмотря на это Logstash уже поддерживает плагин для RabbitMQ (также есть еще один плагин для FluentD) так что мы можем с легкостью интегрировать его в наш существующий ELK стек и записывать логи в ElasticSearch и RabbitMQ.

Возможно вы помните, что Logstash может вести себя как контроллер, и выбирать как ему работать, и куда отправлять/сохранять логируемые событий. Это значит что мы можем отфильтровать те события, которые хотим обработать или указать куда их послать, например, в другие RabbitMQ очереди.

Существует даже возможность напрямую отправлять данные в RabbitMQ через Logback Appender, если вам захочется опустить использование Logstash.

К слову сказать: Пока так называемый AmqpAppender, является скорее специфичной реализацией RabbitMQ AMQP (с версией протокола AMQP 0-9-1, 0-9).

ActiveMQ для примера (пока тоже поддерживает AMQP connector) кажется реализует протокол версии AMQP 1.0, пока spring-amqp библиотека с протоколом версий 0-9-1, 0-9, которые совсем отличаются от 1.0), так что вы может столкнетесь с ошибками по типу 'org.apache.activemq.transport.amqp.AmqpProtocolException: Connection from client using unsupported AMQP attempted'

Однако наше решение было чтобы использовать logstash-logback-encoder и отправлять отформатированный JSON с журналом событий в Logstash. Мы перенаправим logstash вывод на точку обмена RabbitMQ (exchange).

Мы будем использовать docker-compose, чтобы запустить кластер logstash-rabbitmq
Вы можете склонировать репозиторий

docker-compose -f docker-compose-rabbitmq.yml up
и затем вы можете использовать
./event-generate.sh
чтобы сгенерировать некоторое число случайных событий которые будут отправлены на logstash.

Для того, чтобы определить, куда отправлять данные, используйте файл настроек logstash. Мы используем rabbitmq-output-plugin, как ссылку:

output {  
    rabbitmq {
        exchange => logstash
        exchange_type => direct
        host => rabbitmq
        key => my_app
    }
}

RabbitMQ это не классический JMS сервер, вместо этого он использует AMQP протокол, который имеет весьма отличную от остальных концепцию для очередей.

amqp

Издатель отправляет сообщения на именованную точку обмена (exchange) и потребитель забирает сообщения из очереди.

У сообщения есть стандартный заголовок 'routing-key', который используется в процессе называемом ассоциативным привязкой, чтобы связать обмен сообщениями в очереди. Очереди могут фильтровать какие сообщения они получают через ключ привязки, и к тому же можно использовать подставленные знаки в привязке как эти 'logstash.'

Для более подробного объяснения AMQP вы можете прочитать здесь и тут. Так мы настроили Spring соединение c RabbitMq

    @Bean
    ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory(host, port);
    }

    @Bean
    RabbitAdmin rabbitAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
        rabbitAdmin.declareQueue(queue());
        rabbitAdmin.declareBinding(bindQueueFromExchange(queue(), exchange()));
        return rabbitAdmin;
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);

        return container;
    }

    @Bean
    Queue queue() {
        return new Queue(queueName, false);
    }

    DirectExchange exchange() {
        return new DirectExchange("logstash");
    }

    private Binding bindQueueFromExchange(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("my_app");
    }

    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(receiver,
                new MessageConverter() {
            public Message toMessage(Object o, MessageProperties messageProperties)
                    throws MessageConversionException {
                throw new RuntimeException("Unsupported");
            }

            public String fromMessage(Message message) throws MessageConversionException {
                try {
                    return new String(message.getBody(), "UTF-8");
                } catch (UnsupportedEncodingException e) {
                    throw new RuntimeException("UnsupportedEncodingException");
                }
            }
        });
        messageListenerAdapter.setDefaultListenerMethod("receive"); //the method in our Receiver class
        return messageListenerAdapter;
    }

    @Bean
    Receiver receiver() {
        return new Receiver();
    }

Мы определили очередь и связываем с сервисом обмена 'logstash', чтобы принять сообщения с ключем маршрутизации 'my_app'. MessageListenerAdapter выше определяет, что метод 'receive' должен быть вызван на бине Receiver каждый раз, когда новое сообщение приходит из очереди.

Поскольку мы ожидаем непрерывный поток в лог событий, мы не имеем над ним контроль, мы можем подумать об использовании hot observable, которые распространяют события всем подписчикам после того, как они подписались, так что мы используем для работы PublishSubject.

public class Receiver {  
    private PublishSubject<JsonObject> publishSubject = PublishSubject.create();

    public Receiver() {
    }

    /**
     * Method invoked by Spring whenever a new message arrives
     * @param message amqp message
     */
    public void receive(Object message) {
        log.info("Received remote message {}", message);
        JsonElement remoteJsonElement = gson.fromJson ((String) message, JsonElement.class);
        JsonObject jsonObj = remoteJsonElement.getAsJsonObject();

        publishSubject.onNext(jsonObj);
    }

    public PublishSubject<JsonObject> getPublishSubject() {
        return publishSubject;
    }
}

Мы должны знать, что событие SimpleMessageListenerContainer поддерживает наличие более одного потока, который потребляет из очереди (и пропускает события вниз по цепочки). Однако контракт Observable говорит, что мы не можем выделять события конкурентно (вызовы onNext,onComplete, onError должны быть сериализованы):

// ТАК ДЕЛАТЬ НЕ НАДО
Observable.create(s -> {  
                    // Thread A
                    new Thread(() -> {
                        s.onNext("one");
                        s.onNext("two");
                    }).start();

                    // Thread B
                    new Thread(() -> {
                        s.onNext("three");
                        s.onNext("four");
                    }).start();
                });
// ТАК ДЕЛАТЬ НЕ НАДО

//ДЕЛАЙТЕ ТАК
Observable<String> obs1 = Observable.create(s -> {  
                    // Thread A
                    new Thread(() -> {
                        s.onNext("one");
                        s.onNext("two");
                    }).start();
                  });

Observable<String> obs2 = Observable.create(s -> {  
                    // Thread B
                    new Thread(() -> {
                        s.onNext("three");
                        s.onNext("four");
                    }).start();
                });

Observable<String> c = Observable.merge(obs1, obs2); 

Мы можем обойти эту проблему вызывая Observable.serialize() или Subject.toSerialized(), но мы остаемся со значение по умолчанию 1 Thread в ListenerContainer, нет необходимости делать это. Еще вы должны быть в курсе того, что если вы планируете использовать Subjects как шину событий, распространяющихся на несколько потоков. Прочитайте подробное объяснение.

А сейчас вы можете взглянуть на код и репозиторий, как продолжение этого длинного поста Part II (Части 2) или перейдите на Rx Playground там вы найдете больше примеров.
Ссылка на сайт переводчика