Anatomia de uma contrapressão em correntes de jato

Ao ler vários artigos sobre o tópico de fluxos reativos, o leitor pode chegar à conclusão de que:



  • contrapressão é legal
  • contrapressão está disponível apenas em bibliotecas que implementam a especificação de fluxos reativos
  • esta especificação é tão complexa que você nem deveria tentar implementá-la sozinho


Neste artigo, tentarei mostrar que:



  • contrapressão é muito simples
  • para implementar uma contrapressão assíncrona, basta fazer uma versão assíncrona do semáforo
  • se houver uma implementação de semáforo assíncrona, a interface org.reactivestreams.Publisher é implementada em algumas dezenas de linhas de código


A contrapressão é um feedback que ajusta a velocidade do produtor de dados para corresponder à velocidade do consumidor. Na ausência de tal conexão, o produtor mais rápido pode estourar o buffer do consumidor ou, se o buffer for adimensional, esgotar toda a RAM.



Na programação multithread, esse problema foi resolvido por Dijkstroy, que propôs um novo mecanismo de sincronização - o semáforo. Um semáforo pode ser considerado um contador de permissão. Presume-se que o produtor solicita permissão do semáforo antes de cometer uma ação que consome muitos recursos. Se o semáforo estiver vazio, o encadeamento do produtor será bloqueado.



Os programas assíncronos não podem bloquear threads, portanto, eles não podem acessar um semáforo vazio para obter permissão (mas podem fazer todas as outras operações do semáforo). Eles devem bloquear sua execução de outra maneira. Essa outra maneira é que eles apenas deixam o thread de trabalho em que estavam executando, mas antes disso, combinam de voltar ao trabalho assim que o semáforo estiver cheio.



A maneira mais elegante de pausar e retomar um programa assíncrono é estruturá-lo como um ator de fluxo de dados com portas :







Um modelo de fluxo de dados - atores com portas, as conexões direcionadas entre suas portas e tokens iniciais. Retirado de: A Structured Description Of Dataflow Actors And Your Application



Existem portas de entrada e saída. As portas de entrada recebem tokens (mensagens e sinais) das portas de saída de outros atores. Se a porta de entrada contiver tokens e a porta de saída tiver um local para colocar tokens, ela será considerada ativa. Se todas as portas do ator estiverem ativas, ele é enviado para execução. Assim, ao retomar seu trabalho, o programa ator pode ler tokens com segurança das portas de entrada e gravar no fim de semana. Este mecanismo simples contém toda a sabedoria da programação assíncrona. A alocação de portas como subobjetos separados de atores simplifica muito a codificação de programas assíncronos e permite aumentar sua diversidade ao combinar portas de diferentes tipos.



O ator Hewitt clássico contém 2 portas - uma é visível, com um buffer para mensagens de entrada, a outra é um binário oculto que bloqueia quando o ator é enviado para execução e, portanto, impede que o ator reinicie até o final do lançamento inicial. O semáforo assíncrono desejado é um cruzamento entre essas duas portas. Como um buffer de mensagem, ele pode armazenar muitos tokens e, como uma porta oculta, esses tokens são pretos, ou seja, indistinguíveis, como nas redes de Petri, e um contador de tokens é suficiente para armazená-los.



No primeiro nível da hierarquia, temos uma classe AbstractActorcom três classes aninhadas - base Porte derivadas AsyncSemaPorte InPort, além de um mecanismo de lançamento de um ator para execução na ausência de portas bloqueadas. Resumindo, é assim:



public abstract class AbstractActor {
    /**    */
    private int blocked = 0;

    protected synchronized void restart() {
            controlPort.unBlock();
    }

    private synchronized void incBlockCount() {
        blocked++;
    }

    private synchronized void decBlockCount() {
        blocked--;
        if (blocked == 0) {
            controlPort.block();
            excecutor.execute(this::run);
        }
    }

    protected abstract void turn() throws Throwable;

    /**   */
    private void run() {
        try {
            turn();
            restart();
        } catch (Throwable throwable) {
            whenError(throwable);
        }
    }
}


Ele contém um conjunto mínimo de classes de portas:



Port- classe base de todas as portas



    protected  class Port {
        private boolean isBlocked = true;

        public Port() {
            incBlockCount();
        }

        protected synchronized void block() {
            if (isBlocked) {
                return;
            }
            isBlocked = true;
            incBlockCount();
        }

        protected synchronized void unBlock() {
            if (!isBlocked) {
                return;
            }
            isBlocked = false;
            decBlockCount();
        }
    }


Semáforo assíncrono:



    public class AsyncSemaPort extends Port {
        private long permissions = 0;

        public synchronized void release(long n) {
            permissions += n;
            if (permissions > 0) {
                unBlock();
            }
        }

        public synchronized void aquire(long delta) {
            permissions -= delta;
            if (permissions <= 0) { 
                //    
                //        ,
                //       
                block();
            }
        }
    }


InPort - buffer mínimo para uma mensagem de entrada:



    public class InPort<T> extends Port implements OutMessagePort<T> {
        private T item;

        @Override
        public void onNext(T item) {
            this.item = item;
            unBlock();
        }

        public synchronized T poll() {
            T res = item;
            item = null;
            return res;
        }
    }


A versão completa da aula AbstractActorpode ser vista aqui.



