Somos amigos do ELK e do Exchange. Parte 2





Eu continuo minha história sobre como fazer amigos do Exchange e do ELK (comece aqui ). Deixe-me lembrá-lo de que esta combinação é capaz de lidar com um grande número de toras sem hesitação. Desta vez, falaremos sobre como fazer o Exchange funcionar com os componentes Logstash e Kibana.



O Logstash na pilha ELK é usado para processar logs de maneira inteligente e prepará-los para colocação no Elastic na forma de documentos, com base nos quais é conveniente construir várias visualizações em Kibana.



Instalação



Consiste em duas etapas:



  • Instalando e configurando o pacote OpenJDK.
  • Instalando e configurando o pacote Logstash.


Instalando e configurando o



pacote OpenJDK O pacote OpenJDK deve ser baixado e descompactado em um diretório específico. Em seguida, o caminho para este diretório deve ser inserido nas variáveis ​​$ env: Path e $ env: JAVA_HOME do sistema operacional Windows:











Verifique a versão Java:



PS C:\> java -version
openjdk version "13.0.1" 2019-10-15
OpenJDK Runtime Environment (build 13.0.1+9)
OpenJDK 64-Bit Server VM (build 13.0.1+9, mixed mode, sharing)


Instalando e configurando o pacote Logstash



Baixe o arquivo com a distribuição Logstash aqui . O arquivo deve ser descompactado para a raiz do disco. C:\Program FilesVocê não deve descompactá-lo em uma pasta , o Logstash se recusará a iniciar normalmente. Em seguida, você precisa fazer jvm.optionsalterações no arquivo que é responsável por alocar RAM para o processo Java. Eu recomendo especificar metade da RAM do servidor. Se ele tiver 16 GB de RAM a bordo, as chaves padrão são:



-Xms1g
-Xmx1g


deve ser substituído por:



-Xms8g
-Xmx8g


Além disso, é aconselhável comentar a linha -XX:+UseConcMarkSweepGC. Leia mais sobre isso aqui . A próxima etapa é criar uma configuração padrão no arquivo logstash.conf:



input {
 stdin{}
}
 
filter {
}
 
output {
 stdout {
 codec => "rubydebug"
 }
}


Com essa configuração, o Logstash lê os dados do console, passa por um filtro vazio e grava de volta no console. Aplicar esta configuração testará a funcionalidade do Logstash. Para fazer isso, vamos lançá-lo interativamente:



PS C:\...\bin> .\logstash.bat -f .\logstash.conf
...
[2019-12-19T11:15:27,769][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2019-12-19T11:15:27,847][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-19T11:15:28,113][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}


O Logstash foi lançado com sucesso na porta 9600.



A etapa final da instalação é lançar o Logstash como um serviço do Windows. Isso pode ser feito, por exemplo, usando o pacote NSSM :



PS C:\...\bin> .\nssm.exe install logstash
Service "logstash" installed successfully!


tolerância ao erro



O mecanismo de Filas Persistentes garante a segurança dos logs durante a transmissão do servidor de origem.



Como funciona



O layout das filas durante o processamento do log: entrada → fila → filtro + saída.



O plug-in de entrada recebe dados da fonte de log, os grava na fila e envia uma confirmação de recebimento dos dados para a fonte.



As mensagens da fila são processadas pelo Logstash, passam o filtro e o plugin de saída. Ao receber a confirmação da saída de envio de um log, o Logstash remove o log processado da fila. Se o Logstash parar, todas as mensagens não processadas e mensagens que não receberam confirmação de envio permanecerão na fila e o Logstash continuará a processá-las na próxima vez que for iniciado.



Configurando



Regulado por chaves no arquivo C:\Logstash\config\logstash.yml:



  • queue.type: (os valores possíveis são persistede memory (default)).
  • path.queue: (caminho para a pasta com arquivos de fila, que são armazenados em C: \ Logstash \ queue por padrão).
  • queue.page_capacity: (o tamanho máximo da página da fila, o padrão é 64 MB).
  • queue.drain: (verdadeiro / falso - ativa / desativa a interrupção do processamento da fila antes de desligar o Logstash. Não recomendo ativá-lo, porque isso afetará diretamente a velocidade de desligamento do servidor).
  • queue.max_events: (número máximo de eventos na fila, padrão - 0 (ilimitado)).
  • queue.max_bytes: (tamanho máximo da fila em bytes, o padrão é 1024 MB (1 GB)).


Se queue.max_eventse estiverem configurados queue.max_bytes, as mensagens deixarão de ser recebidas na fila quando o valor de qualquer uma dessas configurações for atingido. Leia mais sobre filas persistentes aqui .



Um exemplo da parte de logstash.yml responsável por configurar uma fila:



queue.type: persisted
queue.max_bytes: 10gb


Configurando



A configuração do Logstash geralmente consiste em três partes, responsáveis ​​por diferentes fases de processamento dos logs de entrada: recebimento (seção de entrada), análise (seção de filtro) e envio para Elastic (seção de saída). A seguir, veremos mais de perto cada um deles.



Entrada



O fluxo de entrada com logs brutos é recebido dos agentes do filebeat. É este plugin que especificamos na seção de entrada:



input {
  beats {
    port => 5044
  }
}


Após essa configuração, o Logstash começa a escutar na porta 5044 e, ao receber os logs, processa-os de acordo com as configurações na seção de filtro. Se necessário, você pode quebrar o canal para receber logs de filebit em SSL. Leia mais sobre as configurações do plugin do beats aqui .



Filtro



