Um exemplo de aplicativo baseado em webhook orientado a eventos no armazenamento de objetos S3 Mail.ru Cloud Solutions



Máquina de café Rube Goldberg A



arquitetura orientada a eventos aumenta a eficiência de custos dos recursos utilizados, porque eles são usados ​​somente quando são necessários. Existem muitas opções de como implementar isso e não criar entidades adicionais na nuvem como aplicativos de trabalho. E hoje não vou falar sobre FaaS, mas sobre webhooks. Vou mostrar um exemplo tutorial de como lidar com eventos com os webhooks de armazenamento de objetos.



Algumas palavras sobre armazenamento de objetos e webhooks. O armazenamento de objetos permite armazenar quaisquer dados na nuvem como objetos acessíveis via S3 ou outra API (dependendo da implementação) via HTTP / HTTPS. Webhooks geralmente são retornos de chamada HTTP personalizados. Eles geralmente são acionados por um evento, como um envio de código para um repositório ou um comentário publicado em um blog. Quando um evento ocorre, o site de origem envia uma solicitação HTTP para o URL especificado para o webhook. Como resultado, você pode fazer eventos em um site acionar ações em outro ( wiki ). Quando o site de origem é Armazenamento de Objetos, os eventos são alterados em seu conteúdo.



Exemplos de casos simples em que essa automação pode ser usada:



  1. . « », .
  2. , , .
  3. ( , , , ).
  4. , , Kubernetes, , .


Como exemplo, faremos uma variante da tarefa 1, quando as alterações no bucket de armazenamento de objetos Mail.ru Cloud Solutions (MCS) são sincronizadas usando webhooks no armazenamento de objetos da AWS. Em um caso carregado real, você deve fornecer trabalho assíncrono registrando webhooks na fila, mas, para a tarefa educacional, faremos a implementação sem isso.



Esquema de trabalho



O protocolo de comunicação é descrito em detalhes no guia de webhooks S3 no MCS . O esquema de trabalho possui os seguintes elementos:



  • Um serviço de publicação localizado no lado S3 e publica solicitações HTTP quando um webnhook é acionado.
  • Um servidor de recebimento de webhook que escuta solicitações do serviço de publicação HTTP e executa as ações apropriadas. O servidor pode ser escrito em qualquer idioma; em nosso exemplo, o servidor será gravado em Go.


A peculiaridade da implementação do webhook na API S3 é o registro do servidor de recepção do webhook no serviço de publicação. Em particular, o servidor de recebimento do webhook deve confirmar a assinatura das mensagens do serviço de publicação (em outras implementações do webhook, geralmente não é necessário confirmar a assinatura).



Assim, o servidor de recebimento do webhook deve suportar duas operações principais:



  • responder a uma solicitação do serviço de publicação para confirmação do registro,
  • processar eventos recebidos.


Instalando o Servidor para Receber Webhooks



É necessário um servidor Linux para executar o servidor de recebimento do webhook. Neste artigo, como exemplo, usamos uma instância virtual que implantamos no MCS.



Instale o software necessário e inicie o servidor webhook.



ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install git
Reading package lists... Done
Building dependency tree
Reading state information... Done
The following packages were automatically installed and are no longer required:
  bc dns-root-data dnsmasq-base ebtables landscape-common liblxc-common 
liblxc1 libuv1 lxcfs lxd lxd-client python3-attr python3-automat 
python3-click python3-constantly python3-hyperlink
  python3-incremental python3-pam python3-pyasn1-modules 
python3-service-identity python3-twisted python3-twisted-bin 
python3-zope.interface uidmap xdelta3
Use 'sudo apt autoremove' to remove them.
Suggested packages:
  git-daemon-run | git-daemon-sysvinit git-doc git-el git-email git-gui 
gitk gitweb git-cvs git-mediawiki git-svn
The following NEW packages will be installed:
  git
