use anyhow::Result; use futures_util::{SinkExt, StreamExt}; use tokio::net::TcpStream; use tokio_tungstenite::{ connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream, }; use tracing::info; /// A WebSocket connection (either client or server side). pub type WsStream = WebSocketStream>; /// Connect to a WebSocket server as a client. pub async fn connect_to_server(url: &str) -> Result { info!("Connecting to WebSocket server: {}", url); let (ws_stream, response) = connect_async(url).await?; info!("WebSocket connected, status: {}", response.status()); Ok(ws_stream) } /// Send a binary message over the WebSocket. pub async fn send_binary(ws: &mut WsStream, data: Vec) -> Result<()> { ws.send(Message::Binary(data.into())).await?; Ok(()) } /// Receive the next binary message from the WebSocket. /// Returns None if the connection is closed. pub async fn recv_binary(ws: &mut WsStream) -> Result>> { loop { match ws.next().await { Some(Ok(Message::Binary(data))) => return Ok(Some(data.to_vec())), Some(Ok(Message::Close(_))) => return Ok(None), Some(Ok(Message::Ping(data))) => { ws.send(Message::Pong(data)).await?; } Some(Ok(_)) => continue, Some(Err(e)) => return Err(anyhow::anyhow!("WebSocket error: {}", e)), None => return Ok(None), } } } /// Send a close frame. pub async fn close(ws: &mut WsStream) -> Result<()> { ws.close(None).await?; Ok(()) } /// WebSocket server acceptor — accepts a TcpStream and performs the WebSocket upgrade. pub async fn accept_connection( stream: TcpStream, ) -> Result>> { let ws = tokio_tungstenite::accept_async(MaybeTlsStream::Plain(stream)).await?; Ok(ws) }