From 0de5919a1639ea0e49e0545a33d906de504c1b52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Garc=C3=ADa?= Date: Sat, 21 May 2022 19:12:38 +0200 Subject: [PATCH] Fix incorrect pings sent, and respond to pings from the client --- src/api/notifications.rs | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/src/api/notifications.rs b/src/api/notifications.rs index 7adb0bd9..6b9460e4 100644 --- a/src/api/notifications.rs +++ b/src/api/notifications.rs @@ -293,6 +293,7 @@ pub fn start_notification_server() -> WebSocketUsers { let users2 = users.clone(); tokio::spawn(async move { let addr = (CONFIG.websocket_address(), CONFIG.websocket_port()); + info!("Starting WebSockets server on {}:{}", addr.0, addr.1); let listener = TcpListener::bind(addr).await.expect("Can't listen on websocket port"); let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel::<()>(); @@ -317,9 +318,11 @@ pub fn start_notification_server() -> WebSocketUsers { users } -async fn handle_connection(stream: TcpStream, users: WebSocketUsers, _addr: SocketAddr) -> Result<(), Error> { +async fn handle_connection(stream: TcpStream, users: WebSocketUsers, addr: SocketAddr) -> Result<(), Error> { let mut user_uuid: Option = None; + info!("Accepting WS connection from {addr}"); + // Accept connection, do initial handshake, validate auth token and get the user ID use handshake::server::{Request, Response}; let mut stream = accept_hdr_async(stream, |req: &Request, res: Response| { @@ -346,9 +349,22 @@ async fn handle_connection(stream: TcpStream, users: WebSocketUsers, _addr: Sock res = stream.next() => { match res { Some(Ok(message)) => { + //info!("RECEIVED {message:?}"); + + // Respond to any pings + if let Message::Ping(ping) = message { + if stream.send(Message::Pong(ping)).await.is_err() { + break; + } + continue; + } else if let Message::Pong(_) = message { + /* Ignored */ + continue; + } + // We should receive an initial message with the protocol and version, and we will reply to it if let Message::Text(ref message) = message { - let msg = message.strip_suffix('\u{1e}').unwrap_or(message); + let msg = message.strip_suffix(RECORD_SEPARATOR as char).unwrap_or(message); if serde_json::from_str(msg).ok() == Some(INITIAL_MESSAGE) { stream.send(Message::binary(INITIAL_RESPONSE)).await?; @@ -377,13 +393,15 @@ async fn handle_connection(stream: TcpStream, users: WebSocketUsers, _addr: Sock } _= interval.tick() => { - if stream.send(Message::Binary(create_ping())).await.is_err() { + if stream.send(Message::Ping(create_ping())).await.is_err() { break; } } } } + info!("Closing WS connection from {addr}"); + // Delete from map users.map.entry(user_uuid).or_default().retain(|(uuid, _)| uuid != &entry_uuid); Ok(())