0 upgraded, 1 newly installed, 0 to remove and 46 not upgraded.
Need to get 3915 kB of archives.
After this operation, 32.3 MB of additional disk space will be used.
Get:1 http://MS1.clouds.archive.ubuntu.com/ubuntu bionic-updates/main 
amd64 git amd64 1:2.17.1-1ubuntu0.7 [3915 kB]
Fetched 3915 kB in 1s (5639 kB/s)
Selecting previously unselected package git.
(Reading database ... 53932 files and directories currently installed.)
Preparing to unpack .../git_1%3a2.17.1-1ubuntu0.7_amd64.deb ...
Unpacking git (1:2.17.1-1ubuntu0.7) ...
Setting up git (1:2.17.1-1ubuntu0.7) ...


Clone a pasta com o servidor de recebimento do webhook:



ubuntu@ubuntu-basic-1-2-10gb:~$ git clone
https://github.com/RomanenkoDenys/s3-webhook.git
Cloning into 's3-webhook'...
remote: Enumerating objects: 48, done.
remote: Counting objects: 100% (48/48), done.
remote: Compressing objects: 100% (27/27), done.
remote: Total 114 (delta 20), reused 45 (delta 18), pack-reused 66
Receiving objects: 100% (114/114), 23.77 MiB | 20.25 MiB/s, done.
Resolving deltas: 100% (49/49), done.


Vamos iniciar o servidor:



ubuntu@ubuntu-basic-1-2-10gb:~$ cd s3-webhook/
ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80


Assinando o serviço de publicação



Você pode registrar seu servidor para receber webhooks via API ou interface da web. Para simplificar, registraremos através da interface da web:



  1. Vá para a seção de baldes na sala de controle.
  2. Vamos ao balde, para o qual vamos configurar webhooks, e clicamos na engrenagem:






Vá para a guia Webhooks e clique em Adicionar:





Preencha os campos:







ID - o nome do webhook.



Evento - quais eventos enviar. Definimos a transferência de todos os eventos que ocorrem ao trabalhar com arquivos (adicionar e excluir).



URL - endereço do servidor de recebimento do webhook.



O prefixo / sufixo do filtro é um filtro que permite gerar webhooks apenas para objetos cujos nomes correspondem a determinadas regras. Por exemplo, para fazer o webhook funcionar apenas com arquivos com a extensão .png, escreva "png" no sufixo Filtro .



Atualmente, apenas as portas 80 e 443 são suportadas para acessar o servidor de recebimento da Web.



Clique em Adicionar gancho e veja o seguinte:





Gancho adicionado.



O servidor para receber webhooks nos logs mostra o progresso do processo de registro do gancho:



ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80
2020/06/15 12:01:14 [POST] incoming HTTP request from 
95.163.216.92:42530
2020/06/15 12:01:14 Got timestamp: 2020-06-15T15:01:13+03:00 TopicArn: 
mcs5259999770|myfiles-ash|s3:ObjectCreated:*,s3:ObjectRemoved:* Token: 
E2itMqAMUVVZc51pUhFWSp13DoxezvRxkUh5P7LEuk1dEe9y URL: 
http://89.208.199.220/webhook
2020/06/15 12:01:14 Generate responce signature: 
3754ce36636f80dfd606c5254d64ecb2fd8d555c27962b70b4f759f32c76b66d


O registro terminou. Na próxima seção, examinaremos mais de perto o algoritmo do servidor que está recebendo webhooks.



Descrição do servidor para receber webhooks



No nosso exemplo, o servidor está escrito em Go. Vamos analisar os princípios básicos de seu trabalho.



package main

// Generate hmac_sha256_hex
func HmacSha256hex(message string, secret string) string {
}

// Generate hmac_sha256
func HmacSha256(message string, secret string) string {
}

