Integração Spring - fluxos de dados dinâmicos

Fogos de artifício, Habr! Hoje, analisaremos uma área bastante específica - o fluxo de dados usando a estrutura Spring Integration e como fazer esses fluxos em tempo de execução sem inicialização preliminar no contexto do aplicativo. Um aplicativo de amostra completo está no Gita .



Introdução



O Spring Integration é um Enterprise Integration Framework (EIP) que usa mecanismos de mensagens sob o capô entre adaptadores de diferentes protocolos / sistemas de integração baseados em canais de mensagens (filas condicionais). Análogos famosos são Camel, Mule, Nifi.



No caso de teste, teremos - para criar um serviço REST que possa ler os parâmetros de solicitação recebidos, acessar nosso banco de dados, por exemplo, postgres, atualizar e buscar os dados da tabela de acordo com os parâmetros recebidos da fonte e enviar o resultado de volta para a fila (request / response) e também cria várias instâncias com diferentes caminhos de solicitação.



Convencionalmente, o diagrama de fluxo de dados terá a seguinte aparência:



imagem



Em seguida, mostrarei como você pode simplesmente fazer isso sem muita dança com um pandeiro, usando IntegrationFlowContext, com pontos de extremidade de componente / segmento que controlam REST. Todo o código principal do projeto estará localizado no repositório, aqui vou indicar apenas alguns recortes. Bem, quem está interessado, por favor, sob gato.



Ferramentas



Vamos começar com o bloco de dependência por padrão. Basicamente, precisaremos de projetos de inicialização por mola - para a ideologia REST de gerenciamento de fluxo / componente, integração por mola - para criar nosso caso com base em canais e adaptadores.



E imediatamente pensamos no que mais precisamos para reproduzir o caso. Além das dependências principais, precisamos de subprojetos - integração-http, integração-jdbc, integração-groovy (fornece conversores de dados dinamicamente personalizáveis ​​com base em scripts do Goovy). Separadamente, direi que neste exemplo não usaremos o conversor groovy como desnecessário, mas forneceremos a capacidade de personalizá-lo de fora.



Lista de dependências
 <!-- Spring block -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-commons</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-groovy</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-http</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-jdbc</artifactId>
        </dependency>

        <!-- Db block -->
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
        </dependency>

        <dependency>
            <groupId>com.zaxxer</groupId>
            <artifactId>HikariCP</artifactId>
        </dependency>

        <!-- Utility block -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>

        <dependency>
            <groupId>org.reflections</groupId>
            <artifactId>reflections</artifactId>
            <version>0.9.12</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.4</version>
            <scope>provided</scope>
        </dependency>




Cozinha interna



Vamos continuar criando os componentes de sistema necessários (wrappers / modelos). Precisamos de modelos de canal, bean, httpInboundGateway, handler, jdbcOutboundGateway e de resultados.



  • bean - um objeto auxiliar necessário para adaptadores, thread
  • channel - canal para entrega de mensagens de / para componentes de stream
  • httpInboundGateway - ponto de acesso http para o qual enviaremos ainda uma solicitação com dados para processamento adicional
  • manipulador - um tipo genérico de manipulador (transformadores de ranhura, vários adaptadores etc.)
  • jdbcOutboundGateway - adaptador jdbc
  • resultado - manipulador para enviar informações para um canal específico


Precisamos de invólucros para armazenar parâmetros e inicializar corretamente os componentes de um fluxo inteiro; portanto, criamos imediatamente um repositório de componentes. funcionalidade dos conversores JSON -> Modelo de Definição. O mapeamento direto de campos usando jackson e objetos no meu caso não era aplicável - temos mais uma bicicleta para um protocolo de comunicação específico.



Vamos fazê-lo imediatamente , usando anotações :



StreamComponent - é responsável por identificar as classes como um modelo de ajuste de um componente de fluxo e possui informações de serviço - o nome do componente, o tipo do componente, se o componente está aninhado e a descrição;



SettingClass - responsável por opções adicionais para varrer o modelo, como varrer campos de super classes e ignorar campos ao inicializar valores;



SettingValue - responsável por identificar um campo de classe como personalizável de fora, com configurações de nomeação em JSON, descrição, conversor de tipo, sinalizador de campo obrigatório e sinalizador de objeto interno para fins informativos;



Gerenciador de armazenamento de componentes



Métodos auxiliares para trabalhar com modelos para controladores REST



Modelo base - uma abstração com um conjunto de campos / métodos auxiliares modelos



atuais de modelos de configuração de fluxo Modelos de configuração de fluxo atual



Mapper JSON -> Modelo de Definição