Todos os logs de texto interessantes que o Exchange gera para processamento estão no formato csv com os campos descritos no próprio arquivo de log. Para analisar registros csv, Logstash nos oferece três plugins: dissecação , csv e grok. O primeiro é o mais rápido , mas pode analisar apenas os registros mais simples.

Por exemplo, ele dividirá o seguinte registro em dois (devido à presença de uma vírgula dentro do campo), o que fará com que o log seja analisado incorretamente:



…,"MDB:GUID1, Mailbox:GUID2, Event:526545791, MessageClass:IPM.Note, CreationTime:2020-05-15T12:01:56.457Z, ClientType:MOMT, SubmissionAssistant:MailboxTransportSubmissionEmailAssistant",…


Ele pode ser usado ao analisar logs, por exemplo, IIS. Nesse caso, a seção de filtro pode ter a seguinte aparência:



filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
    }
  }
} 


A configuração do Logstash permite declarações condicionais , portanto, só podemos enviar logs para o plug-in de dissecção que foram marcados com uma tag filebeat IIS. Dentro do plugin, combinamos os valores dos campos com seus nomes, removemos o campo original messageque continha a entrada do log e podemos adicionar um campo arbitrário que irá, por exemplo, conter o nome do aplicativo do qual coletamos os logs.



No caso de registros de rastreamento, é melhor usar o plug-in csv, ele pode processar campos complexos corretamente:



filter {
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
}


Dentro do plug-in, combinamos os valores dos campos com seus nomes, excluímos o campo original message(bem como os campos tenant-ide schema-version) que continham a entrada do registro e podemos adicionar um campo arbitrário que irá, por exemplo, conter o nome do aplicativo do qual coletamos os registros.



Na saída do estágio de filtragem, obteremos documentos em uma primeira aproximação, prontos para renderização em Kibana. Sentiremos falta do seguinte:



  • Os campos numéricos serão reconhecidos como texto, evitando que operações sejam realizadas neles. Ou seja, os campos de time-takenlog do IIS, bem como os campos recipient-counte total-biteslog de rastreamento.
  • O carimbo de data / hora padrão do documento conterá o tempo de processamento do log, não o tempo de gravação do servidor.
  • O campo recipient-addressterá a aparência de uma construção única, o que não permite a análise com a contagem dos destinatários das cartas.


Agora é a hora de adicionar um pouco de mágica ao processo de processamento de log.



Convertendo campos numéricos



O plugin de dissecção tem uma opção convert_datatypeque você pode usar para converter um campo de texto em formato digital. Por exemplo, assim:



dissect {
  convert_datatype => { "time-taken" => "int" }
}


Vale lembrar que este método só é adequado se o campo for definitivamente um string. A opção não processa valores nulos dos campos e é lançada em uma exceção.



Para rastrear logs, é melhor não usar um método de conversão semelhante, uma vez que os campos recipient-counte total-bitespodem estar vazios. É melhor usar o plug-in mutate para converter estes campos :



mutate {
  convert => [ "total-bytes", "integer" ]
  convert => [ "recipient-count", "integer" ]
}


Divisão do destinatário em destinatários individuais



Essa tarefa também pode ser resolvida usando o plug-in mutate:



mutate {
  split => ["recipient_address", ";"]
}


Alterar o carimbo de data / hora



No caso de logs de rastreamento, a tarefa é facilmente resolvida pelo plugin de data , que ajudará a escrever a timestampdata e hora no campo no formato exigido do campo date-time:



date {
  match => [ "date-time", "ISO8601" ]
  timezone => "Europe/Moscow"
  remove_field => [ "date-time" ]
}


No caso dos logs do IIS, precisaremos combinar os dados do campo datee time, usando o plug-in mutate, registrar o fuso horário de que precisamos e colocar esse carimbo de hora timestampusando o plug-in de data:



mutate { 
  add_field => { "data-time" => "%{date} %{time}" }
  remove_field => [ "date", "time" ]
}
date { 
  match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
  timezone => "UTC"
  remove_field => [ "data-time" ]
}


Resultado



A seção de saída é usada para enviar logs processados ​​para o receptor de log. No caso de envio direto para o Elastic, o plugin elasticsearch é usado , que especifica o endereço do servidor e o modelo para o nome do índice para enviar o documento gerado:



output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}


Configuração final



A configuração final será semelhante a esta:



input {
  beats {
    port => 5044
  }
}
 
filter {
  if "IIS" in [tags] {
    dissect {
      mapping => {
        "message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
      }
      remove_field => ["message"]
      add_field => { "application" => "exchange" }
      convert_datatype => { "time-taken" => "int" }
    }
    mutate { 
      add_field => { "data-time" => "%{date} %{time}" }
      remove_field => [ "date", "time" ]
    }
    date { 
      match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
      timezone => "UTC"
      remove_field => [ "data-time" ]
    }
  }
  if "Tracking" in [tags] {
    csv {
      columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
      remove_field => ["message", "tenant-id", "schema-version"]
      add_field => { "application" => "exchange" }
    }
    mutate {
      convert => [ "total-bytes", "integer" ]
      convert => [ "recipient-count", "integer" ]
      split => ["recipient_address", ";"]
    }
    date {
      match => [ "date-time", "ISO8601" ]
      timezone => "Europe/Moscow"
      remove_field => [ "date-time" ]
    }
  }
}
 
output {
  elasticsearch {
    hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
    manage_template => false
    index => "Exchange-%{+YYYY.MM.dd}"
  }
}


Links Úteis:






All Articles