Olá, Habr! Desta vez tentei fazer um chat simples via Websockets. Para obter detalhes, bem-vindo em cat.
Conteúdo
- Learning Scala: Parte 1 - Jogo da Cobra
- Learning Scala: Parte 2 - Folha de tarefas com uploads de imagens
- Scala de Aprendizagem: Parte 3 - Testes de Unidade
- Learning Scala: Parte 4 - WebSocket
Links
Na verdade, todo o código está em um objeto ChatHub
class ChatHub[F[_]] private(
val topic: Topic[F, WebSocketFrame],
private val ref: Ref[F, Int]
)
(
implicit concurrent: Concurrent[F],
timer: Timer[F]
) extends Http4sDsl[F] {
val endpointWs: ServerEndpoint[String, Unit, String, Stream[IO, WebSocketFrame], IO] = endpoint
.get
.in("chat")
.tag("WebSockets")
.summary(" . : ws://localhost:8080/chat")
.description(" ")
.in(
stringBody
.description(" ")
.example("!")
)
.out(
stringBody
.description(" - ")
.example("6 : Id f518a53d: !")
)
// .
.serverLogic(_ => IO(Left(()): Either[Unit, String]))
def routeWs: HttpRoutes[F] = {
HttpRoutes.of[F] {
case GET -> Root / "chat" => logic()
}
}
private def logic(): F[Response[F]] = {
val toClient: Stream[F, WebSocketFrame] =
topic.subscribe(1000)
val fromClient: Pipe[F, WebSocketFrame, Unit] =
handle
WebSocketBuilder[F].build(toClient, fromClient)
}
private def handle(s: Stream[F, WebSocketFrame]): Stream[F, Unit] = s
.collect({
case WebSocketFrame.Text(text, _) => text
})
.evalMap(text => ref.modify(count => (count + 1, WebSocketFrame.Text(s"${count + 1} : $text"))))
.through(topic.publish)
}
object ChatHub {
def apply[F[_]]()(implicit concurrent: Concurrent[F], timer: Timer[F]): F[ChatHub[F]] = for {
ref <- Ref.of[F, Int](0)
topic <- Topic[F, WebSocketFrame](WebSocketFrame.Text("==="))
} yield new ChatHub(topic, ref)
}
Aqui devemos falar imediatamente sobre o Tópico - um primitivo de sincronização do Fs2 que permite que você faça um modelo Editor-Assinante e você pode ter muitos Editores e muitos Assinantes ao mesmo tempo. Em geral, é melhor enviar mensagens a ele por meio de algum tipo de buffer como Fila, porque há um limite no número de mensagens na fila e o Publicador espera até que todos os Assinantes recebam mensagens em sua fila de mensagens e, se ela transbordar, pode travar.
val topic: Topic[F, WebSocketFrame],
Aqui também conto o número de mensagens que foram enviadas para o chat como o número de cada mensagem. Como preciso fazer isso com threads diferentes, usei um análogo do Atomic, que se chama Ref aqui e garante a atomicidade da operação.
private val ref: Ref[F, Int]
Processando um fluxo de mensagens de usuários.
private def handle(stream: Stream[F, WebSocketFrame]): Stream[F, Unit] =
stream
// .
.collect({
case WebSocketFrame.Text(text, _) => text
})
// .
.evalMap(text => ref.modify(count => (count + 1, WebSocketFrame.Text(s"${count + 1} : $text"))))
//
.through(topic.publish)
Na verdade, a própria lógica de criar um soquete.
private def logic(): F[Response[F]] = {
// .
val toClient: Stream[F, WebSocketFrame] =
//
topic.subscribe(1000)
//
val fromClient: Pipe[F, WebSocketFrame, Unit] =
//
handle
// .
WebSocketBuilder[F].build(toClient, fromClient)
}
Ligamos nosso soquete à rota no servidor (ws: // localhost: 8080 / chat)
def routeWs: HttpRoutes[F] = {
HttpRoutes.of[F] {
case GET -> Root / "chat" => logic()
}
}
Na verdade, isso é tudo. Então você pode iniciar o servidor com esta rota. Ainda queria fazer qualquer tipo de documentação. Em geral, para documentar WebSocket e outras interações baseadas em eventos como RabbitMQ AMPQ, existe AsynAPI, mas não há nada em Tapir, então eu apenas fiz uma descrição do endpoint para Swagger como uma solicitação GET. Claro, ele não vai funcionar. Mais precisamente, um erro 501 será retornado, mas será exibido no Swagger
val endpointWs: Endpoint[String, Unit, String, fs2.Stream[F, Byte]] = endpoint
.get
.in("chat")
.tag("WebSockets")
.summary(" . : ws://localhost:8080/chat")
.description(" ")
.in(
stringBody
.description(" ")
.example("!")
)
.out(
stringBody
.description(" - ")
.example("6 : Id f518a53d: !")
)
No próprio estilo, tem a seguinte aparência. Conecte
nosso bate-papo ao nosso servidor de API
todosController = new TodosController()
imagesController = new ImagesController()
//
chatHub <- Resource.liftF(ChatHub[IO]())
endpoints = todosController.endpoints ::: imagesController.endpoints
// Swagger
docs = (chatHub.endpointWs :: endpoints).toOpenAPI("The Scala Todo List", "0.0.1")
yml: String = docs.toYaml
//
routes = chatHub.routeWs <+>
endpoints.toRoutes <+>
new SwaggerHttp4s(yml, "swagger").routes[IO]
httpApp = Router(
"/" -> routes
).orNotFound
blazeServer <- BlazeServerBuilder[IO](serverEc)
.bindHttp(settings.host.port, settings.host.host)
.withHttpApp(httpApp)
.resource
Conectamos ao chat com um script extremamente simples.
<script>
const id = `f${(~~(Math.random() * 1e8)).toString(16)}`;
const webSocket = new WebSocket('ws://localhost:8080/chat');
webSocket.onopen = event => {
alert('onopen ');
};
webSocket.onmessage = event => {
console.log(event);
receive(event.data);
};
webSocket.onclose = event => {
alert('onclose ');
};
function send() {
let text = document.getElementById("message");
webSocket.send(` Id ${id}: ${text.value}`);
text.value = '';
}
function receive(m) {
let text = document.getElementById("chat");
text.value = text.value + '\n\r' + m;
}
</script>
Na verdade, isso é tudo. Espero que alguém que também estuda a rocha ache este artigo interessante e talvez até útil.