O principal terreno para o trabalho foi preparado. Agora, vamos direto à implementação, diretamente, de serviços que serão responsáveis ​​pelo ciclo de vida, armazenamento e inicialização de fluxos e definiremos imediatamente a ideia de que podemos paralelizar 1 fluxo com a mesma nomeação em várias instâncias, ou seja, precisaremos criar identificadores (guias) exclusivos para todos os componentes do fluxo; caso contrário, colisões com outros componentes singleton (beans, canais etc.) podem ocorrer no contexto do aplicativo. Mas primeiro vamos criar mapeadores de dois componentes - http e jdbc, ou seja, o incremento dos modelos feitos anteriormente para os componentes do próprio fluxo (HttpRequestHandlerEndpointSpec e JdbcOutboundGateway). Serviço de Gerenciamento Central



HttpRegistry



JdbcRegistry



( StreamDeployingService) executa as funções de armazenamento de trabalhadores / inativos, registra novas, inicia, interrompe e remove os threads completamente do contexto do aplicativo. Um recurso importante do serviço é a implementação da dependência IntegrationFlowBuilderRegistry, que nos ajuda a dinamizar o aplicativo (talvez lembre-se desses arquivos xml de configuração ou classes DSL por quilômetros). De acordo com a especificação do fluxo, ele deve sempre começar com um componente ou canal de entrada, portanto, levamos isso em consideração na implementação do método registerStreamContext.



E o gerente auxiliar ( IntegrationFlowBuilderRegistry), que executa a função de um mapeador de modelos para o fluxo de componentes e a inicialização do próprio fluxo usando o IntegrationFlowBuilder. Também implementei um manipulador de log no pipeline de fluxo, um serviço para coletar métricas de canal de fluxo (uma opção alternável) e uma possível implementação de conversores de mensagens de fluxo com base na implementação Groovy (se de repente este exemplo se tornar a base da venda, a pré-compilação de scripts groovy deve ser feita no estágio de inicialização do fluxo , porque realize testes de carga na RAM e não importa quantos núcleos e energia você tenha). Dependendo da configuração dos estágios de log e dos parâmetros de nível de log do modelo, ele estará ativo após cada transmissão de uma mensagem de componente para componente. O monitoramento é ativado e desativado por um parâmetro em application.yml:



monitoring:
  injectction:
    default: true


Agora, temos toda a mecânica para inicializar fluxos dinâmicos de processamento de dados, além disso, podemos escrever mapeadores para vários protocolos e adaptadores, como RabbitMQ, Kafka, Tcp, Ftp, etc. além disso, na maioria dos casos, você não precisa escrever nada com sua própria mão (exceto, é claro, modelos de configuração e métodos auxiliares) - um número bastante grande de componentes já está presente no repositório .



O estágio final será a implementação de controladores para obter informações sobre os componentes existentes do sistema, gerenciar fluxos e obter métricas.



ComponentsController - fornece informações sobre todos os componentes em um modelo legível por humanos e um componente por nome e tipo.



StreamController - fornece gerenciamento de fluxo completo, a saber, a inicialização de novos modelos JSON, iniciando, parando, excluindo e emitindo métricas por identificador.



Produto final



Aumentamos o aplicativo resultante e descrevemos o caso de teste no formato JSON.



Fluxo de Dados de Amostra
Script de inicialização do banco de dados:



CREATE TABLE IF NOT EXISTS account_data
(
    id          INT                      NOT NULL,
    accountname VARCHAR(45)              NOT NULL,
    password    VARCHAR(128),
    email       VARCHAR(255),
    last_ip     VARCHAR(15) DEFAULT NULL NOT NULL
);

CREATE UNIQUE INDEX account_data_username_uindex
    ON account_data (accountname);

ALTER TABLE account_data
    ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (
        SEQUENCE NAME account_data_id_seq
            START WITH 1
            INCREMENT BY 1
            NO MINVALUE
            NO MAXVALUE
            CACHE 1
        );

ALTER TABLE account_data
    ADD CONSTRAINT account_data_pk
        PRIMARY KEY (id);

CREATE TABLE IF NOT EXISTS account_info
(
    id             INT NOT NULL,
    banned         BOOLEAN  DEFAULT FALSE,
    premium_points INT      DEFAULT 0,
    premium_type   SMALLINT DEFAULT -1
);

ALTER TABLE account_info
    ALTER COLUMN id ADD GENERATED BY DEFAULT AS IDENTITY (
        SEQUENCE NAME account_info_id_seq
            START WITH 1
            INCREMENT BY 1
            NO MINVALUE
            NO MAXVALUE
            CACHE 1
        );

ALTER TABLE account_info
    ADD CONSTRAINT account_info_account_data_id_fk FOREIGN KEY (id) REFERENCES account_data (id)
        ON UPDATE CASCADE ON DELETE CASCADE;

