Usando o Azure Service Bus do Java

Olá colegas! Acontece que nosso aplicativo é escrito na pilha java, mas está hospedado no Azure. E estamos tentando aproveitar ao máximo os serviços de gerenciamento do provedor de nuvem.



Um deles é o Azure Service Bus e hoje quero falar sobre os recursos de usá-lo em um aplicativo Spring Boot regular.



Se você quiser ler sobre os recursos do rake - vá até o final do artigo



O que é Azure Service Bus



Algumas palavras sobre o Azure Service Bus é um agente de mensagens em nuvem (substituição de nuvem para RabbitMQ, ActiveMQ). Suporta filas (a mensagem é entregue a um destinatário) e tópicos (o mecanismo de publicação / assinatura) - em mais detalhes aqui O



suporte é declarado:



  1. Mensagens ordenadas - a documentação diz que se trata de um FIFO, MAS é implementado usando o conceito de sessões de mensagens - um grupo de mensagens, não a fila inteira. Se você precisar garantir a ordem das mensagens, combine as mensagens em um grupo e agora as mensagens do grupo serão entregues como FIFO. Portanto, Azure Service Bus Queue não é um FIFO - ele entrega suas mensagens de forma tão aleatória quanto convém
  2. Fila de devoluções - tudo é simples aqui, eles não conseguiram entregar a mensagem após N tentativas ou um período de tempo - movida para DLQ
  3. Entrega programada - você pode definir um atraso antes da entrega
  4. Adiamento de mensagens - oculta as mensagens na fila, a mensagem não será entregue automaticamente, mas pode ser recuperada por ID. Precisamos armazenar este ID em algum lugar


Como integrar com o Azure Service Bus



O Barramento de Serviço do Azure dá suporte ao AMQP 1.0, o que significa que não é compatível com clientes RabbitMQ. bunny usa AMQP 0.9.1



O único cliente "padrão" que pode trabalhar com o Service Bus é o Apache Qpid .



Existem 3 maneiras de emparelhar seu aplicativo Spring Boot com Service Bus:



  1. JMS + QPID — , — QPID — .

    timeout producer — — factory.setCacheProducers(false);
  2. Spring Cloud — Azure Service Bus — , . Service Bus

    ( 1.2.6) — , azure service bus java sdk.



    Spring Integration — , «Scheduled delivery» «Message deferral» .



    sdk, MessageAndSessionPump

  3. azure service bus java sdk — ,


Spring Cloud — Azure Service Bus



Vou me alongar sobre esse método com mais detalhes e falar sobre os recursos de uso do

aplicativo de exemplo no repositório oficial, portanto, não faz sentido duplicar o código - o repositório com um exemplo está aqui .



Porque é Spring Integration Messaging, tudo se resume a Channel, MessageHandler, MessagingGateway, ServiceActivator.



E há o ServiceBusQueueTemplate .



Enviando mensagens



Devemos ter um Canal no qual escrevemos a mensagem que queremos enviar, na outra ponta existe um MessageHandler que a envia para o Barramento de Serviços.



O MessagHandler é com.microsoft.azure.spring.integration.core.DefaultMessageHandler - este é o conector para o serviço externo.



Como vinculá-lo a um canal? - adicione a anotação - @ServiceActivator (inputChannel = OUTPUT_CHANNEL) e agora nosso MessagHandler está ouvindo o canal OUTPUT_CHANNEL .



Em seguida, precisamos de alguma forma escrever nossa mensagem para o canal - aqui novamente a magia da primavera - anunciamos o MessagingGateway e o vinculamos ao canal pelo nome.



Um snippet do exemplo :



@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
public interface QueueOutboundGateway {
    void send(String text);
}


Isso é tudo: Gateway -> Canal -> MessagHandler -> ServiceBusQueueTemplate -> ServiceBusMessageConverter .



No código, resta injetar nosso gateway e chamar o método send .