No próximo nível da hierarquia, temos três atores abstratos com portas específicas, mas com rotinas de processamento indefinidas:



  • uma classe AbstractProduceré um ator com uma porta do tipo semáforo assíncrono (e uma porta de controle interno, presente em todos os atores por padrão).
  • a classe AbstractTransformeré um ator Hewitt regular, com uma referência à porta de entrada do próximo ator na cadeia, para onde envia os tokens convertidos.
  • a classe AbstractConsumertambém é um ator comum, mas não envia os tokens convertidos a lugar nenhum, embora tenha um link para o semáforo produtor, e abre esse semáforo após absorver o token de entrada. Isso mantém o número de tokens em processo constante e nenhum estouro de buffer ocorre.


No último nível, já no diretório de teste, são definidos atores específicos usados ​​nos testes :



  • a classe ProducerActorgera um fluxo finito de inteiros.
  • a classe TransformerActorpega o próximo número do fluxo e o envia pela cadeia.
  • classe ConsumerActor- aceita e imprime os números resultantes


Agora podemos construir uma cadeia de manipuladores de trabalho paralelos assíncronos da seguinte forma: produtor - qualquer número de transformadores - consumidor







Assim, implementamos uma contrapressão, e mesmo de uma forma mais geral do que na especificação de fluxos reativos - o feedback pode abranger um número arbitrário de cascatas de processamento, e não apenas adjacentes, como na especificação.



Para implementar a especificação, você precisa definir uma porta de saída que seja sensível ao número de permissões passadas a ela usando o método request () - isso será Publisher, e suplementar a existente com uma InPortchamada para este método - isso será Subscriber. Ou seja, assumimos que as interfaces PublishereSubscriberdescreve o comportamento dos portos, não dos atores. Mas a julgar pelo fato de que na lista de interfaces também existe Processor, que de forma alguma pode ser uma interface de porta, os autores da especificação consideram suas interfaces como interfaces de ator. Bem, podemos criar atores que implementam todas essas interfaces delegando a execução das funções da interface às portas correspondentes.



Para simplificar, deixe o nosso Publishernão ter seu próprio buffer e escreverá diretamente no buffer Subscriber. Para fazer isso, você precisa de alguém para se Subscriberinscrever e cumprir request(), ou seja, temos 2 condições e, para isso, precisamos de 2 portas - InPort<Subscriber>e AsyncSemaPort. Nenhum deles é adequado como base para implementaçãoPublisher'a, uma vez que contém métodos desnecessários, vamos tornar essas portas variáveis ​​internas:



public class ReactiveOutPort<T> implements Publisher<T>, Subscription, OutMessagePort<T> {
    protected AbstractActor.InPort<Subscriber<? super T>> subscriber;
    protected AbstractActor.AsyncSemaPort sema;

    public ReactiveOutPort(AbstractActor actor) {
        subscriber = actor.new InPort<>();
        sema = actor.new AsyncSemaPort();
    }
}


Desta vez, ReactiveOutPortnão definimos a classe como aninhada, portanto, era necessário um parâmetro construtor, uma referência ao ator envolvente, para instanciar as portas definidas como classes aninhadas.



O método se subscribe(Subscriber subscriber)resume a salvar o assinante e ligar subscriber.onSubscribe():



    public synchronized void subscribe(Subscriber<? super T> subscriber) {
        if (subscriber == null) {
            throw new NullPointerException();
        }
        if (this.subscriber.isFull()) {
            subscriber.onError(new IllegalStateException());
            return;
        }
        this.subscriber.onNext(subscriber);
        subscriber.onSubscribe(this);
    }


o que geralmente resulta em uma chamada Publisher.request()que se resume a aumentar o semáforo com uma chamada AsyncSemaPort.release():



    public synchronized void request(long n) {
        if (subscriber.isEmpty()) {
            return; // this spec requirement
        }
        if (n <= 0) {
            subscriber.current().onError(new IllegalArgumentException());
            return;
        }
        sema.release(n);
    }


E agora não podemos esquecer de baixar o semáforo utilizando uma chamada AsyncSemaPort.aquire()no momento da utilização do recurso:



    public synchronized void onNext(T item) {
        Subscriber<? super T> subscriber = this.subscriber.current();
        if (subscriber == null) {
            throw  new IllegalStateException();
        }
        sema.aquire();
        subscriber.onNext(item);
    }


O projeto AsyncSemaphore foi desenvolvido especialmente para este artigo. É intencionalmente feito o mais compacto possível para não cansar o leitor. Como resultado, ele contém limitações significativas:



  • Publisher' Subscriber'
  • Subscriber' 1


Além disso, AsyncSemaPortnão é um análogo completo de um semáforo síncrono - apenas um cliente pode executar a operação aquire()y AsyncSemaPort(ou seja, o ator de inclusão). Mas isso não é uma desvantagem - AsyncSemaPortele desempenha bem sua função. Em princípio, você pode fazer isso de forma diferente - pegue java.util.concurrent.Semaphoree complemente com uma interface de assinatura assíncrona (consulte AsyncSemaphore.java do projeto DF4J ). Esse semáforo pode ligar atores e threads de execução em qualquer ordem.



Em geral, cada tipo de interação síncrona (bloqueio) tem sua própria contraparte assíncrona (não bloqueadora). Portanto, no mesmo projeto DF4J existe uma implementaçãoBlockingQueue, complementado por uma interface assíncrona. Isso abre a possibilidade de uma transformação passo a passo de um programa multithread em um assíncrono, parte por parte substituindo threads por atores.



All Articles