はじめに
これは ドリコム Advent Calendar 2024 の 21 日目です。自己紹介
こんにちは、SRE ソリューション部の AnD00 です。 最近、とあるプロダクトの開発を Rust で行うことになり、その学習を兼ねてリアルタイムチャットを実装しました。 完全にこすり倒されたネタであることは重々承知していますが、Phoenix のように Topic や Event のような概念を持たせる記事に出会わなかったので、自分なりにまとめています。本記事のゴール
axum
と tokio-tungstenite
を用いて、複数クライアントが同一のルームでメッセージを送り合う、簡易的なリアルタイムチャットサーバーを作成します。
前提知識と準備
- WebSocket の基本概念を理解していること
- Rust と Cargo がインストール済みであること
実装の流れ
1. プロジェクト作成と依存関係設定
まずはプロジェクトを作成。cargo new realtime-chat cd realtime-chat
Cargo.toml
に必要な依存を追加します。
[package] name = "realtime-chat" version = "0.1.0" edition = "2021" [dependencies] axum = { version = "0.7", features = ["ws"] } futures = "0.3" futures-util = { version = "0.3", default-features = false, features = ["sink", "std"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" tokio = { version = "1", features = ["full"] } tokio-tungstenite = "0.24"
2. WebSocket 接続を受け付ける基本構造
最初は、WebSocket 接続を受け付けて、クライアントとやり取りする最小の構造を作ります。 この段階では、まだルームの概念やブロードキャストはありません。ただ接続してメッセージを受け取り、そのまま送り返すだけの機能を実装します。
axum::extract::ConnectInfo
は、接続情報を取得するための Extractor です。使わなくても動作に影響はありませんが、接続元の IP アドレスなどを取得する際に便利です。
// src/main.rs use axum::{ extract::ws::{Message, WebSocket, WebSocketUpgrade}, extract::connect_info::ConnectInfo, response::IntoResponse, routing::get, Router, }; use futures::StreamExt; use std::net::SocketAddr; use tokio::net::TcpListener; #[tokio::main] async fn main() { let app = Router::new().route("/ws", get(ws_handler)); let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); let listener = TcpListener::bind(addr).await.unwrap(); println!("Server running on ws://{}", addr); axum::serve( listener, app.into_make_service_with_connect_info::<SocketAddr>(), ) .await .unwrap(); } async fn ws_handler( ws: WebSocketUpgrade, ConnectInfo(addr): ConnectInfo<SocketAddr>, ) -> impl IntoResponse { ws.on_upgrade(move |socket| handle_socket(socket, addr)) } async fn handle_socket(mut socket: WebSocket, who: SocketAddr) { println!("{who} connected"); while let Some(Ok(msg)) = socket.next().await { if let Message::Text(text) = msg { if socket.send(Message::Text(text)).await.is_err() { break; } } } }
動作確認
1. サーバー起動
cargo run
2. wscat
で接続
wscat -c ws://127.0.0.1:3000/ws
3. テキストメッセージを送信
そのまま返ってくれば OK。3. ルームとクライアント管理の仕組み
特定のルームに参加した複数クライアント間でメッセージを共有したい場合、接続状態をサーバー側で保持する必要があります。 今回は簡略化のため、データベースやキャッシュストア、PubSub システムなどを使わず、サーバーのメモリ上で管理します。 ここでは、以下の構造体を導入して、状態を管理します。- Room
- 文字列で識別されるチャット部屋
- Peer
- クライアントを識別する要素(
SocketAddr
で識別) - ユーザーの認証情報などを使用してクライアントを識別したほうがいいと思うけど、IP アドレスで代用しちゃう
- クライアントを識別する要素(
- PeerKey
(Room, Peer)
のペア- 特定のルームに所属するクライアントを識別するためのキー
- AppState
- サーバーの状態を保持する構造体
Arc<Mutex<HashMap<PeerKey, Peer>>>
で、ルームとクライアントの紐づけを管理DashMap
などの並行処理に強いHashMap
を使ったほうがいいと思うけど、Arc<Mutex<>>
で代用しちゃう
use std::collections::HashMap; use std::sync::Arc; use tokio::sync::mpsc::Sender; use tokio::sync::Mutex; type Room = String; type Peer = Sender<Message>; type PeerKey = (Room, SocketAddr); #[derive(Clone)] struct AppState { rooms: Arc<Mutex<HashMap<PeerKey, Peer>>>, }
AppState
を main()
で初期化し、Router
に組み込みます。
use axum::extract::State; #[tokio::main] async fn main() { let state = AppState { rooms: Arc::new(Mutex::new(HashMap::new())), }; let app = Router::new() .route("/ws", get(ws_handler)) .with_state(state); // ... 省略(前と同様) } async fn ws_handler( ws: WebSocketUpgrade, ConnectInfo(addr): ConnectInfo<SocketAddr>, State(state): State<AppState>, // AppState を受け取る ) -> impl IntoResponse { ws.on_upgrade(move |socket| handle_socket(socket, addr, state)) } async fn handle_socket(mut socket: WebSocket, who: SocketAddr, state: AppState) { // ... 省略(前と同様) }
4. メッセージの種類とルーム参加・退出
クライアントから- ルームに参加
- ルームから退出
- メッセージ配信
use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize)] #[serde(tag = "event", rename_all = "snake_case")] enum MessagePayload { Join { room: String }, Leave { room: String }, Broadcast { room: String, message: String }, }この列挙型を使って、
{"event":"join","room":"general"}
のような JSON をパースして、ユーザーのリクエストを判別します。
5. WebSocket ハンドラでの参加・退出・ブロードキャスト処理
ここまでで、- WebSocket 接続を受け付ける基本構造
- ルームとクライアント管理の仕組み
- メッセージの種類とルーム参加・退出
handle_socket
関数内で、参加・退出・メッセージ配信の機能を実装します。ポイントは以下の通りです。
1. 受信タスク (recv_task
) と送信タスク (send_task
) の分離
- 受信処理と送信処理を独立した並行タスクとして管理し、コードの見通しを良くしておく
recv_task
- クライアントからのメッセージを受け取り、イベント内容に応じてルームへの追加・削除・メッセージ配信を行うタスク
send_task
- サーバーからクライアントへのメッセージ送信を行うタスク
全体像
use futures::SinkExt; use std::collections::HashSet; use tokio::sync::mpsc::channel; use tokio::sync::oneshot; async fn handle_socket(socket: WebSocket, who: SocketAddr, state: AppState) { println!("{who} connected"); // sender(サーバー => クライアントへの送信用)と receiver(クライアント => サーバーへの受信用)に分離する let (mut sender, mut receiver) = socket.split(); // tx は、サーバーからこの特定のクライアントへメッセージを送るためのチャンネル // rx は、上記 tx と対応するチャンネルで送信タスクで使用 // 100 は、チャンネルバッファサイズで必要に応じて調整可能 let (tx, mut rx) = channel::<Message>(100); // cancel_tx と cancel_rx は、受信タスクが終了した際に送信タスクへ「終了して良いよ」と通知するためのチャンネル // クライアント切断時、全ルームからの退出処理が完了したら cancel_tx で通知し、送信タスクを終了させる let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>(); // joined_rooms は、このクライアントが現在参加しているルームを格納するセット // join イベントで追加、leave イベントや切断時に削除する let mut joined_rooms: HashSet<Room> = HashSet::new(); let recv_task = tokio::spawn(async move { // 受信処理 }); let send_task = tokio::spawn(async move { // 送信処理 }); tokio::select! { res = send_task => { if let Err(e) = res { eprintln!("Send task panicked: {:?}", e); } } res = recv_task => { if let Err(e) = res { eprintln!("Receive task panicked: {:?}", e); } } } }
recv_task
(受信タスク)
// recv_task は、receiver.next() でクライアントからのメッセージを受け取り、イベントに応じて処理を行う let recv_task = tokio::spawn(async move { while let Some(msg) = receiver.next().await { let msg = match msg { Ok(msg) => msg, Err(e) => { // 何らかのエラー(ソケット切断など)が起こった場合はループを抜け、後片付け処理へ進む eprintln!("Error receiving message: {:?}", e); break; } }; // 受信メッセージは Text や Binary などの種類があるが、今回はテキストだけ対応 let msg = match msg { Message::Text(text) => text, _ => continue, }; // 受け取った文字列を MessagePayload 型にデシリアライズ // JSON が不正な場合はログにエラーを出力し、次のメッセージ待ちへ let payload: MessagePayload = match serde_json::from_str(&msg) { Ok(payload) => payload, Err(e) => { eprintln!("Error parsing message: {:?}", e); continue; } }; // ペイロードごとに処理を分岐 } // ソケットが閉じたらここに到達する // 参加していたルームからクライアントを削除してクリーンアップ for room in joined_rooms { remove_peer(&state, (room, who)).await; } // 送信タスクに「終了して良いよ」と通知 let _ = cancel_tx.send(()); });
send_task
(送信タスク)
// send_task は rx.recv() でサーバーからのメッセージを待ち、受け取ったメッセージをクライアントへ送信する let send_task = tokio::spawn(async move { loop { tokio::select! { Some(msg) = rx.recv() => { if let Err(e) = sender.send(msg).await { eprintln!("Error sending message: {:?}", e); break; } } _ = &mut cancel_rx => { // cancel_rx が起動したら、もう新たなメッセージ送信は不要なのでループを終了 break; } } } });
タスクの終了待ち合わせ
// 送信タスクか受信タスクのどちらかが終了した時点で handle_socket も終了 // クライアント切断時は受信タスク側で cancel_tx を送るため、送信タスクも停止し、tokio::select! ブロック全体が終了する tokio::select! { res = send_task => { if let Err(e) = res { eprintln!("Send task panicked: {:?}", e); } } res = recv_task => { if let Err(e) = res { eprintln!("Receive task panicked: {:?}", e); } } }
2. ルーム参加 (join
) 処理
- クライアントが
{"event":"join","room":"general"}
のようなイベントを送信した場合、そのクライアントをAppState
のrooms
に登録する - このクライアントにメッセージをブロードキャストできるよう、
Room
とSocketAddr
の複合キーに紐づけておく
async fn handle_socket(socket: WebSocket, who: SocketAddr, state: AppState) { // ... 省略(前と同様) let recv_task = tokio::spawn(async move { while let Some(msg) = receiver.next().await { // ... 省略(前と同様) match payload { MessagePayload::Join { room } => { add_peer(&state, (room.clone(), who), tx.clone()).await; joined_rooms.insert(room.clone()); println!("{who} joined room {room}"); } _ => (), } } // ... 省略(前と同様) }); } // 指定のキーでクライアントをルームに追加 async fn add_peer(state: &AppState, peer_key: PeerKey, peer: Peer) { let mut rooms = state.rooms.lock().await; rooms.insert(peer_key, peer); }
3. ルーム退出 (leave
) 処理
- クライアントが
{"event":"leave","room":"general"}
のようなイベントを送信した場合、そのクライアントを該当ルームから削除する - クライアントが明示的に
leave
を送らず、ソケットが切断された場合でも、後片付けとして参加中の全ルームからクライアントを削除しておく
async fn handle_socket(socket: WebSocket, who: SocketAddr, state: AppState) { // ... 省略(前と同様) let recv_task = tokio::spawn(async move { while let Some(msg) = receiver.next().await { // ... 省略(前と同様) match payload { MessagePayload::Join { room } => { add_peer(&state, (room.clone(), who), tx.clone()).await; joined_rooms.insert(room.clone()); println!("{who} joined room {room}"); } MessagePayload::Leave { room } => { remove_peer(&state, (room.clone(), who)).await; joined_rooms.remove(&room); println!("{who} left room {room}"); } _ => (), } } // ... 省略(前と同様) }); } // ステートから該当クライアントを削除 async fn remove_peer(state: &AppState, peer_key: PeerKey) { let mut rooms = state.rooms.lock().await; rooms.remove(&peer_key); }
4. メッセージ配信 (broadcast
) 処理
- クライアントが
{"event":"broadcast","room":"general","message":"Hello"}
のようなイベントを送信した場合、該当ルームに参加している全てのクライアントへ"Hello"
のメッセージを送信する
async fn handle_socket(socket: WebSocket, who: SocketAddr, state: AppState) { // ... 省略(前と同様) let recv_task = tokio::spawn(async move { while let Some(msg) = receiver.next().await { // ... 省略(前と同様) match payload { MessagePayload::Join { room } => { add_peer(&state, (room.clone(), who), tx.clone()).await; joined_rooms.insert(room.clone()); println!("{who} joined room {room}"); } MessagePayload::Leave { room } => { remove_peer(&state, (room.clone(), who)).await; joined_rooms.remove(&room); println!("{who} left room {room}"); } MessagePayload::Broadcast { room, message } => { if !joined_rooms.contains(&room) { eprintln!("{who} is not in room {room}"); continue; } broadcast_message(&state, &room, &message).await; println!("{who} broadcasted: {message} in {room}"); } } } // ... 省略(前と同様) }); } // 指定のルームに参加している全クライアントのリストを取得 async fn get_peers(state: &AppState, room: &str) -> Vec<Peer> { let rooms = state.rooms.lock().await; rooms .iter() .filter_map(|(key, peer)| { if key.0 == room { Some(peer.clone()) } else { None } }) .collect() } // 指定のルームに参加している全クライアントにテキストメッセージを配信 async fn broadcast_message(state: &AppState, room: &str, msg: &str) { let peers = get_peers(state, room).await; for peer in peers { if let Err(e) = peer.send(Message::Text(msg.to_string())).await { eprintln!("Error sending message to peer: {:?}", e); } } }これで、基本的な「参加・退出・メッセージ配信」のサイクルが完成しました。
ちゃんと動くか確認してみましょう。
動作確認
1. サーバー起動
cargo run
2. wscat
で接続
wscat -c ws://127.0.0.1:3000/ws
3. ルームに参加
{"event": "join", "room": "general"}コンソールに
127.0.0.1:XXXXX joined room general
と表示されれば OK。
4. もう 1 つのコンソールで同じ手順を繰り返し、同じルームに参加
{"event": "join", "room": "general"}
5. ルームにメッセージを配信
{"event": "broadcast", "room": "general", "message": "Hello!"}2 つのコンソールに
Hello!
と表示されれば OK。
6. ルームから退出
{"event": "leave", "room": "general"}再度メッセージを配信しても、退出したクライアントのコンソールに届かなければ OK。
GOOD 👍️
まとめ
本記事では、Rust でリアルタイムチャットサーバーを実装する手順を紹介しました。今回紹介したのは、あくまでも「最低限の仕組み」ですが、Rust を用いたマルチスレッドや非同期処理の概要を掴むには役立ったかなと感じています。 実運用を考えると、モニタリングやスケーリング、セキュリティなど、さまざまな課題が待ち受けています。
でもまぁ、Rust は書いてて楽しいので頑張れそうですw もし、この記事が Rust でのリアルタイム通信の基本構造を理解する一助になれば幸いです。
ありがとうございました!
おわりに
ドリコムでは一緒に働くメンバーを募集しています!募集一覧はコチラを御覧ください!