// Send subscription confirmation
func SubscriptionConfirmation(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Send subscription confirmation
func GotRecords(w http.ResponseWriter, req *http.Request, body []byte) {
}

// Liveness probe
func Ping(w http.ResponseWriter, req *http.Request) {
    // log request
    log.Printf("[%s] incoming HTTP Ping request from %s\n", req.Method, req.RemoteAddr)
    fmt.Fprintf(w, "Pong\n")
}

//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {
}

func main() {

    // get command line args
    bindPort := flag.Int("port", 80, "number between 1-65535")
    bindAddr := flag.String("address", "", "ip address in dot format")
    flag.StringVar(&actionScript, "script", "", "external script to execute")
    flag.Parse()

    http.HandleFunc("/ping", Ping)
    http.HandleFunc("/webhook", Webhook)

log.Fatal(http.ListenAndServe(*bindAddr+":"+strconv.Itoa(*bindPort), nil))
}


Vamos considerar as principais funções:



  • Ping () é uma rota que responde por URL / ping, a implementação mais simples de um probe animado.
  • Webhook () - rota principal, manipulador de URL / webhook:

    • confirma o registro no serviço de publicação (transição para a função SubscriptionConfirmation),
    • processa webhooks recebidos (função Gotrecords).
  • As funções HmacSha256 e HmacSha256hex são implementações dos algoritmos de criptografia HMAC-SHA256 e HMAC-SHA256 com a saída como uma sequência de números hexadecimais para subtração de assinatura.
  • main é a função principal, processa os parâmetros da linha de comando e registra os manipuladores de URL.


Parâmetros da linha de comando aceitos pelo servidor:



  • -port é a porta na qual o servidor escutará.
  • -address é o endereço IP que o servidor escutará.
  • -script é um programa externo chamado em todos os ganchos que entram.


Vamos dar uma olhada em algumas das funções:



//Webhook
func Webhook(w http.ResponseWriter, req *http.Request) {

    // Read body
    body, err := ioutil.ReadAll(req.Body)
    defer req.Body.Close()
    if err != nil {
        http.Error(w, err.Error(), 500)
        return
    }

    // log request
    log.Printf("[%s] incoming HTTP request from %s\n", req.Method, req.RemoteAddr)
    // check if we got subscription confirmation request
    if strings.Contains(string(body), 
"\"Type\":\"SubscriptionConfirmation\"") {
        SubscriptionConfirmation(w, req, body)
    } else {
        GotRecords(w, req, body)
    }

}


Esta função determina o que veio - um pedido de confirmação de registro ou um webhook. Como segue a documentação , em caso de confirmação do registro, a seguinte estrutura Json vem na solicitação Post:



POST http://test.com HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation
content-type: application/json

{
    "Timestamp":"2019-12-26T19:29:12+03:00",
    "Type":"SubscriptionConfirmation",
    "Message":"You have chosen to subscribe to the topic $topic. To confirm the subscription you need to response with calculated signature",
    "TopicArn":"mcs2883541269|bucketA|s3:ObjectCreated:Put",
    "SignatureVersion":1,
    "Token":«RPE5UuG94rGgBH6kHXN9FUPugFxj1hs2aUQc99btJp3E49tA»
}


Esta solicitação precisa ser respondida:



content-type: application/json

{"signature":«ea3fce4bb15c6de4fec365d36bcebbc34ccddf54616d5ca12e1972f82b6d37af»}


Onde a assinatura é calculada como:



signature = hmac_sha256(url, hmac_sha256(TopicArn, 
hmac_sha256(Timestamp, Token)))


Se um webhook chegar, a estrutura da solicitação Post ficará assim:



POST <url> HTTP/1.1
x-amz-sns-messages-type: SubscriptionConfirmation