ALTER TABLE account_info
    ADD CONSTRAINT account_info_pk
        PRIMARY KEY (id);



INSERT INTO account_data (accountname, password, email, last_ip)
VALUES ('test', 'test', 'test@test', '127.0.0.1');
INSERT INTO account_info (banned, premium_points, premium_type)
VALUES (false, 1000, 1);


Importante: o parâmetro order é usado para inicialização sequencial de componentes no contexto do fluxo, ou seja, Como os componentes são construídos de acordo com este parâmetro, o processamento da mensagem recebida será reproduzido. (os canais e os compartimentos são sempre listados primeiro na lista). E para o bem - você precisa fazer o processamento do gráfico e a necessidade desse parâmetro desaparecerá por si só.



{
  "flowName": "Rest Postgres stream",
  "components": [
    {
      "componentName": "bean",
      "componentType": "other",
      "componentParameters": {
        "id": "pgDataSource",
        "bean-type": "com.zaxxer.hikari.HikariDataSource",
        "property-args": [
          {
            "property-name": "username",
            "property-value": "postgres"
          },
          {
            "property-name": "password",
            "property-value": "postgres"
          },
          {
            "property-name": "jdbcUrl",
            "property-value": "jdbc:postgresql://localhost:5432/test"
          },
          {
            "property-name": "driverClassName",
            "property-value": "org.postgresql.Driver"
          }
        ]
      }
    },
    {
      "componentName": "message-channel",
      "componentType": "source",
      "componentParameters": {
        "id": "jdbcReqChannel",
        "order": 1,
        "channel-type": "direct",
        "max-subscribers": 1000
      }
    },
    {
      "componentName": "message-channel",
      "componentType": "source",
      "componentParameters": {
        "id": "jdbcRepChannel",
        "order": 1,
        "channel-type": "direct"
      }
    },
    {
      "componentName": "http-inbound-gateway",
      "componentType": "source",
      "componentParameters": {
        "order": 2,
        "http-inbound-supported-methods": [
          "POST"
        ],
        "payload-type": "org.genfork.integration.model.request.http.SimpleJdbcPayload",
        "log-stages": true,
        "log-level": "INFO",
        "request-channel": "jdbcReqChannel",
        "reply-channel": "jdbcRepChannel"
      }
    },
    {
      "componentName": "handler",
      "componentType": "processor",
      "componentParameters": {
        "order": 3,
        "handler-definition": {
          "componentName": "jdbc-outbound-adapter",
          "componentType": "app",
          "componentParameters": {
            "data-source": "pgDataSource",
            "query": "SELECT accountname, password, email, last_ip, banned, premium_points, premium_type FROM account_data d INNER JOIN account_info i ON d.id = i.id WHERE d.id = :payload.accountId",
            "update-query": "UPDATE account_info SET banned = true WHERE id = :payload.accountId",
            "jdbc-reply-channel": "jdbcRepChannel",
            "log-stages": true,
            "log-level": "INFO"
          }
        }
      }
    },
    {
      "componentName": "result",
      "componentType": "app",
      "componentParameters": {
        "order": 4,
        "cancel": false,
        "result-channel": "jdbcRepChannel"
      }
    }
  ]
}





Teste:



1) Inicializamos um novo fluxo usando o método



POST / stream / deploy, em que nosso JSON estará no corpo da solicitação.



Em resposta, o sistema terá que enviar se tudo estiver correto, caso contrário, uma mensagem de erro será visível:



{
    "status": "SUCCESS", -  
    "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b" -  
}


2) Iniciamos o start usando o método:



GET / stream / 2bf65d9d-97c6-4199-86aa-0c808c25071b / start, onde indicamos o identificador do stream inicializado anteriormente.



Em resposta, o sistema terá que enviar se tudo estiver correto, caso contrário, uma mensagem de erro será visível:



{
    "status": "SUCCESS", -  
}


3) Chamando um fluxo por um identificador no sistema? Como, o que e onde - no mapeador do modelo HttpRegistry, escrevi a condição



Http.inboundGateway(localPath != null ? localPath : String.format("/stream/%s/call", uuid))


onde, o parâmetro http-inbound-path é levado em consideração e, se não for especificado explicitamente na configuração do componente, será ignorado e o caminho de chamada do sistema será definido. No nosso caso, será:



POST / stream / ece4d4ac-3b46-4952-b0a6-8cf334074b99 / call - onde o identificador de fluxo está presente, com o corpo da solicitação:



{
    "accountId": 1
}


Em resposta, receberemos, se as etapas de processamento da solicitação funcionarem corretamente, receberemos uma estrutura plana de registros das tabelas account_data e account_info.