Mencionei ServiceBusMessageConverter na cadeia de chamada para um motivo - se você deseja adicionar cabeçalhos personalizados (como por exemplo) CORRELATION_ID à mensagem, este é o lugar onde eles precisam ser movidos de org.springframework.messaging.MessageHeaders à mensagem celeste.

O método especial setCustomHeaders .



Nesse caso, seu gateway será semelhante a este:



@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
public interface QueueOutboundGateway {
    void send(@Payload String text, @Header("CORRELATION_ID") String correlationId);
}


Recebendo mensagens



Ok, nós sabemos como enviar mensagens, como recebê-las agora?



Aqui tudo é o mesmo - MessageProducer -> Canal -> Handler



O MessageProducer é com.microsoft.azure.spring.integration.servicebus.inbound.ServiceBusQueueInboundChannelAdapter - este é nosso conector para um serviço externo. Dentro, o mesmo ServiceBusQueueTemplate com ServiceBusMessageConverter onde você pode ler cabeçalhos personalizados e colocá-los na mensagem de integração de primavera.



O canal já está instalado manualmente:



@Bean
public ServiceBusQueueInboundChannelAdapter queueMessageChannelAdapter(
        @Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, 
        ServiceBusQueueOperation queueOperation) {
    queueOperation.setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.MANUAL).build());
    ServiceBusQueueInboundChannelAdapter adapter = new ServiceBusQueueInboundChannelAdapter(QUEUE_NAME,
            queueOperation);
    adapter.setOutputChannel(inputChannel);
    return adapter;
}


Mas o próprio Handler é anexado ao canal via @ServiceActivator .



@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
    String message = new String(payload);
.......


Você pode obter a linha imediatamente:



@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(String payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
.......


Você deve ter notado o estranho parâmetro checkpointer Checkpointer , ele é usado para reconhecer manualmente o processamento da mensagem.

Se você definir CheckpointMode.MANUAL ao criar o ServiceBusQueueInboundChannelAdapter , deverá enviar o reconhecimento da mensagem você mesmo. Se você usar CheckpointMode.RECORD , a confirmação será enviada automaticamente - detalhes no código ServiceBusQueueTemplate .







Características de uso



Então, a lista de "rakes" e "chips" em que já fomos.



ReceiveMode.PEEKLOCK



O Barramento de Serviço do Azure dá suporte ao modo PEEKLOCK - o consumidor recebe uma mensagem, ela é bloqueada no barramento de serviço, fica inacessível para qualquer pessoa por um determinado tempo (duração do bloqueio), mas não é excluído dele. Se dentro do tempo estipulado o consumidor não enviar a confirmação do processamento - sucesso / abandono ou não estender o bloqueio - a mensagem é considerada novamente disponível e será feita uma nova tentativa de entrega.



Curiosamente, abandonar simplesmente redefine o bloqueio e a mensagem fica disponível instantaneamente para ser reenviada.



ServiceBusQueueTemplate padrão cria QueueClient modo ReceiveMode.PEEKLOCK .



Se uma exceção não tratada voar em nosso manipulador- nenhuma confirmação será enviada ao servidor e a mensagem permanecerá bloqueada e será reenviada pelo tempo limite.

Nesse caso, o balcão de entrega aumentará, o que é lógico.



Não sei se isso é um bug ou um recurso - mas é muito conveniente fazer um intervalo entre as novas tentativas para situações em que for necessário.



Se a mensagem não puder ser processada mesmo com nova tentativa, é necessário capturar exceções e marcar a mensagem como processada e adicionar lógica adicional ao aplicativo, caso contrário, ela será entregue repetidamente até atingir o limite do número de reenvio (configurado ao criar uma fila no barramento de serviço )



Contagem de mensagens de simultaneidade e pré-busca



Como você deve ter adivinhado, a configuração de simultaneidade é responsável pelo número de manipuladores de mensagens paralelas e a contagem de mensagem de pré-busca é quantas mensagens obteremos no buffer do servidor.



Por padrão, o ServiceBusQueueTemplate é configurado automaticamente (AzureServiceBusQueueAutoConfiguration) com um valor de 1 para ambos os parâmetros, ou seja, por padrão, cada fila terá um encadeamento de processamento, embora o conceito de um barramento de serviço com reconhecimento para cada mensagem individual implique muitos processadores simultâneos. Isso é ainda mais importante se você tiver um longo processamento de solicitação.



Infelizmente, essas configurações não podem ser definidas por meio da configuração do aplicativo (application.yml / application.properties) e só podem ser definidas no código. Mas, mesmo por meio do código, você não poderá definir configurações diferentes para filas diferentes.



Portanto, se você precisar fazer configurações diferentes, terá que criar vários beans ServiceBusQueueTemplate para cada ServiceBusQueueInboundChannelAdapter



CompletableFuture dentro do barramento de serviço azure java SDK



O próprio barramento de serviço azure java sdk é implementado em torno do executor CompletableFuture e CachedThreadPool - MessagingFactory.INTERNAL_THREAD_POOL, portanto, tome cuidado com todos os tipos de beans locais de thread



Mensagens ordenadas



Usamos o barramento de serviço como uma fila de trabalho - alguns trabalhos dependem uns dos outros e, portanto, devem ser executados na ordem em que foram criados.



Como mencionei acima, as camisetas usam o conceito de sessões de mensagem - quando as mensagens são agrupadas em uma sessão por chave (transmitida no cabeçalho), a sessão existe desde que haja pelo menos uma mensagem com a chave de sessão - em detalhes na documentação O

barramento de serviço garante a entrega de mensagens dentro de tal grupo na ordem de adição a servidor (ou seja, na ordem em que o servidor de barramento de serviço os gravou no repositório).



Também vale a pena mencionar se você criou uma fila de sessões habilitadas - isso significa que todas as mensagens devem ter um cabeçalho com uma chave de sessão.



Imediatamente ficamos muito satisfeitos com a possibilidade do barramento de serviço alinhar mensagens em uma fila FIFO - ainda que para um grupo de mensagens.



Mas depois de um tempo, começamos a notar problemas:



  • algumas mensagens começaram a chegar um número infinito de vezes
  • o processamento da fila ficou lento
  • nas estatísticas do barramento de serviço, metade das solicitações são marcadas como com falha, e as solicitações com falha aparecem mesmo em uma fila vazia quando ocioso


Olhando para o código sdk, descobrimos a peculiaridade de trabalhar com sessões:



  1. o consumidor captura a sessão e começa a ler todas as mensagens disponíveis nela
  2. processou simultaneamente o número de sessões igual ao parâmetro de simultaneidade
  3. unhandled exception — 1 ( ) — re-delivery ? 0 exception — ttl .
  4. — success abandon. — delay re-delivery

    .. abandon — , delivery counter .

    delivery count


Como resultado, abandonamos esse recurso de barramento de serviço e criamos uma bicicleta, e o barramento de serviço atua como um gatilho.



Assim que a fila de sessões habilitadas foi cancelada, os erros nas estatísticas desapareceram, a solicitação ao barramento de serviço.



No pacote JMS + Qpid - esta funcionalidade não está disponível.



Problemas potenciais com tamanhos de fila maiores que 1G



Ainda não conheci, mas ouvi dizer que começa a funcionar instável se o tamanho da fila for maior que 1G.



Se você se deparar com isso ou vice-versa, tudo funciona - escreva nos comentários.



Problemas com solicitações de rastreamento



O agente do azure Application Insights padrão não sabe como rastrear o envio de mensagens como dependência e as mensagens recebidas como solicitações.



Tive que adicionar algum código.



Resultado



Se você precisar de uma fila de trabalho com um longo tempo de processamento de mensagens e não precisar de uma fila, você pode usar.



Se o processamento de mensagens for rápido - use o Azure Event Hub - Kafka normal, o cliente padrão funciona bem.



All Articles