
Eu continuo minha história sobre como fazer amigos do Exchange e do ELK (comece aqui ). Deixe-me lembrá-lo de que esta combinação é capaz de lidar com um grande número de toras sem hesitação. Desta vez, falaremos sobre como fazer o Exchange funcionar com os componentes Logstash e Kibana.
O Logstash na pilha ELK é usado para processar logs de maneira inteligente e prepará-los para colocação no Elastic na forma de documentos, com base nos quais é conveniente construir várias visualizações em Kibana.
Instalação
Consiste em duas etapas:
- Instalando e configurando o pacote OpenJDK.
- Instalando e configurando o pacote Logstash.
Instalando e configurando o
pacote OpenJDK O pacote OpenJDK deve ser baixado e descompactado em um diretório específico. Em seguida, o caminho para este diretório deve ser inserido nas variáveis $ env: Path e $ env: JAVA_HOME do sistema operacional Windows:


Verifique a versão Java:
PS C:\> java -version
openjdk version "13.0.1" 2019-10-15
OpenJDK Runtime Environment (build 13.0.1+9)
OpenJDK 64-Bit Server VM (build 13.0.1+9, mixed mode, sharing)
Instalando e configurando o pacote Logstash
Baixe o arquivo com a distribuição Logstash aqui . O arquivo deve ser descompactado para a raiz do disco.
C:\Program Files
Você não deve descompactá-lo em uma pasta , o Logstash se recusará a iniciar normalmente. Em seguida, você precisa fazer jvm.options
alterações no arquivo que é responsável por alocar RAM para o processo Java. Eu recomendo especificar metade da RAM do servidor. Se ele tiver 16 GB de RAM a bordo, as chaves padrão são:
-Xms1g
-Xmx1g
deve ser substituído por:
-Xms8g
-Xmx8g
Além disso, é aconselhável comentar a linha
-XX:+UseConcMarkSweepGC
. Leia mais sobre isso aqui . A próxima etapa é criar uma configuração padrão no arquivo logstash.conf:
input {
stdin{}
}
filter {
}
output {
stdout {
codec => "rubydebug"
}
}
Com essa configuração, o Logstash lê os dados do console, passa por um filtro vazio e grava de volta no console. Aplicar esta configuração testará a funcionalidade do Logstash. Para fazer isso, vamos lançá-lo interativamente:
PS C:\...\bin> .\logstash.bat -f .\logstash.conf
...
[2019-12-19T11:15:27,769][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2019-12-19T11:15:27,847][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2019-12-19T11:15:28,113][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
O Logstash foi lançado com sucesso na porta 9600.
A etapa final da instalação é lançar o Logstash como um serviço do Windows. Isso pode ser feito, por exemplo, usando o pacote NSSM :
PS C:\...\bin> .\nssm.exe install logstash
Service "logstash" installed successfully!
tolerância ao erro
O mecanismo de Filas Persistentes garante a segurança dos logs durante a transmissão do servidor de origem.
Como funciona
O layout das filas durante o processamento do log: entrada → fila → filtro + saída.
O plug-in de entrada recebe dados da fonte de log, os grava na fila e envia uma confirmação de recebimento dos dados para a fonte.
As mensagens da fila são processadas pelo Logstash, passam o filtro e o plugin de saída. Ao receber a confirmação da saída de envio de um log, o Logstash remove o log processado da fila. Se o Logstash parar, todas as mensagens não processadas e mensagens que não receberam confirmação de envio permanecerão na fila e o Logstash continuará a processá-las na próxima vez que for iniciado.
Configurando
Regulado por chaves no arquivo
C:\Logstash\config\logstash.yml:
queue.type
: (os valores possíveis sãopersisted
ememory (default))
.path.queue
: (caminho para a pasta com arquivos de fila, que são armazenados em C: \ Logstash \ queue por padrão).queue.page_capacity
: (o tamanho máximo da página da fila, o padrão é 64 MB).queue.drain
: (verdadeiro / falso - ativa / desativa a interrupção do processamento da fila antes de desligar o Logstash. Não recomendo ativá-lo, porque isso afetará diretamente a velocidade de desligamento do servidor).queue.max_events
: (número máximo de eventos na fila, padrão - 0 (ilimitado)).queue.max_bytes
: (tamanho máximo da fila em bytes, o padrão é 1024 MB (1 GB)).
Se
queue.max_events
e estiverem configurados queue.max_bytes
, as mensagens deixarão de ser recebidas na fila quando o valor de qualquer uma dessas configurações for atingido. Leia mais sobre filas persistentes aqui .
Um exemplo da parte de logstash.yml responsável por configurar uma fila:
queue.type: persisted
queue.max_bytes: 10gb
Configurando
A configuração do Logstash geralmente consiste em três partes, responsáveis por diferentes fases de processamento dos logs de entrada: recebimento (seção de entrada), análise (seção de filtro) e envio para Elastic (seção de saída). A seguir, veremos mais de perto cada um deles.
Entrada
O fluxo de entrada com logs brutos é recebido dos agentes do filebeat. É este plugin que especificamos na seção de entrada:
input {
beats {
port => 5044
}
}
Após essa configuração, o Logstash começa a escutar na porta 5044 e, ao receber os logs, processa-os de acordo com as configurações na seção de filtro. Se necessário, você pode quebrar o canal para receber logs de filebit em SSL. Leia mais sobre as configurações do plugin do beats aqui .
Filtro
Todos os logs de texto interessantes que o Exchange gera para processamento estão no formato csv com os campos descritos no próprio arquivo de log. Para analisar registros csv, Logstash nos oferece três plugins: dissecação , csv e grok. O primeiro é o mais rápido , mas pode analisar apenas os registros mais simples.
Por exemplo, ele dividirá o seguinte registro em dois (devido à presença de uma vírgula dentro do campo), o que fará com que o log seja analisado incorretamente:
…,"MDB:GUID1, Mailbox:GUID2, Event:526545791, MessageClass:IPM.Note, CreationTime:2020-05-15T12:01:56.457Z, ClientType:MOMT, SubmissionAssistant:MailboxTransportSubmissionEmailAssistant",…
Ele pode ser usado ao analisar logs, por exemplo, IIS. Nesse caso, a seção de filtro pode ter a seguinte aparência:
filter {
if "IIS" in [tags] {
dissect {
mapping => {
"message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
}
remove_field => ["message"]
add_field => { "application" => "exchange" }
}
}
}
A configuração do Logstash permite declarações condicionais , portanto, só podemos enviar logs para o plug-in de dissecção que foram marcados com uma tag filebeat
IIS
. Dentro do plugin, combinamos os valores dos campos com seus nomes, removemos o campo original message
que continha a entrada do log e podemos adicionar um campo arbitrário que irá, por exemplo, conter o nome do aplicativo do qual coletamos os logs.
No caso de registros de rastreamento, é melhor usar o plug-in csv, ele pode processar campos complexos corretamente:
filter {
if "Tracking" in [tags] {
csv {
columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
remove_field => ["message", "tenant-id", "schema-version"]
add_field => { "application" => "exchange" }
}
}
Dentro do plug-in, combinamos os valores dos campos com seus nomes, excluímos o campo original
message
(bem como os campos tenant-id
e schema-version
) que continham a entrada do registro e podemos adicionar um campo arbitrário que irá, por exemplo, conter o nome do aplicativo do qual coletamos os registros.
Na saída do estágio de filtragem, obteremos documentos em uma primeira aproximação, prontos para renderização em Kibana. Sentiremos falta do seguinte:
- Os campos numéricos serão reconhecidos como texto, evitando que operações sejam realizadas neles. Ou seja, os campos de
time-taken
log do IIS, bem como os camposrecipient-count
etotal-bites
log de rastreamento. - O carimbo de data / hora padrão do documento conterá o tempo de processamento do log, não o tempo de gravação do servidor.
- O campo
recipient-address
terá a aparência de uma construção única, o que não permite a análise com a contagem dos destinatários das cartas.
Agora é a hora de adicionar um pouco de mágica ao processo de processamento de log.
Convertendo campos numéricos
O plugin de dissecção tem uma opção
convert_datatype
que você pode usar para converter um campo de texto em formato digital. Por exemplo, assim:
dissect {
…
convert_datatype => { "time-taken" => "int" }
…
}
Vale lembrar que este método só é adequado se o campo for definitivamente um string. A opção não processa valores nulos dos campos e é lançada em uma exceção.
Para rastrear logs, é melhor não usar um método de conversão semelhante, uma vez que os campos
recipient-count
e total-bites
podem estar vazios. É melhor usar o plug-in mutate para converter estes campos :
mutate {
convert => [ "total-bytes", "integer" ]
convert => [ "recipient-count", "integer" ]
}
Divisão do destinatário em destinatários individuais
Essa tarefa também pode ser resolvida usando o plug-in mutate:
mutate {
split => ["recipient_address", ";"]
}
Alterar o carimbo de data / hora
No caso de logs de rastreamento, a tarefa é facilmente resolvida pelo plugin de data , que ajudará a escrever a
timestamp
data e hora no campo no formato exigido do campo date-time
:
date {
match => [ "date-time", "ISO8601" ]
timezone => "Europe/Moscow"
remove_field => [ "date-time" ]
}
No caso dos logs do IIS, precisaremos combinar os dados do campo
date
e time
, usando o plug-in mutate, registrar o fuso horário de que precisamos e colocar esse carimbo de hora timestamp
usando o plug-in de data:
mutate {
add_field => { "data-time" => "%{date} %{time}" }
remove_field => [ "date", "time" ]
}
date {
match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
timezone => "UTC"
remove_field => [ "data-time" ]
}
Resultado
A seção de saída é usada para enviar logs processados para o receptor de log. No caso de envio direto para o Elastic, o plugin elasticsearch é usado , que especifica o endereço do servidor e o modelo para o nome do índice para enviar o documento gerado:
output {
elasticsearch {
hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
manage_template => false
index => "Exchange-%{+YYYY.MM.dd}"
}
}
Configuração final
A configuração final será semelhante a esta:
input {
beats {
port => 5044
}
}
filter {
if "IIS" in [tags] {
dissect {
mapping => {
"message" => "%{date} %{time} %{s-ip} %{cs-method} %{cs-uri-stem} %{cs-uri-query} %{s-port} %{cs-username} %{c-ip} %{cs(User-Agent)} %{cs(Referer)} %{sc-status} %{sc-substatus} %{sc-win32-status} %{time-taken}"
}
remove_field => ["message"]
add_field => { "application" => "exchange" }
convert_datatype => { "time-taken" => "int" }
}
mutate {
add_field => { "data-time" => "%{date} %{time}" }
remove_field => [ "date", "time" ]
}
date {
match => [ "data-time", "YYYY-MM-dd HH:mm:ss" ]
timezone => "UTC"
remove_field => [ "data-time" ]
}
}
if "Tracking" in [tags] {
csv {
columns => ["date-time","client-ip","client-hostname","server-ip","server-hostname","source-context","connector-id","source","event-id","internal-message-id","message-id","network-message-id","recipient-address","recipient-status","total-bytes","recipient-count","related-recipient-address","reference","message-subject","sender-address","return-path","message-info","directionality","tenant-id","original-client-ip","original-server-ip","custom-data","transport-traffic-type","log-id","schema-version"]
remove_field => ["message", "tenant-id", "schema-version"]
add_field => { "application" => "exchange" }
}
mutate {
convert => [ "total-bytes", "integer" ]
convert => [ "recipient-count", "integer" ]
split => ["recipient_address", ";"]
}
date {
match => [ "date-time", "ISO8601" ]
timezone => "Europe/Moscow"
remove_field => [ "date-time" ]
}
}
}
output {
elasticsearch {
hosts => ["127.0.0.1:9200", "127.0.0.2:9200"]
manage_template => false
index => "Exchange-%{+YYYY.MM.dd}"
}
}
Links Úteis:
- Como instalar o OpenJDK 11 no Windows?
- Baixar Logstash
- Elastic usa a opção obsoleta UseConcMarkSweepGC # 36828
- NSSM
- Filas persistentes
- Plug-in de entrada do Beats
- Logstash Cara, onde está minha motosserra? Eu preciso dissecar meus logs
- Plugin de filtro de dissecção
- Condicionais
- Plug-in de filtro mutate
- Plugin de filtro de data
- Plugin de saída do Elasticsearch