API em tempo real no contexto do Apache Kafka

Olá a todos. Já em dezembro, um novo fluxo do curso de Arquiteto de Software começará na OTUS. Antecipando o início do curso, gostaria de compartilhar com vocês a tradução de um artigo interessante. Sugiro também assistir a uma aula de demonstração sobre o tema: "Idempotência e comutabilidade da API em filas e HTTP".






Uma das questões complicadas que enfrentamos constantemente ao projetar aplicativos e sistemas em geral é como organizar efetivamente a troca de informações entre os componentes, mantendo a flexibilidade para alterar as interfaces sem afetar indevidamente outras partes do sistema. Quanto mais específica e otimizada for uma interface, mais provável será que seja tão circunstancial que exigirá uma reescrita completa para alterá-la. E vice-versa: as interfaces genéricas podem ser bastante adaptáveis e amplamente suportadas, mas, infelizmente, às custas do desempenho.





(Events) , API (real-time APIs) , , ; , .





. , , - , , . : , , .





(state) - , NoSQL . , , - - . , (consumers) - , . (producers) , , .





, , , , . , . (payload) , , . :





  • : userLogin







    • : zbeeblebrox



      2020-08-17 16:26:39 BST







  • : CarParked







    • : A42 XYZ



      2020-08-17 16:36:27



      X42







  • : orderPlaced







    • :







      £2.25



      2020-08-17 16:35:41 BST











(, , ), (, , , , ).





, , , , - Apache Kafka®. Kafka - , :





  • Pub/Sub





    • () () , / .









    • , .









    • .





Kafka . , , , , , . , , , , NoSQL .





API-, Apache Kafka, , , .





API

, Kafka, , , , , , ( - ). , Kafka - . , , - Kafka (topic - Kafka, - ), .





Kafka . Kafka Go:





package main

import (
    "log"

    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

// main sends a single "Hello world" message to partition 0 of test_topic on
// the broker at localhost:9092 and waits for its delivery report before
// exiting.
func main() {

    topic := "test_topic"

    p, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092"})
    if err != nil {
        // Nothing has been deferred yet, so exiting via log.Fatalf skips no cleanup.
        log.Fatalf("creating producer: %v", err)
    }
    defer p.Close()

    // Produce only enqueues the message; the outcome is reported
    // asynchronously on the delivery channel passed as the second argument.
    deliveries := make(chan kafka.Event, 1)
    err = p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic,
            Partition: 0},
        Value: []byte("Hello world")}, deliveries)
    if err != nil {
        log.Printf("enqueueing message: %v", err)
        return
    }

    // Block until the delivery report arrives; otherwise the process could
    // exit while the message is still sitting in the client's queue.
    ev := <-deliveries
    m, ok := ev.(*kafka.Message)
    if !ok {
        log.Printf("unexpected delivery event: %v", ev)
        return
    }
    if m.TopicPartition.Error != nil {
        log.Printf("delivering message: %v", m.TopicPartition.Error)
    }

}
      
      







Kafka , , , , , ( ).





, Kafka, . pub/sub, , , . Kafka , , , A/B-, , . , , . , RabbitMQ, ActiveMQ, .





package main

import (
    "fmt"

    "gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)

// main creates a consumer in group rmoff_01 against the broker at
// localhost:9092, subscribes it to test_topic, and prints every message that
// arrives on the consumer's Events channel.
func main() {

    topic := "test_topic"

    cm := kafka.ConfigMap{
        "bootstrap.servers":        "localhost:9092",
        "go.events.channel.enable": true,
        "group.id":                 "rmoff_01"}

    c, err := kafka.NewConsumer(&cm)
    if err != nil {
        // Without this check a failed constructor would leave c nil and the
        // calls below would panic.
        fmt.Printf("creating consumer: %v\n", err)
        return
    }
    defer c.Close()

    if err := c.Subscribe(topic, nil); err != nil {
        fmt.Printf("subscribing to topic %q: %v\n", topic, err)
        return
    }

    // Ranging over the channel replaces the single-case select; the loop ends
    // only if the Events channel is closed.
    for ev := range c.Events() {
        switch e := ev.(type) {
        case *kafka.Message:
            fmt.Printf("✅ Message '%v' received from topic '%v'\n", string(e.Value), *e.TopicPartition.Topic)
        case kafka.Error:
            // Surface client/broker errors instead of dropping them silently.
            fmt.Printf("consumer error: %v\n", e)
        }
    }

}
      
      







Kafka, (Consumer Group). . -, Kafka , , , . -, - , , . Kafka , (, ).





, - . Kafka Connect API, .





Producer Consumer API Java, C/C++, Go, Python, Node.js . , HTTP Kafka? REST Proxy.





REST API Apache Kafka

, . , , , :





{
    "name": "Sheffield NCP",
    "space": "A42",
    "occupied": true
}
      
      







Kafka, . Kafka Confluent REST Proxy - REST-:





curl -X POST \
     -H "Content-Type: application/vnd.kafka.json.v2+json" \
     -H "Accept: application/vnd.kafka.v2+json" \
     --data '{"records":[{"value":{ "name": "Sheffield NCP", "space": "A42", "occupied": true }}]}' \
     "http://localhost:8082/topics/carpark"
      
      







, Consumer API, , REST-. Consumer API, , REST API, Consumer Group, (subscription). , REST API , :





curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
      --data '{"name": "rmoff_consumer", "format": "json", "auto.offset.reset": "earliest"}' \
      http://localhost:8082/consumers/rmoff_consumer

curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["carpark"]}' \
 http://localhost:8082/consumers/rmoff_consumer/instances/rmoff_consumer/subscription
      
      







:





curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
      http://localhost:8082/consumers/rmoff_consumer/instances/rmoff_consumer/records
