- contrapressão é legal
- contrapressão está disponível apenas em bibliotecas que implementam a especificação de fluxos reativos
- esta especificação é tão complexa que você nem deveria tentar implementá-la sozinho
Neste artigo, tentarei mostrar que:
- contrapressão é muito simples
- para implementar uma contrapressão assíncrona, basta fazer uma versão assíncrona do semáforo
- se houver uma implementação de semáforo assíncrona, a interface org.reactivestreams.Publisher é implementada em algumas dezenas de linhas de código
A contrapressão é um feedback que ajusta a velocidade do produtor de dados para corresponder à velocidade do consumidor. Na ausência de tal conexão, o produtor mais rápido pode estourar o buffer do consumidor ou, se o buffer for adimensional, esgotar toda a RAM.
Na programação multithread, esse problema foi resolvido por Dijkstroy, que propôs um novo mecanismo de sincronização - o semáforo. Um semáforo pode ser considerado um contador de permissão. Presume-se que o produtor solicita permissão do semáforo antes de cometer uma ação que consome muitos recursos. Se o semáforo estiver vazio, o encadeamento do produtor será bloqueado.
Os programas assíncronos não podem bloquear threads, portanto, eles não podem acessar um semáforo vazio para obter permissão (mas podem fazer todas as outras operações do semáforo). Eles devem bloquear sua execução de outra maneira. Essa outra maneira é que eles apenas deixam o thread de trabalho em que estavam executando, mas antes disso, combinam de voltar ao trabalho assim que o semáforo estiver cheio.
A maneira mais elegante de pausar e retomar um programa assíncrono é estruturá-lo como um ator de fluxo de dados com portas :

Um modelo de fluxo de dados - atores com portas, as conexões direcionadas entre suas portas e tokens iniciais. Retirado de: A Structured Description Of Dataflow Actors And Your Application
Existem portas de entrada e saída. As portas de entrada recebem tokens (mensagens e sinais) das portas de saída de outros atores. Se a porta de entrada contiver tokens e a porta de saída tiver um local para colocar tokens, ela será considerada ativa. Se todas as portas do ator estiverem ativas, ele é enviado para execução. Assim, ao retomar seu trabalho, o programa ator pode ler tokens com segurança das portas de entrada e gravar no fim de semana. Este mecanismo simples contém toda a sabedoria da programação assíncrona. A alocação de portas como subobjetos separados de atores simplifica muito a codificação de programas assíncronos e permite aumentar sua diversidade ao combinar portas de diferentes tipos.
O ator Hewitt clássico contém 2 portas - uma é visível, com um buffer para mensagens de entrada, a outra é um binário oculto que bloqueia quando o ator é enviado para execução e, portanto, impede que o ator reinicie até o final do lançamento inicial. O semáforo assíncrono desejado é um cruzamento entre essas duas portas. Como um buffer de mensagem, ele pode armazenar muitos tokens e, como uma porta oculta, esses tokens são pretos, ou seja, indistinguíveis, como nas redes de Petri, e um contador de tokens é suficiente para armazená-los.
No primeiro nível da hierarquia, temos uma classe
AbstractActor
com três classes aninhadas - base Port
e derivadas AsyncSemaPort
e InPort
, além de um mecanismo de lançamento de um ator para execução na ausência de portas bloqueadas. Resumindo, é assim:
public abstract class AbstractActor {
/** */
private int blocked = 0;
protected synchronized void restart() {
controlPort.unBlock();
}
private synchronized void incBlockCount() {
blocked++;
}
private synchronized void decBlockCount() {
blocked--;
if (blocked == 0) {
controlPort.block();
excecutor.execute(this::run);
}
}
protected abstract void turn() throws Throwable;
/** */
private void run() {
try {
turn();
restart();
} catch (Throwable throwable) {
whenError(throwable);
}
}
}
Ele contém um conjunto mínimo de classes de portas:
Port
- classe base de todas as portas
protected class Port {
private boolean isBlocked = true;
public Port() {
incBlockCount();
}
protected synchronized void block() {
if (isBlocked) {
return;
}
isBlocked = true;
incBlockCount();
}
protected synchronized void unBlock() {
if (!isBlocked) {
return;
}
isBlocked = false;
decBlockCount();
}
}
Semáforo assíncrono:
public class AsyncSemaPort extends Port {
private long permissions = 0;
public synchronized void release(long n) {
permissions += n;
if (permissions > 0) {
unBlock();
}
}
public synchronized void aquire(long delta) {
permissions -= delta;
if (permissions <= 0) {
//
// ,
//
block();
}
}
}
InPort
- buffer mínimo para uma mensagem de entrada:
public class InPort<T> extends Port implements OutMessagePort<T> {
private T item;
@Override
public void onNext(T item) {
this.item = item;
unBlock();
}
public synchronized T poll() {
T res = item;
item = null;
return res;
}
}
A versão completa da aula
AbstractActor
pode ser vista aqui.
No próximo nível da hierarquia, temos três atores abstratos com portas específicas, mas com rotinas de processamento indefinidas:
- uma classe
AbstractProducer
é um ator com uma porta do tipo semáforo assíncrono (e uma porta de controle interno, presente em todos os atores por padrão). - a classe
AbstractTransformer
é um ator Hewitt regular, com uma referência à porta de entrada do próximo ator na cadeia, para onde envia os tokens convertidos. - a classe
AbstractConsumer
também é um ator comum, mas não envia os tokens convertidos a lugar nenhum, embora tenha um link para o semáforo produtor, e abre esse semáforo após absorver o token de entrada. Isso mantém o número de tokens em processo constante e nenhum estouro de buffer ocorre.
No último nível, já no diretório de teste, são definidos atores específicos usados nos testes :
- a classe
ProducerActor
gera um fluxo finito de inteiros. - a classe
TransformerActor
pega o próximo número do fluxo e o envia pela cadeia. - classe
ConsumerActor
- aceita e imprime os números resultantes
Agora podemos construir uma cadeia de manipuladores de trabalho paralelos assíncronos da seguinte forma: produtor - qualquer número de transformadores - consumidor