{ "Records":
    [
        {
            "s3": {
                "object": {
                    "eTag":"aed563ecafb4bcc5654c597a421547b2",
                    "sequencer":1577453615,
                    "key":"some-file-to-bucket",
                    "size":100
                },
            "configurationId":"1",
            "bucket": {
                "name": "bucketA",
                "ownerIdentity": {
                    "principalId":"mcs2883541269"}
                },
                "s3SchemaVersion":"1.0"
            },
            "eventVersion":"1.0",
            "requestParameters":{
                "sourceIPAddress":"185.6.245.156"
            },
            "userIdentity": {
                "principalId":"2407013e-cbc1-415f-9102-16fb9bd6946b"
            },
            "eventName":"s3:ObjectCreated:Put",
            "awsRegion":"ru-msk",
            "eventSource":"aws:s3",
            "responseElements": {
                "x-amz-request-id":"VGJR5rtJ"
            }
        }
    ]
}


Consequentemente, dependendo da solicitação, você precisa entender como processar os dados. Escolhi um registro como indicador "Type":"SubscriptionConfirmation", pois ele está presente na solicitação para confirmar a assinatura e não está presente no webhook. Com base na presença / ausência desse registro na solicitação POST, a execução adicional do programa entra em uma função SubscriptionConfirmationou em uma função GotRecords.



Não consideraremos a função SubscriptionConfirmation em detalhes, ela será implementada de acordo com os princípios estabelecidos na documentação . Você pode verificar o código fonte para esta função no repositório git do projeto .



A função GotRecords analisa a solicitação recebida e, para cada objeto Record, chama um script externo (cujo nome foi passado no parâmetro -script) com os parâmetros:



  • nome do balde
  • chave de objeto
  • Aja:

    • cópia - se na solicitação original EventName = ObjectCreated | PutObject | PutObjectCopy
    • delete - se na solicitação original EventName = ObjectRemoved | DeleteObject


Portanto, se um gancho com uma solicitação Post chegar, como descrito acima , e o parâmetro -script = script.sh, o script será chamado da seguinte maneira:



script.sh  bucketA some-file-to-bucket copy


Deve-se entender que esse servidor de recebimento de webhook não é uma solução de produção completa, mas um exemplo simplificado de uma possível implementação.



Exemplo de trabalho



Vamos sincronizar os arquivos do depósito principal no MCS com o depósito de backup na AWS. O bucket principal é chamado myfiles-ash, o backup é myfiles-backup (a configuração de um bucket na AWS está fora do escopo deste artigo). Assim, quando um arquivo é colocado no depósito principal, sua cópia deve aparecer no backup; quando excluída do arquivo principal, deve ser excluída no backup.



Trabalharemos com buckets usando o utilitário awscli, com o qual o armazenamento em nuvem MCS e o armazenamento em nuvem da AWS são compatíveis.



ubuntu@ubuntu-basic-1-2-10gb:~$ sudo apt-get install awscli
Reading package lists... Done
Building dependency tree
Reading state information... Done
After this operation, 34.4 MB of additional disk space will be used.
Unpacking awscli (1.14.44-1ubuntu1) ...
Setting up awscli (1.14.44-1ubuntu1) ...


Vamos configurar o acesso à API do S3 MCS:



ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile mcs
AWS Access Key ID [None]: hdywEPtuuJTExxxxxxxxxxxxxx
AWS Secret Access Key [None]: hDz3SgxKwXoxxxxxxxxxxxxxxxxxx
Default region name [None]:
Default output format [None]:


Vamos configurar o acesso à API do AWS S3:



ubuntu@ubuntu-basic-1-2-10gb:~$ aws configure --profile aws
AWS Access Key ID [None]: AKIAJXXXXXXXXXXXX
AWS Secret Access Key [None]: dfuerphOLQwu0CreP5Z8l5fuXXXXXXXXXXXXXXXX
Default region name [None]:
Default output format [None]:


Vamos verificar os acessos:



Para a AWS:



ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile aws
2020-07-06 08:44:11 myfiles-backup


Para o MCS, quando o comando estiver em execução, adicione --endpoint-url:



ubuntu@ubuntu-basic-1-2-10gb:~$ aws s3 ls --profile mcs --endpoint-url 
https://hb.bizmrg.com
2020-02-04 06:38:05 databasebackups-0cdaaa6402d4424e9676c75a720afa85
2020-05-27 10:08:33 myfiles-ash


Acessado.



Agora vamos escrever um script para lidar com o gancho de entrada, vamos chamá-lo de s3_backup_mcs_aws.sh



#!/bin/bash
# Require aws cli
# if file added — copy it to backup bucket
# if file removed — remove it from backup bucket
# Variables
ENDPOINT_MCS="https://hb.bizmrg.com"
AWSCLI_MCS=`which aws`" --endpoint-url ${ENDPOINT_MCS} --profile mcs s3"
AWSCLI_AWS=`which aws`" --profile aws s3"
BACKUP_BUCKET="myfiles-backup"

SOURCE_BUCKET="${1}"
SOURCE_FILE="${2}"
ACTION="${3}"

SOURCE="s3://${SOURCE_BUCKET}/${SOURCE_FILE}"
TARGET="s3://${BACKUP_BUCKET}/${SOURCE_FILE}"
TEMP="/tmp/${SOURCE_BUCKET}/${SOURCE_FILE}"

case ${ACTION} in
    "copy")
    ${AWSCLI_MCS} cp "${SOURCE}" "${TEMP}"
    ${AWSCLI_AWS} cp "${TEMP}" "${TARGET}"
    rm ${TEMP}
    ;;

    "delete")
    ${AWSCLI_AWS} rm ${TARGET}
    ;;

    *)
    echo "Usage: ${0} sourcebucket sourcefile copy/delete"
    exit 1
    ;;
esac


Iniciamos o servidor:



ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ sudo ./s3-webhook -port 80 -
script scripts/s3_backup_mcs_aws.sh


Verificando como funciona. Por meio da interface da web, o MCS inclui o arquivo test.txt no balde myfiles-ash. Nos logs no console, você pode ver que uma solicitação foi feita ao servidor webhook:



2020/07/06 09:43:08 [POST] incoming HTTP request from 
95.163.216.92:56612
download: s3://myfiles-ash/test.txt to ../../../tmp/myfiles-ash/test.txt
upload: ../../../tmp/myfiles-ash/test.txt to 
s3://myfiles-backup/test.txt


Vamos verificar o conteúdo do bucket de arquivos de backup myfiles na AWS:



ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
2020-07-06 09:43:10       1104 test.txt


Agora, na interface da web, exclua o arquivo do balde myfiles-ash.



Logs do servidor:



2020/07/06 09:44:46 [POST] incoming HTTP request from 
95.163.216.92:58224
delete: s3://myfiles-backup/test.txt


Conteúdo do bloco:



ubuntu@ubuntu-basic-1-2-10gb:~/s3-webhook$ aws s3 --profile aws ls 
myfiles-backup
ubuntu@ubuntu-basic-1-2-10gb:~$


Arquivo excluído, problema resolvido.



Conclusão e ToDo



Todo o código usado neste artigo está no meu repositório . Existem também exemplos de scripts e exemplos de contagem de assinaturas para registrar webhooks.



Esse código nada mais é do que um exemplo de como você pode usar webhooks S3 em suas atividades. Como eu disse no começo, se você planeja usar esse servidor em produção, deve pelo menos reescrever o servidor para operação assíncrona: registrar webhooks recebidos em uma fila (RabbitMQ ou NATS) e, a partir daí, desmonte e processe-os por aplicativos de trabalho. Caso contrário, com a chegada massiva de webhooks, você poderá encontrar uma falta de recursos do servidor para executar tarefas. A presença de filas permite espalhar o servidor e os trabalhadores, além de resolver problemas com tarefas repetidas em caso de falhas. Também é desejável alterar o registro para um mais detalhado e padronizado.



Boa sorte!



Leia mais sobre o tema:






All Articles