[
    {
        "topic": "carpark",
        "key": null,
        "value": {
            "name": "Sheffield NCP",
            "space": "A42",
            "occupied": true
        },
        "partition": 0,
        "offset": 0
    }
]
      
      







, . , REST-.





, Kafka. , pub/sub. - , , ? , , .





,

Apache Kafka pub/sub - iPhone . , … . Apache Kafka Kafka Streams API. Java Kafka . Kafka Streams, , Walmart, Ticketmaster Bloomberg, ksqlDB.





ksqlDB - , . API SQL Kafka. ksqlDB , , - .





ksqlDB :





-- Declare the CARPARK_EVENTS stream over the Kafka topic 'carpark'.
-- Message values are read as JSON and exposed as the three columns below.
CREATE STREAM CARPARK_EVENTS (NAME     VARCHAR,
                              SPACE    VARCHAR,
                              OCCUPIED BOOLEAN)
                        WITH (KAFKA_TOPIC='carpark',
                              VALUE_FORMAT='JSON');
      
      







ksqlDB , , . , . , , , :





SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss') AS EVENT_TS,
       SPACE
  FROM CARPARK_EVENTS
 WHERE NAME='Sheffield NCP'
   AND OCCUPIED=false
  EMIT CHANGES;
      
      



SQL-, , , , ( EMIT CHANGES). , push-, , , . ksqlDB pull- ( ), , , . , ksqlDB , , .





ksqlDB REST API, SQL :





curl --http2 'http://localhost:8088/query-stream' \
     --data-raw '{"sql":"SELECT TIMESTAMPTOSTRING(ROWTIME,'\''yyyy-MM-dd HH:mm:ss'\'') AS EVENT_TS, SPACE FROM CARPARK_EVENTS WHERE NAME='\''Sheffield NCP'\'' and OCCUPIED=false EMIT CHANGES;"}'
      
      







, :





{"queryId":"383894a7-05ee-4ec8-bb3b-c5ad39811539","columnNames":["EVENT_TS","SPACE"],"columnTypes":["STRING","STRING"]}
["2020-08-05 16:02:33","A42"]
["2020-08-05 16:07:31","D72"]
      
      







ksqlDB . SELECT



CREATE STREAM streamname AS



Kafka. , ksqlDB , , .. , Kafka. ksqlDB , , :





-- Derive the CARPARKS stream by inner-joining each CARPARK_EVENTS row (E) to
-- the CARPARK_REFERENCE row (R) that has the same NAME, adding the reference
-- columns LOCATION and CAPACITY to every event.
CREATE STREAM CARPARKS AS
    SELECT E.NAME AS NAME, E.SPACE,
           R.LOCATION, R.CAPACITY,
           E.OCCUPIED,
           -- OCCUPIED_IND is 1 when OCCUPIED is TRUE and -1 in every other case.
           CASE
               WHEN OCCUPIED=TRUE THEN 1
               ELSE -1
           END AS OCCUPIED_IND
    FROM   CARPARK_EVENTS E
           INNER JOIN
           CARPARK_REFERENCE R
           ON E.NAME = R.NAME;
      
      







, CASE , . CREATE STREAM Kafka, :





+----------------+-------+----------+----------------------------+----------+--------------+
|NAME            |SPACE  |OCCUPIED  |LOCATION                    |CAPACITY  |OCCUPIED_IND  |
+----------------+-------+----------+----------------------------+----------+--------------+
|Sheffield NCP   |E48    |true      |{LAT=53.4265964, LON=-1.8426|1000      |1             |
|                |       |          |386}                        |          |              |
      
      







, , ksqlDB . , SQL, :





-- Build the CARPARK_SPACES table from the CARPARKS stream: one row per NAME,
-- with OCCUPIED_SPACES holding the sum of that NAME's OCCUPIED_IND values.
CREATE TABLE CARPARK_SPACES AS
    SELECT NAME,
           SUM(OCCUPIED_IND) AS OCCUPIED_SPACES
        FROM CARPARKS
        GROUP BY NAME;
      
      



ksqlDB REST API:





curl --http2 'http://localhost:8088/query-stream' \
     --data-raw '{"sql":"SELECT OCCUPIED_SPACES FROM CARPARK_SPACES WHERE NAME='\''Birmingham NCP'\'';"}'
      
      







, , ( "pull- ", " push- ") , :





{"queryId":null,"columnNames":["OCCUPIED_SPACES"],"columnTypes":["INTEGER"]}
[30]
      
      



, -





curl --http2 'http://localhost:8088/query-stream' \
     --data-raw '{"sql":"SELECT OCCUPIED_SPACES FROM CARPARK_SPACES WHERE NAME='\''Birmingham NCP'\'';"}'
{"queryId":null,"columnNames":["OCCUPIED_SPACES"],"columnTypes":["INTEGER"]}
[29]
      
      



Java ksqlDB Python Go.





Apache Kafka , , , ( ), .





, , , , , :

















Apache Kafka Connect API, , Kafka. , Kafka S3 :





curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/sink-s3/config \
    -d ' {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "topics": "carpark",
        "s3.bucket.name": "rmoff-carparks",
        "s3.region": "us-west-2",
        "flush.size": "1024",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "format.class": "io.confluent.connect.s3.format.json.JsonFormat"
        }'
      
      







, , , S3. . , Snowflake, Kafka Connect; . Kafka Connect Kafka. , CARPARK_REFERENCE



, ksqlDB , (CDC - change data capture) , .





Apache Kafka , . , , , , , .





API Kafka , ksqlDB, , / . API , REST.





Apache Kafka, developer.confluent.io. Confluent Platform - Apache Kafka, , . , Confluent CloudGitHub Docker Compose , . - , Kafka, « - ».






- : " API HTTP".













All Articles