{
    "accountname": "test",
    "password": "test",
    "email": "test@test",
    "last_ip": "127.0.0.1",
    "banned": true,
    "premium_points": 1000,
    "premium_type": 1
}


A especificidade do adaptador JdbcOutboundGateway é tal que, se você especificar o parâmetro update-query, será registrado um manipulador adicional, que primeiro atualizará os dados e somente buscará pelo parâmetro de consulta.



Se você especificar os mesmos caminhos manualmente, a possibilidade de iniciar componentes com HttpInboundGateway como um ponto de acesso a um fluxo em várias instâncias será abolida, porque o sistema não permitirá o registro de um caminho semelhante.



4) Vejamos as métricas usando o método GET / stream / 2bf65d9d-97c6-4199-86aa-0c808c25071b / metrics



Conteúdo de resposta
, / , / / :



[
    {
        "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
        "channelName": "application.Rest Postgres stream_2bf65d9d-97c6-4199-86aa-0c808c25071b_jdbcReqChannel",
        "sendDuration": {
            "count": 1,
            "min": 153.414,
            "max": 153.414,
            "mean": 153.414,
            "standardDeviation": 0.0,
            "countLong": 1
        },
        "maxSendDuration": 153.414,
        "minSendDuration": 153.414,
        "meanSendDuration": 153.414,
        "meanSendRate": 0.001195117818082359,
        "sendCount": 1,
        "sendErrorCount": 0,
        "errorRate": {
            "count": 0,
            "min": 0.0,
            "max": 0.0,
            "mean": 0.0,
            "standardDeviation": 0.0,
            "countLong": 0
        },
        "meanErrorRate": 0.0,
        "meanErrorRatio": 1.1102230246251565E-16
    },
    {
        "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
        "channelName": "application.2bf65d9d-97c6-4199-86aa-0c808c25071b.channel#2",
        "sendDuration": {
            "count": 1,
            "min": 0.1431,
            "max": 0.1431,
            "mean": 0.1431,
            "standardDeviation": 0.0,
            "countLong": 1
        },
        "maxSendDuration": 0.1431,
        "minSendDuration": 0.1431,
        "meanSendDuration": 0.1431,
        "meanSendRate": 0.005382436008121413,
        "sendCount": 1,
        "sendErrorCount": 0,
        "errorRate": {
            "count": 0,
            "min": 0.0,
            "max": 0.0,
            "mean": 0.0,
            "standardDeviation": 0.0,
            "countLong": 0
        },
        "meanErrorRate": 0.0,
        "meanErrorRatio": 0.0
    },
    {
        "streamId": "2bf65d9d-97c6-4199-86aa-0c808c25071b",
        "channelName": "application.Rest Postgres stream_2bf65d9d-97c6-4199-86aa-0c808c25071b_jdbcRepChannel",
        "sendDuration": {
            "count": 1,
            "min": 0.0668,
            "max": 0.0668,
            "mean": 0.0668,
            "standardDeviation": 0.0,
            "countLong": 1
        },
        "maxSendDuration": 0.0668,
        "minSendDuration": 0.0668,
        "meanSendDuration": 0.0668,
        "meanSendRate": 0.001195118373693797,
        "sendCount": 1,
        "sendErrorCount": 0,
        "errorRate": {
            "count": 0,
            "min": 0.0,
            "max": 0.0,
            "mean": 0.0,
            "standardDeviation": 0.0,
            "countLong": 0
        },
        "meanErrorRate": 0.0,
        "meanErrorRatio": 1.1102230246251565E-16
    }
]




Conclusão



Assim, foi mostrado como, depois de gastar um pouco mais de tempo e esforço, escrever um aplicativo para integração com vários sistemas do que escrever manipuladores manuais adicionais (pipelines) a cada vez no seu aplicativo para integração com outros sistemas, 200-500 linhas de código cada.



No exemplo atual, você pode paralelizar o trabalho do mesmo tipo de linhas para várias instâncias, por meio de identificadores únicos, evitando colisões no contexto global da aplicação entre dependências de rosca (caixas, canais, etc.).



Além disso, você pode desenvolver o projeto:



  • salvar fluxos no banco de dados;
  • dar suporte a todos os componentes de integração que a comunidade de primavera e integração de primavera nos fornece;
  • criar trabalhadores que executariam trabalhos com threads em um cronograma;
  • faça uma interface sã para configurar fluxos com um "mouse e componente cubos" condicionais (a propósito, o exemplo foi parcialmente aprimorado para o projeto github.com/spring-cloud/spring-cloud-dataflow-ui ).


E mais uma vez vou duplicar o link para o repositório .



All Articles