Assim, implementamos uma contrapressão, e mesmo de uma forma mais geral do que na especificação de fluxos reativos - o feedback pode abranger um número arbitrário de cascatas de processamento, e não apenas adjacentes, como na especificação.
Para implementar a especificação, você precisa definir uma porta de saída que seja sensível ao número de permissões passadas a ela usando o método request () - isso será
Publisher
, e suplementar a existente com uma InPort
chamada para este método - isso será Subscriber
. Ou seja, assumimos que as interfaces Publisher
eSubscriber
descreve o comportamento dos portos, não dos atores. Mas a julgar pelo fato de que na lista de interfaces também existe Processor
, que de forma alguma pode ser uma interface de porta, os autores da especificação consideram suas interfaces como interfaces de ator. Bem, podemos criar atores que implementam todas essas interfaces delegando a execução das funções da interface às portas correspondentes.
Para simplificar, deixe o nosso
Publisher
não ter seu próprio buffer e escreverá diretamente no buffer Subscriber
. Para fazer isso, você precisa de alguém para se Subscriber
inscrever e cumprir request()
, ou seja, temos 2 condições e, para isso, precisamos de 2 portas - InPort<Subscriber>
e AsyncSemaPort
. Nenhum deles é adequado como base para implementaçãoPublisher
'a, uma vez que contém métodos desnecessários, vamos tornar essas portas variáveis internas:
public class ReactiveOutPort<T> implements Publisher<T>, Subscription, OutMessagePort<T> {
protected AbstractActor.InPort<Subscriber<? super T>> subscriber;
protected AbstractActor.AsyncSemaPort sema;
public ReactiveOutPort(AbstractActor actor) {
subscriber = actor.new InPort<>();
sema = actor.new AsyncSemaPort();
}
}
Desta vez,
ReactiveOutPort
não definimos a classe como aninhada, portanto, era necessário um parâmetro construtor, uma referência ao ator envolvente, para instanciar as portas definidas como classes aninhadas.
O método se
subscribe(Subscriber subscriber)
resume a salvar o assinante e ligar subscriber.onSubscribe()
:
public synchronized void subscribe(Subscriber<? super T> subscriber) {
if (subscriber == null) {
throw new NullPointerException();
}
if (this.subscriber.isFull()) {
subscriber.onError(new IllegalStateException());
return;
}
this.subscriber.onNext(subscriber);
subscriber.onSubscribe(this);
}
o que geralmente resulta em uma chamada
Publisher.request()
que se resume a aumentar o semáforo com uma chamada AsyncSemaPort.release()
:
public synchronized void request(long n) {
if (subscriber.isEmpty()) {
return; // this spec requirement
}
if (n <= 0) {
subscriber.current().onError(new IllegalArgumentException());
return;
}
sema.release(n);
}
E agora não podemos esquecer de baixar o semáforo utilizando uma chamada
AsyncSemaPort.aquire()
no momento da utilização do recurso:
public synchronized void onNext(T item) {
Subscriber<? super T> subscriber = this.subscriber.current();
if (subscriber == null) {
throw new IllegalStateException();
}
sema.aquire();
subscriber.onNext(item);
}
O projeto AsyncSemaphore foi desenvolvido especialmente para este artigo. É intencionalmente feito o mais compacto possível para não cansar o leitor. Como resultado, ele contém limitações significativas:
-
Publisher
'Subscriber
' -
Subscriber
' 1
Além disso,
AsyncSemaPort
não é um análogo completo de um semáforo síncrono - apenas um cliente pode executar a operação aquire()
y AsyncSemaPort
(ou seja, o ator de inclusão). Mas isso não é uma desvantagem - AsyncSemaPort
ele desempenha bem sua função. Em princípio, você pode fazer isso de forma diferente - pegue java.util.concurrent.Semaphore
e complemente com uma interface de assinatura assíncrona (consulte AsyncSemaphore.java do projeto DF4J ). Esse semáforo pode ligar atores e threads de execução em qualquer ordem.
Em geral, cada tipo de interação síncrona (bloqueio) tem sua própria contraparte assíncrona (não bloqueadora). Portanto, no mesmo projeto DF4J existe uma implementação
BlockingQueue
, complementado por uma interface assíncrona. Isso abre a possibilidade de uma transformação passo a passo de um programa multithread em um assíncrono, parte por parte substituindo threads por atores.