Scala Belajar: Bagian 4 - WebSocket



Halo, Habr! Kali ini saya coba bikin chat sederhana lewat Websockets. Untuk detailnya, selamat datang di bawah cat.



Kandungan





Tautan



  1. Kode sumber
  2. Gambar Docker
  3. Tenuk
  4. Http4s
  5. Fs2
  6. Doobie
  7. ScalaTest
  8. ScalaCheck
  9. ScalaTestPlusScalaCheck


Sebenarnya semua kode ada dalam satu objek ChatHub



class ChatHub[F[_]] private(
                             val topic: Topic[F, WebSocketFrame],
                             private val ref: Ref[F, Int]
                           )
                           (
                             implicit concurrent: Concurrent[F],
                             timer: Timer[F]
                           ) extends Http4sDsl[F] {

  val endpointWs: ServerEndpoint[String, Unit, String, Stream[IO, WebSocketFrame], IO] = endpoint
    .get
    .in("chat")
    .tag("WebSockets")
    .summary("   .    : ws://localhost:8080/chat")
    .description("   ")
    .in(
      stringBody
        .description("      ")
        .example("!")
    )
    .out(
      stringBody
        .description("  -   ")
        .example("6 :     Id  f518a53d: !")
    )
    //    . 
    .serverLogic(_ => IO(Left(()): Either[Unit, String]))

  def routeWs: HttpRoutes[F] = {
    HttpRoutes.of[F] {
      case GET -> Root / "chat" => logic()
    }
  }

  private def logic(): F[Response[F]] = {
    val toClient: Stream[F, WebSocketFrame] =
      topic.subscribe(1000)
    val fromClient: Pipe[F, WebSocketFrame, Unit] =
      handle
    WebSocketBuilder[F].build(toClient, fromClient)
  }

  private def handle(s: Stream[F, WebSocketFrame]): Stream[F, Unit] = s
    .collect({
      case WebSocketFrame.Text(text, _) => text
    })
    .evalMap(text => ref.modify(count => (count + 1, WebSocketFrame.Text(s"${count + 1} : $text"))))
    .through(topic.publish)
}

object ChatHub {

  def apply[F[_]]()(implicit concurrent: Concurrent[F], timer: Timer[F]): F[ChatHub[F]] = for {
    ref <- Ref.of[F, Int](0)
    topic <- Topic[F, WebSocketFrame](WebSocketFrame.Text("==="))
  } yield new ChatHub(topic, ref)
}


Di sini Anda harus segera mengatakan tentang Topik - primitif sinkronisasi dari Fs2 yang memungkinkan Anda membuat model Penerbit - Pelanggan, dan Anda dapat memiliki banyak Penerbit dan banyak Pelanggan pada saat yang bersamaan. Secara umum, lebih baik mengirim pesan ke sana melalui beberapa jenis buffer seperti Queue karena memiliki batasan jumlah pesan dalam antrian dan Penerbit menunggu sampai semua Pelanggan menerima pesan dalam antrian pesan mereka, dan jika penuh, mungkin hang.



val topic: Topic[F, WebSocketFrame],


Di sini saya juga menghitung jumlah pesan yang dikirim ke obrolan sebagai jumlah setiap pesan. Karena saya perlu melakukan ini dari utas yang berbeda, saya menggunakan analog dari Atomic, yang disebut Ref di sini dan menjamin atomicity operasi.



  private val ref: Ref[F, Int]


Memproses aliran pesan dari pengguna.



  private def handle(stream: Stream[F, WebSocketFrame]): Stream[F, Unit] = 
    stream
//       . 
    .collect({
      case WebSocketFrame.Text(text, _) => text
    })
//               .
    .evalMap(text => ref.modify(count => (count + 1, WebSocketFrame.Text(s"${count + 1} : $text"))))
//     
    .through(topic.publish)


Sebenarnya logika membuat soket.



private def logic(): F[Response[F]] = {
//    .
    val toClient: Stream[F, WebSocketFrame] =
//        
      topic.subscribe(1000)
//        
    val fromClient: Pipe[F, WebSocketFrame, Unit] =
//      
      handle
//         .
    WebSocketBuilder[F].build(toClient, fromClient)
  }


Kami mengikat soket kami ke rute di server (ws: // localhost: 8080 / chat)



def routeWs: HttpRoutes[F] = {
    HttpRoutes.of[F] {
      case GET -> Root / "chat" => logic()
    }
  }


Sebenarnya itu saja. Kemudian Anda dapat memulai server dengan rute ini. Saya masih ingin membuat dokumentasi apa saja. Secara umum, untuk mendokumentasikan WebSocket dan interaksi berbasis event lainnya seperti RabbitMQ AMPQ, ada AsynAPI, tetapi tidak ada di bawah Tapir, jadi saya hanya membuat deskripsi titik akhir untuk Swagger sebagai permintaan GET. Tentu saja, dia tidak akan bekerja. Lebih tepatnya, kesalahan 501 akan dikembalikan, tetapi akan ditampilkan di Swagger



  val endpointWs: Endpoint[String, Unit, String, fs2.Stream[F, Byte]] = endpoint
    .get
    .in("chat")
    .tag("WebSockets")
    .summary("   .    : ws://localhost:8080/chat")
    .description("   ")
    .in(
      stringBody
        .description("      ")
        .example("!")
    )
    .out(
      stringBody
        .description("  -   ")
        .example("6 :     Id  f518a53d: !")
    )


Dalam kesombongan itu sendiri, terlihat seperti ini. Hubungkan







obrolan kita ke server API kita



    todosController = new TodosController()
    imagesController = new ImagesController()
//   
    chatHub <- Resource.liftF(ChatHub[IO]())
    endpoints = todosController.endpoints ::: imagesController.endpoints
//     Swagger
    docs = (chatHub.endpointWs :: endpoints).toOpenAPI("The Scala Todo List", "0.0.1")
    yml: String = docs.toYaml
//      
    routes = chatHub.routeWs <+>
      endpoints.toRoutes <+>
      new SwaggerHttp4s(yml, "swagger").routes[IO]
    httpApp = Router(
      "/" -> routes
    ).orNotFound
    blazeServer <- BlazeServerBuilder[IO](serverEc)
      .bindHttp(settings.host.port, settings.host.host)
      .withHttpApp(httpApp)
      .resource


Kami terhubung ke obrolan dengan skrip yang sangat sederhana.



    <script>
        const id = `f${(~~(Math.random() * 1e8)).toString(16)}`;
        const webSocket = new WebSocket('ws://localhost:8080/chat');

        webSocket.onopen = event => {
            alert('onopen ');
        };

        webSocket.onmessage = event => {
            console.log(event);
            receive(event.data);
        };

        webSocket.onclose = event => {
            alert('onclose ');
        };

        function send() {
            let text = document.getElementById("message");
            webSocket.send(`    Id  ${id}: ${text.value}`);
            text.value = '';
        }

        function receive(m) {
            let text = document.getElementById("chat");
            text.value = text.value + '\n\r' + m;
        }
    </script>


Ini sebenarnya semuanya. Saya harap seseorang yang juga mempelajari batuan akan menganggap artikel ini menarik dan bahkan mungkin berguna.



All Articles