Olá a todos. Já em dezembro, um novo fluxo do curso de Arquiteto de Software começará na OTUS . Antecipando o início do curso, gostaria de compartilhar com vocês a tradução de um artigo interessante. Sugiro também assistir a uma aula de demonstração sobre o tema: "Idempotência e comutabilidade da API em filas e HTTP" .
Uma das questões complicadas que enfrentamos constantemente ao projetar aplicativos e sistemas em geral é como organizar efetivamente a troca de informações entre os componentes, mantendo a flexibilidade para alterar as interfaces sem afetar indevidamente outras partes do sistema. Quanto mais específica e otimizada for uma interface, mais provável será que seja tão circunstancial que exigirá uma reescrita completa para alterá-la. E vice versa; Os genéricos podem ser bastante adaptáveis e amplamente suportados, mas, infelizmente, às custas do desempenho.
(Events) , API (real-time APIs) , , ; , .
. , , - , , . : , , .
(state) - , NoSQL . , , - - . , (consumers) - , . (producers) , , .
, , , , . , . (payload) , , . :
:
userLogin
:
zbeeblebrox
2020-08-17 16:26:39 BST
:
CarParked
:
A42 XYZ
2020-08-17 16:36:27
X42
:
orderPlaced
:
£2.25
2020-08-17 16:35:41 BST
(, , ), (, , , , ).
, , , , - Apache Kafka®. Kafka - , :
Pub/Sub
() () , / .
, .
.
Kafka . , , , , , . , , , , NoSQL .
API-, Apache Kafka, , , .
API
, Kafka, , , , , , ( - ). , Kafka - . , , - Kafka (topic - Kafka, - ), .
Kafka . Kafka Go:
package main
import (
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)
func main() {
topic := "test_topic"
p, _ := kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": "localhost:9092"})
defer p.Close()
p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic,
Partition: 0},
Value: []byte("Hello world")}, nil)
}
Kafka , , , , , ( ).
, Kafka, . pub/sub, , , . Kafka , , , A/B-, , . , , . , RabbitMQ, ActiveMQ, .
package main
import (
"fmt"
"gopkg.in/confluentinc/confluent-kafka-go.v1/kafka"
)
func main() {
topic := "test_topic"
cm := kafka.ConfigMap{
"bootstrap.servers": "localhost:9092",
"go.events.channel.enable": true,
"group.id": "rmoff_01"}
c, _ := kafka.NewConsumer(&cm)
defer c.Close()
c.Subscribe(topic, nil)
for {
select {
case ev := <-c.Events():
switch ev.(type) {
case *kafka.Message:
km := ev.(*kafka.Message)
fmt.Printf("✅ Message '%v' received from topic '%v'\n", string(km.Value), string(*km.TopicPartition.Topic))
}
}
}
}
Kafka, (Consumer Group). . -, Kafka , , , . -, - , , . Kafka , (, ).
, - . Kafka Connect API, .
Producer Consumer API Java, C/C++, Go, Python, Node.js . , HTTP Kafka? REST Proxy.
REST API Apache Kafka
, . , , , :
{
"name": "NCP Sheffield",
"space": "A42",
"occupied": true
}
Kafka, . Kafka Confluent REST Proxy - REST-:
curl -X POST \
-H "Content-Type: application/vnd.kafka.json.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
--data '{"records":[{"value":{ "name": "NCP Sheffield", "space": "A42", "occupied": true }}]}' \
"http://localhost:8082/topics/carpark"
, Consumer API, , REST-. Consumer API, , REST API, Consumer Group, (subscription). , REST API , :
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data '{"name": "rmoff_consumer", "format": "json", "auto.offset.reset": "earliest"}' \
http://localhost:8082/consumers/rmoff_consumer
curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["carpark"]}' \
http://localhost:8082/consumers/rmoff_consumer/instances/rmoff_consumer/subscription
:
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
http://localhost:8082/consumers/rmoff_consumer/instances/rmoff_consumer/records
[
{
"topic": "carpark",
"key": null,
"value": {
"name": "Sheffield NCP",
"space": "A42",
"occupied": true
},
"partition": 0,
"offset": 0
}
]
, . , REST-.
, Kafka. , pub/sub. - , , ? , , .
,
Apache Kafka pub/sub - iPhone . , … . Apache Kafka Kafka Streams API. Java Kafka . Kafka Streams, , Walmart, Ticketmaster Bloomberg, ksqlDB.
ksqlDB - , . API SQL Kafka. ksqlDB , , - .
ksqlDB :
CREATE STREAM CARPARK_EVENTS (NAME VARCHAR,
SPACE VARCHAR,
OCCUPIED BOOLEAN)
WITH (KAFKA_TOPIC='carpark',
VALUE_FORMAT='JSON');
ksqlDB , , . , . , , , :
SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss') AS EVENT_TS,
SPACE
FROM CARPARK_EVENTS
WHERE NAME='Sheffield NCP'
AND OCCUPIED=false
EMIT CHANGES;
SQL-, , , , ( EMIT CHANGES). , push-, , , . ksqlDB pull- ( ), , , . , ksqlDB , , .
ksqlDB REST API, SQL :
curl --http2 'http://localhost:8088/query-stream' \
--data-raw '{"sql":"SELECT TIMESTAMPTOSTRING(ROWTIME,'\''yyyy-MM-dd HH:mm:ss'\'') AS EVENT_TS, SPACE FROM CARPARK_EVENTS WHERE NAME='\''Sheffield NCP'\'' and OCCUPIED=false EMIT CHANGES;"}'
, :
{"queryId":"383894a7-05ee-4ec8-bb3b-c5ad39811539","columnNames":["EVENT_TS","SPACE"],"columnTypes":["STRING","STRING"]}
…
["2020-08-05 16:02:33","A42"]
…
…
…
["2020-08-05 16:07:31","D72"]
…
ksqlDB . SELECT
CREATE STREAM streamname AS
Kafka. , ksqlDB , , .. , Kafka. ksqlDB , , :
CREATE STREAM CARPARKS AS
SELECT E.NAME AS NAME, E.SPACE,
R.LOCATION, R.CAPACITY,
E.OCCUPIED,
CASE
WHEN OCCUPIED=TRUE THEN 1
ELSE -1
END AS OCCUPIED_IND
FROM CARPARK_EVENTS E
INNER JOIN
CARPARK_REFERENCE R
ON E.NAME = R.NAME;
, CASE , . CREATE STREAM Kafka, :
+----------------+-------+----------+----------------------------+----------+--------------+
|NAME |SPACE |OCCUPIED |LOCATION |CAPACITY |OCCUPIED_IND |
+----------------+-------+----------+----------------------------+----------+--------------+
|Sheffield NCP |E48 |true |{LAT=53.4265964, LON=-1.8426|1000 |1 |
| | | |386} | | |
, , ksqlDB . , SQL, :
CREATE TABLE CARPARK_SPACES AS SELECT NAME, SUM(OCCUPIED_IND) AS OCCUPIED_SPACES FROM CARPARKS GROUP BY NAME;
ksqlDB REST API:
curl --http2 'http://localhost:8088/query-stream' \
--data-raw '{"sql":"SELECT OCCUPIED_SPACES FROM CARPARK_SPACES WHERE NAME='\''Birmingham NCP'\'';"}'
, , ( "pull- ", " push- ") , :
{"queryId":null,"columnNames":["OCCUPIED_SPACES"],"columnTypes":["INTEGER"]}
[30]
, -
curl --http2 'http://localhost:8088/query-stream' \
--data-raw '{"sql":"SELECT OCCUPIED_SPACES FROM CARPARK_SPACES WHERE NAME='\''Birmingham NCP'\'';"}'
{"queryId":null,"columnNames":["OCCUPIED_SPACES"],"columnTypes":["INTEGER"]}
[29]
Apache Kafka , , , ( ), .
, , , , , :
Apache Kafka Connect API, , Kafka. , Kafka S3 :
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/sink-s3/config \
-d ' {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"topics": "carpark",
"s3.bucket.name": "rmoff-carparks",
"s3.region": "us-west-2",
"flush.size": "1024",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat"
}'
, , , S3. . , Snowflake, Kafka Connect; . Kafka Connect Kafka. , CARPARK_REFERENCE
, ksqlDB , (CDC - change data capture) , .
Apache Kafka , . , , , , , .
API Kafka , ksqlDB, , / . API , REST.
Apache Kafka, developer.confluent.io. Confluent Platform - Apache Kafka, , . , Confluent Cloud. GitHub Docker Compose , . - , Kafka, « - ».
- : " API HTTP".