Introdução
O Spring Integration é um Enterprise Integration Framework (EIP) que usa mecanismos de mensagens sob o capô entre adaptadores de diferentes protocolos / sistemas de integração baseados em canais de mensagens (filas condicionais). Análogos famosos são Camel, Mule, Nifi.
No caso de teste, teremos - para criar um serviço REST que possa ler os parâmetros de solicitação recebidos, acessar nosso banco de dados, por exemplo, postgres, atualizar e buscar os dados da tabela de acordo com os parâmetros recebidos da fonte e enviar o resultado de volta para a fila (request / response) e também cria várias instâncias com diferentes caminhos de solicitação.
Convencionalmente, o diagrama de fluxo de dados terá a seguinte aparência:
Em seguida, mostrarei como você pode simplesmente fazer isso sem muita dança com um pandeiro, usando IntegrationFlowContext, com pontos de extremidade de componente / segmento que controlam REST. Todo o código principal do projeto estará localizado no repositório, aqui vou indicar apenas alguns recortes. Bem, quem está interessado, por favor, sob gato.
Ferramentas
Vamos começar com o bloco de dependência por padrão. Basicamente, precisaremos de projetos de inicialização por mola - para a ideologia REST de gerenciamento de fluxo / componente, integração por mola - para criar nosso caso com base em canais e adaptadores.
E imediatamente pensamos no que mais precisamos para reproduzir o caso. Além das dependências principais, precisamos de subprojetos - integração-http, integração-jdbc, integração-groovy (fornece conversores de dados dinamicamente personalizáveis com base em scripts do Goovy). Separadamente, direi que neste exemplo não usaremos o conversor groovy como desnecessário, mas forneceremos a capacidade de personalizá-lo de fora.
Lista de dependências
<!-- Spring block -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-commons</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-groovy</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-http</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jdbc</artifactId>
</dependency>
<!-- Db block -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
</dependency>
<!-- Utility block -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>0.9.12</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.4</version>
<scope>provided</scope>
</dependency>
Cozinha interna
Vamos continuar criando os componentes de sistema necessários (wrappers / modelos). Precisamos de modelos de canal, bean, httpInboundGateway, handler, jdbcOutboundGateway e de resultados.
- bean - um objeto auxiliar necessário para adaptadores, thread
- channel - canal para entrega de mensagens de / para componentes de stream
- httpInboundGateway - ponto de acesso http para o qual enviaremos ainda uma solicitação com dados para processamento adicional
- manipulador - um tipo genérico de manipulador (transformadores de ranhura, vários adaptadores etc.)
- jdbcOutboundGateway - adaptador jdbc
- resultado - manipulador para enviar informações para um canal específico
Precisamos de invólucros para armazenar parâmetros e inicializar corretamente os componentes de um fluxo inteiro; portanto, criamos imediatamente um repositório de componentes. funcionalidade dos conversores JSON -> Modelo de Definição. O mapeamento direto de campos usando jackson e objetos no meu caso não era aplicável - temos mais uma bicicleta para um protocolo de comunicação específico.
Vamos fazê-lo imediatamente , usando anotações :
StreamComponent - é responsável por identificar as classes como um modelo de ajuste de um componente de fluxo e possui informações de serviço - o nome do componente, o tipo do componente, se o componente está aninhado e a descrição;
SettingClass - responsável por opções adicionais para varrer o modelo, como varrer campos de super classes e ignorar campos ao inicializar valores;
SettingValue - responsável por identificar um campo de classe como personalizável de fora, com configurações de nomeação em JSON, descrição, conversor de tipo, sinalizador de campo obrigatório e sinalizador de objeto interno para fins informativos;
Gerenciador de armazenamento de componentes
Métodos auxiliares para trabalhar com modelos para controladores REST
Modelo base - uma abstração com um conjunto de campos / métodos auxiliares modelos
atuais de modelos de configuração de fluxo Modelos de configuração de fluxo atual
Mapper JSON -> Modelo de Definição
O principal terreno para o trabalho foi preparado. Agora, vamos direto à implementação, diretamente, de serviços que serão responsáveis pelo ciclo de vida, armazenamento e inicialização de fluxos e definiremos imediatamente a ideia de que podemos paralelizar 1 fluxo com a mesma nomeação em várias instâncias, ou seja, precisaremos criar identificadores (guias) exclusivos para todos os componentes do fluxo; caso contrário, colisões com outros componentes singleton (beans, canais etc.) podem ocorrer no contexto do aplicativo. Mas primeiro vamos criar mapeadores de dois componentes - http e jdbc, ou seja, o incremento dos modelos feitos anteriormente para os componentes do próprio fluxo (HttpRequestHandlerEndpointSpec e JdbcOutboundGateway). Serviço de Gerenciamento Central
HttpRegistry
JdbcRegistry
( StreamDeployingService) executa as funções de armazenamento de trabalhadores / inativos, registra novas, inicia, interrompe e remove os threads completamente do contexto do aplicativo. Um recurso importante do serviço é a implementação da dependência IntegrationFlowBuilderRegistry, que nos ajuda a dinamizar o aplicativo (talvez lembre-se desses arquivos xml de configuração ou classes DSL por quilômetros). De acordo com a especificação do fluxo, ele deve sempre começar com um componente ou canal de entrada, portanto, levamos isso em consideração na implementação do método registerStreamContext.
E o gerente auxiliar ( IntegrationFlowBuilderRegistry), que executa a função de um mapeador de modelos para o fluxo de componentes e a inicialização do próprio fluxo usando o IntegrationFlowBuilder. Também implementei um manipulador de log no pipeline de fluxo, um serviço para coletar métricas de canal de fluxo (uma opção alternável) e uma possível implementação de conversores de mensagens de fluxo com base na implementação Groovy (se de repente este exemplo se tornar a base da venda, a pré-compilação de scripts groovy deve ser feita no estágio de inicialização do fluxo , porque realize testes de carga na RAM e não importa quantos núcleos e energia você tenha). Dependendo da configuração dos estágios de log e dos parâmetros de nível de log do modelo, ele estará ativo após cada transmissão de uma mensagem de componente para componente. O monitoramento é ativado e desativado por um parâmetro em application.yml:
monitoring:
injectction:
default: true
Agora, temos toda a mecânica para inicializar fluxos dinâmicos de processamento de dados, além disso, podemos escrever mapeadores para vários protocolos e adaptadores, como RabbitMQ, Kafka, Tcp, Ftp, etc. além disso, na maioria dos casos, você não precisa escrever nada com sua própria mão (exceto, é claro, modelos de configuração e métodos auxiliares) - um número bastante grande de componentes já está presente no repositório .
O estágio final será a implementação de controladores para obter informações sobre os componentes existentes do sistema, gerenciar fluxos e obter métricas.
ComponentsController - fornece informações sobre todos os componentes em um modelo legível por humanos e um componente por nome e tipo.
StreamController - fornece gerenciamento de fluxo completo, a saber, a inicialização de novos modelos JSON, iniciando, parando, excluindo e emitindo métricas por identificador.
Produto final
Aumentamos o aplicativo resultante e descrevemos o caso de teste no formato JSON.
Fluxo de Dados de Amostra
Script de inicialização do banco de dados:
Importante: o parâmetro order é usado para inicialização sequencial de componentes no contexto do fluxo, ou seja, Como os componentes são construídos de acordo com este parâmetro, o processamento da mensagem recebida será reproduzido. (os canais e os compartimentos são sempre listados primeiro na lista). E para o bem - você precisa fazer o processamento do gráfico e a necessidade desse parâmetro desaparecerá por si só.
CREATE TABLE IF NOT EXISTS account_data
(
id INT NOT NULL,
accountname VARCHAR(45) NOT NULL,
password VARCHAR(128),
email VARCHAR(255),
last_ip VARCHAR(15) DEFAULT NULL NOT NULL
);
CREATE UNIQUE INDEX account_data_username_uindex
ON account_data (accountname);
ALTER TABLE account_data
ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (
SEQUENCE NAME account_data_id_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1
);
ALTER TABLE account_data
ADD CONSTRAINT account_data_pk
PRIMARY KEY (id);
CREATE TABLE IF NOT EXISTS account_info
(
id INT NOT NULL,
banned BOOLEAN DEFAULT FALSE,
premium_points INT DEFAULT 0,
premium_type SMALLINT DEFAULT -1
);
ALTER TABLE account_info
ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (
SEQUENCE NAME account_info_id_seq
START WITH 1
INCREMENT BY 1
NO MINVALUE
NO MAXVALUE
CACHE 1
);
ALTER TABLE account_info
ADD CONSTRAINT account_info_account_data_id_fk FOREIGN KEY (id) REFERENCES account_data (id)
ON UPDATE CASCADE ON DELETE CASCADE;
ALTER TABLE account_info
ADD CONSTRAINT account_info_pk
PRIMARY KEY (id);
INSERT INTO account_data (accountname, password, email, last_ip)
VALUES ('test', 'test', 'test@test', '127.0.0.1');
INSERT INTO account_info (banned, premium_points, premium_type)
VALUES (false, 1000, 1);
Importante: o parâmetro order é usado para inicialização sequencial de componentes no contexto do fluxo, ou seja, Como os componentes são construídos de acordo com este parâmetro, o processamento da mensagem recebida será reproduzido. (os canais e os compartimentos são sempre listados primeiro na lista). E para o bem - você precisa fazer o processamento do gráfico e a necessidade desse parâmetro desaparecerá por si só.
{
"flowName": "Rest Postgres stream",
"components": [
{
"componentName": "bean",
"componentType": "other",
"componentParameters": {
"id": "pgDataSource",
"bean-type": "com.zaxxer.hikari.HikariDataSource",
"property-args": [
{
"property-name": "username",
"property-value": "postgres"
},
{
"property-name": "password",
"property-value": "postgres"
},
{
"property-name": "jdbcUrl",
"property-value": "jdbc:postgresql://localhost:5432/test"
},
{
"property-name": "driverClassName",
"property-value": "org.postgresql.Driver"
}
]
}
},
{
"componentName": "message-channel",
"componentType": "source",
"componentParameters": {
"id": "jdbcReqChannel",
"order": 1,
"channel-type": "direct",
"max-subscribers": 1000
}
},
{
"componentName": "message-channel",
"componentType": "source",
"componentParameters": {
"id": "jdbcRepChannel",
"order": 1,
"channel-type": "direct"
}
},
{
"componentName": "http-inbound-gateway",
"componentType": "source",
"componentParameters": {
"order": 2,
"http-inbound-supported-methods": [
"POST"
],
"payload-type": "org.genfork.integration.model.request.http.SimpleJdbcPayload",
"log-stages": true,
"log-level": "INFO",
"request-channel": "jdbcReqChannel",
"reply-channel": "jdbcRepChannel"
}
},
{
"componentName": "handler",
"componentType": "processor",
"componentParameters": {
"order": 3,
"handler-definition": {
"componentName": "jdbc-outbound-adapter",
"componentType": "app",
"componentParameters": {
"data-source": "pgDataSource",
"query": "SELECT accountname, password, email, last_ip, banned, premium_points, premium_type FROM account_data d INNER JOIN account_info i ON d.id = i.id WHERE d.id = :payload.accountId",
"update-query": "UPDATE account_info SET banned = true WHERE id = :payload.accountId",
"jdbc-reply-channel": "jdbcRepChannel",
"log-stages": true,
"log-level": "INFO"
}
}
}
},
{
"componentName": "result",
"componentType": "app",
"componentParameters": {
"order": 4,
"cancel": false,
"result-channel": "jdbcRepChannel"
}
}
]
}
Teste:
1) Inicializamos um novo fluxo usando o método
POST / stream / deploy, em que nosso JSON estará no corpo da solicitação.
Em resposta, o sistema terá que enviar se tudo estiver correto, caso contrário, uma mensagem de erro será visível:
{
"status": "SUCCESS", -
"streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b" -
}
2) Iniciamos o start usando o método:
GET / stream / 2bf65d9d-97c6-4199-86aa-0c808c25071b / start, onde indicamos o identificador do stream inicializado anteriormente.
Em resposta, o sistema terá que enviar se tudo estiver correto, caso contrário, uma mensagem de erro será visível:
{
"status": "SUCCESS", -
}
3) Chamando um fluxo por um identificador no sistema? Como, o que e onde - no mapeador do modelo HttpRegistry, escrevi a condição
Http.inboundGateway(localPath != null ? localPath : String.format("/stream/%s/call", uuid))
onde, o parâmetro http-inbound-path é levado em consideração e, se não for especificado explicitamente na configuração do componente, será ignorado e o caminho de chamada do sistema será definido. No nosso caso, será:
POST / stream / ece4d4ac-3b46-4952-b0a6-8cf334074b99 / call - onde o identificador de fluxo está presente, com o corpo da solicitação:
{
"accountId": 1
}
Em resposta, receberemos, se as etapas de processamento da solicitação funcionarem corretamente, receberemos uma estrutura plana de registros das tabelas account_data e account_info.
{
"accountname": "test",
"password": "test",
"email": "test@test",
"last_ip": "127.0.0.1",
"banned": true,
"premium_points": 1000,
"premium_type": 1
}
A especificidade do adaptador JdbcOutboundGateway é tal que, se você especificar o parâmetro update-query, será registrado um manipulador adicional, que primeiro atualizará os dados e somente buscará pelo parâmetro de consulta.
Se você especificar os mesmos caminhos manualmente, a possibilidade de iniciar componentes com HttpInboundGateway como um ponto de acesso a um fluxo em várias instâncias será abolida, porque o sistema não permitirá o registro de um caminho semelhante.
4) Vejamos as métricas usando o método GET / stream / 2bf65d9d-97c6-4199-86aa-0c808c25071b / metrics
Conteúdo de resposta
, / , / / :
[
{
"streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
"channelName": "application.Rest Postgres stream_2bf65d9d-97c6-4199-86aa-0c808c25071b_jdbcReqChannel",
"sendDuration": {
"count": 1,
"min": 153.414,
"max": 153.414,
"mean": 153.414,
"standardDeviation": 0.0,
"countLong": 1
},
"maxSendDuration": 153.414,
"minSendDuration": 153.414,
"meanSendDuration": 153.414,
"meanSendRate": 0.001195117818082359,
"sendCount": 1,
"sendErrorCount": 0,
"errorRate": {
"count": 0,
"min": 0.0,
"max": 0.0,
"mean": 0.0,
"standardDeviation": 0.0,
"countLong": 0
},
"meanErrorRate": 0.0,
"meanErrorRatio": 1.1102230246251565E-16
},
{
"streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
"channelName": "application.2bf65d9d-97c6-4199-86aa-0c808c25071b.channel#2",
"sendDuration": {
"count": 1,
"min": 0.1431,
"max": 0.1431,
"mean": 0.1431,
"standardDeviation": 0.0,
"countLong": 1
},
"maxSendDuration": 0.1431,
"minSendDuration": 0.1431,
"meanSendDuration": 0.1431,
"meanSendRate": 0.005382436008121413,
"sendCount": 1,
"sendErrorCount": 0,
"errorRate": {
"count": 0,
"min": 0.0,
"max": 0.0,
"mean": 0.0,
"standardDeviation": 0.0,
"countLong": 0
},
"meanErrorRate": 0.0,
"meanErrorRatio": 0.0
},
{
"streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
"channelName": "application.Rest Postgres stream_2bf65d9d-97c6-4199-86aa-0c808c25071b_jdbcRepChannel",
"sendDuration": {
"count": 1,
"min": 0.0668,
"max": 0.0668,
"mean": 0.0668,
"standardDeviation": 0.0,
"countLong": 1
},
"maxSendDuration": 0.0668,
"minSendDuration": 0.0668,
"meanSendDuration": 0.0668,
"meanSendRate": 0.001195118373693797,
"sendCount": 1,
"sendErrorCount": 0,
"errorRate": {
"count": 0,
"min": 0.0,
"max": 0.0,
"mean": 0.0,
"standardDeviation": 0.0,
"countLong": 0
},
"meanErrorRate": 0.0,
"meanErrorRatio": 1.1102230246251565E-16
}
]
Conclusão
Assim, foi mostrado como, depois de gastar um pouco mais de tempo e esforço, escrever um aplicativo para integração com vários sistemas do que escrever manipuladores manuais adicionais (pipelines) a cada vez no seu aplicativo para integração com outros sistemas, 200-500 linhas de código cada.
No exemplo atual, você pode paralelizar o trabalho do mesmo tipo de linhas para várias instâncias, por meio de identificadores únicos, evitando colisões no contexto global da aplicação entre dependências de rosca (caixas, canais, etc.).
Além disso, você pode desenvolver o projeto:
- salvar fluxos no banco de dados;
- dar suporte a todos os componentes de integração que a comunidade de primavera e integração de primavera nos fornece;
- criar trabalhadores que executariam trabalhos com threads em um cronograma;
- faça uma interface sã para configurar fluxos com um "mouse e componente cubos" condicionais (a propósito, o exemplo foi parcialmente aprimorado para o projeto github.com/spring-cloud/spring-cloud-dataflow-ui ).
E mais uma vez vou duplicar o link para o repositório .