diff --git a/Cargo.lock b/Cargo.lock index dda5b5f..f948c57 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -124,7 +124,7 @@ checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" [[package]] name = "messages" version = "0.1.0" -source = "git+https://github.com/Rustastic/Messages.git#1a8c9feb6133fc3434e73c2fbfdb09e1732ae6c0" +source = "git+https://github.com/Rustastic/Messages.git#08bd4d7d9c72a6696ad3d9ba1a77b3d38307bc85" dependencies = [ "bincode", "crossbeam-channel", @@ -220,9 +220,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.96" +version = "2.0.98" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5d0adab1ae378d7f53bdebc67a39f1f151407ef230f0ce2883572f5d8985c80" +checksum = "36147f1a48ae0ec2b5b3bc5b537d267457555a10dc06f3dbc8cb11ba3006d3b1" dependencies = [ "proc-macro2", "quote", diff --git a/src/chat_client.rs b/src/chat_client.rs index 8388221..1b52e9c 100644 --- a/src/chat_client.rs +++ b/src/chat_client.rs @@ -4,18 +4,14 @@ use colored::Colorize; use crossbeam_channel::{select_biased, Receiver, Sender}; use log::{error, info, warn}; -use messages::{ - client_commands::*, - high_level_messages::{Message, MessageContent}, -}; +use messages::{client_commands::*, high_level_messages::*}; use rand::Rng; use std::collections::{HashMap, HashSet}; use wg_2024::{ network::{NodeId, SourceRoutingHeader}, packet::{ - self, Ack, FloodRequest, FloodResponse, Fragment, Nack, NackType, NodeType, Packet, - PacketType, + Ack, FloodRequest, FloodResponse, Fragment, Nack, NackType, NodeType, Packet, PacketType, }, }; @@ -145,16 +141,114 @@ impl ChatClient { self.query_communication_servers(); } ChatClientCommand::SendMessageTo(destination_id, text) => { - let session_id = self.session_id_counter; - self.session_id_counter += 1; - - let message = - Message::new_client_message(session_id, self.id, destination_id, text); - - self.send_message_to(message); + if self.running { + if let Some(server_id) = self.registered { + let session_id = self.session_id_counter; + self.session_id_counter += 1; + let message = Message::new_client_message( + session_id, + self.id, + server_id, + ClientMessage::SendMessage { + recipient_id: destination_id, + content: text, + }, + ); + self.send_message_to(message, destination_id); + } else { + error!( + "{} [ ChatClient {} ]: Cannot send message, not registered to any server", + "✗".red(), + self.id + ); + self.controller_send + .send(ChatClientEvent::ErrorNotRegistered) + .unwrap(); + } + } else { + error!( + "{} [ ChatClient {} ]: Cannot send message, ChatClient is not running", + "✗".red(), + self.id + ); + self.controller_send + .send(ChatClientEvent::ErrorNotRunning) + .unwrap(); + } + } + ChatClientCommand::RegisterTo(server_id) => { + if self.running { + if self.communication_server_list.contains(&server_id) { + self.register_to(server_id) + } else { + error!( + "{} [ ChatClient {} ]: Cannot register to server {}, it is not a communication server", + "✗".red(), + self.id, + server_id + ); + } + } else { + error!( + "{} [ ChatClient {} ]: Cannot send message, ChatClient is not running", + "✗".red(), + self.id + ); + self.controller_send + .send(ChatClientEvent::ErrorNotRunning) + .unwrap(); + } + } + ChatClientCommand::GetClientList => { + if self.running { + if let Some(_) = self.registered { + self.get_client_list(); + } else { + error!( + "{} [ ChatClient {} ]: Cannot send message, not registered to any server", + "✗".red(), + self.id + ); + self.controller_send + .send(ChatClientEvent::ErrorNotRegistered) + .unwrap(); + } + } else { + error!( + "{} [ ChatClient {} ]: Cannot send message, ChatClient is not running", + "✗".red(), + self.id + ); + self.controller_send + .send(ChatClientEvent::ErrorNotRunning) + .unwrap(); + } + } + ChatClientCommand::LogOut => { + if self.running { + if let Some(_) = self.registered { + self.logout(); + } else { + error!( + "{} [ ChatClient {} ]: Cannot send message, not registered to any server", + "✗".red(), + self.id + ); + self.controller_send + .send(ChatClientEvent::ErrorNotRegistered) + .unwrap(); + } + } else { + error!( + "{} [ ChatClient {} ]: Cannot send message, ChatClient is not running", + "✗".red(), + self.id + ); + self.controller_send + .send(ChatClientEvent::ErrorNotRunning) + .unwrap(); + } } - ChatClientCommand::RegisterTo(server_id) => self.register_to(server_id), - ChatClientCommand::GetClientListFrom(_) => self.get_client_list(), } } @@ -235,7 +329,7 @@ impl ChatClient { self.packet_send.contains_key(&destination) } - fn handle_fragment(&mut self, mut packet: Packet, fragment: Fragment) { + fn handle_fragment(&mut self, packet: Packet, fragment: Fragment) { // Add the fragment to the buffer info!( "{} [ ChatClient {} ]: forwarded the the fragment [ fragment_index: {} ] of the Packet [ session_id: {} ]", @@ -280,7 +374,7 @@ impl ChatClient { warn!("├─>{} Sending to Simulation Controller...", "!!!".yellow()); self.controller_send - .send(DroneEvent::PacketDropped(packet)) + .send(ChatClientEvent::ControllerShortcut(packet)) .unwrap(); warn!( @@ -313,7 +407,7 @@ impl ChatClient { warn!("├─>{} Sending to Simulation Controller...", "!!!".yellow()); self.controller_send - .send(DroneEvent::PacketDropped(packet)) + .send(ChatClientEvent::ControllerShortcut(packet)) .unwrap(); warn!( @@ -328,25 +422,14 @@ impl ChatClient { } } - fn handle_ack_nack(&mut self, mut packet: Packet) { - if let PacketType::Nack(nack) = packet.clone().pack_type { - warn!( - "{} [ ChatClient {} ]: received a {}", - "!!!".yellow(), - self.id, - packet.pack_type, - ); - // Send a nack to the previous node - self.send_nack(packet, None, NackType::Dropped); - } else { - warn!( - "{} [ ChatClient {} ]: received a {}", - "!!!".yellow(), - self.id, - packet.pack_type, - ); - self.forward_packet(packet); - } + fn handle_ack_nack(&mut self, packet: Packet) { + warn!( + "{} [ ChatClient {} ]: received a {}", + "!!!".yellow(), + self.id, + packet.pack_type, + ); + self.forward_packet(packet); } fn send_nack(&self, mut packet: Packet, fragment: Option, nack_type: NackType) { @@ -396,7 +479,7 @@ impl ChatClient { //there is an error in sending the packet, the drone should send the packet to the simulation controller self.controller_send - .send(DroneEvent::PacketDropped(packet)) + .send(ChatClientEvent::ControllerShortcut(packet)) .unwrap(); warn!( "└─>{} [ ChatClient {} ]: sent A Nack to the Simulation Controller", @@ -432,7 +515,7 @@ impl ChatClient { // Send to the simulation controller self.controller_send - .send(DroneEvent::PacketDropped(packet)) + .send(ChatClientEvent::ControllerShortcut(packet)) .unwrap(); warn!( "└─>{} [ ChatClient {} ]: sent A Nack to the Simulation Controller", @@ -619,7 +702,7 @@ impl ChatClient { warn!("├─>{} Sending to Simulation Controller...", "!!!".yellow()); self.controller_send - .send(DroneEvent::PacketDropped(new_packet)) + .send(ChatClientEvent::ControllerShortcut(new_packet)) .unwrap(); warn!( @@ -642,7 +725,7 @@ impl ChatClient { // Send the packet to the simulation controller self.controller_send - .send(DroneEvent::PacketDropped(new_packet)) + .send(ChatClientEvent::ControllerShortcut(new_packet)) .unwrap(); warn!( @@ -693,7 +776,7 @@ impl ChatClient { warn!("├─>{} Sending to Simulation Controller...", "!!!".yellow()); self.controller_send - .send(DroneEvent::PacketDropped(new_packet)) + .send(ChatClientEvent::ControllerShortcut(new_packet)) .unwrap(); warn!( @@ -715,7 +798,7 @@ impl ChatClient { warn!("├─>{} Sending to Simulation Controller...", "!!!".yellow()); self.controller_send - .send(DroneEvent::PacketDropped(new_packet)) + .send(ChatClientEvent::ControllerShortcut(new_packet)) .unwrap(); warn!( @@ -727,7 +810,6 @@ impl ChatClient { } // the sim controller could send a command to the client that makes it start the flooding - fn start_flooding(&mut self) { let flood_request = FloodRequest { @@ -1060,7 +1142,6 @@ impl ChatClient { } } - todo!("MODIFICARE QUESTO METODO E FARE IN MODO CHE COMUNICHI CON IL SIM CONTROLLER"); fn read_message(&mut self) { if let Some(message) = self.message_buffer.pop() { if message.destination_id != self.id { @@ -1075,8 +1156,19 @@ impl ChatClient { } if let MessageContent::FromServer(server_message) = message.content { match server_message { - messages::ServerMessage::ClientList(items) => { - self.client_list = items; + ServerMessage::ServerType(server_type) => { + if let ServerType::Chat = server_type { + self.communication_server_list.push(message.source_id); + info!( + "{} [ ChatClient {} ]: Discovered communication server [ CommunicationServer {} ]", + "✓".green(), + self.id, + message.source_id + ); + } + } + ServerMessage::ClientList(client_list) => { + self.client_list = client_list; info!( "{} [ ChatClient {} ]: Updated client list: {:?}", @@ -1084,8 +1176,12 @@ impl ChatClient { self.id, self.client_list ); + + self.controller_send + .send(ChatClientEvent::ClientList(self.client_list.clone())) + .unwrap(); } - messages::ServerMessage::MessageReceived { sender_id, content } => { + ServerMessage::MessageReceived { sender_id, content } => { info!( "{} [ ChatClient {} ]: Message received from [ Client {} ]: {}", "✓".green(), @@ -1093,13 +1189,48 @@ impl ChatClient { sender_id, content ); + + self.controller_send + .send(ChatClientEvent::MessageReceived(sender_id, content)) + .unwrap(); } - messages::ServerMessage::ErrorWrongClientId => { - error!( - "{} [ ChatClient {} ]: Received an error indicating wrong client ID", - "✗".red(), - self.id + ServerMessage::UnreachableClient(client_id) => { + info!( + "{} [ ChatClient {} ]: Client {} is unreachable", + "!!!".yellow(), + self.id, + client_id ); + + self.client_list.retain(|&id| id != client_id); + + self.controller_send + .send(ChatClientEvent::UnreachableClient(client_id)) + .unwrap(); + } + ServerMessage::SuccessfulRegistration => { + self.registered = Some(message.source_id); + info!( + "{} [ ChatClient {} ]: Successfully registered to the server [ CommunicationServer {} ]", + "✓".green(), + self.id, + message.source_id + ); + self.controller_send + .send(ChatClientEvent::SuccessfulRegistration(message.source_id)) + .unwrap(); + } + ServerMessage::SuccessfullLogOut => { + self.registered = None; + info!( + "{} [ ChatClient {} ]: Successfully logged out from the server [ CommunicationServer {} ]", + "✓".green(), + self.id, + message.source_id + ); + self.controller_send + .send(ChatClientEvent::SuccessfulLogOut) + .unwrap(); } _ => { error!( @@ -1109,9 +1240,16 @@ impl ChatClient { ); } } + } else { + // errore un client non può comunicare direttamente con un altro client + error!( + "{} [ ChatClient {} ]: Received a message from an unexpected source: [ Client {} ]", + "✗".red(), + self.id, + message.source_id + ); } } else { - //log that there are no messages to read info!( "{} [ ChatClient {} ]: No messages to read", "ℹ".blue(), @@ -1119,44 +1257,53 @@ impl ChatClient { ); } } - todo!("gestire i casi di errore"); - fn send_message_to(&self, message: Message) { - // controllo che il chat client conosca i server, che sia registrato ad uno di loro e che la destinazione sia registrata a quel server - if self.running - && let Some(server_id) = self.registered - { - self.get_client_list(); - if self.client_list.contains(&message.source_id) { - if let Ok(fragments) = self.assembler.fragment_message(&message) { - let path = self.network_knowledge.find_paths_to(self.id, server_id)[0]; - let routing_header = SourceRoutingHeader { - hop_index: 0, - hops: path.clone(), - }; + fn send_message_to(&mut self, message: Message, client_destination: NodeId) { + self.get_client_list(); + if self.client_list.contains(&client_destination) { + let path = self + .network_knowledge + .find_paths_to(self.id, message.destination_id)[0] + .clone(); - for frag in fragments { - let packet_to_send = Packet { - routing_header: routing_header.clone(), - session_id: message.session_id, - pack_type: PacketType::MsgFragment(frag), - }; + let routing_header = SourceRoutingHeader { + hop_index: 1, + hops: path.clone(), + }; + + if let Ok(fragments) = self.assembler.fragment_message(&message) { + for frag in fragments { + let packet_to_send = Packet { + routing_header: routing_header.clone(), + session_id: message.session_id, + pack_type: PacketType::MsgFragment(frag.clone()), + }; - self.packet_cache.insert(packet_to_send.clone()); + self.packet_cache.insert(packet_to_send.clone()); - info!( - "{} [ ChatClient {} ]: Sending fragment with session_id: {} and fragment_index: {} to [ Server {} ]", + info!( + "{} [ ChatClient {} ]: Sending fragment with session_id: {} and fragment_index: {} to [ CommunicationServer {} ]", "✓".green(), self.id, message.session_id, frag.fragment_index, - server_id + message.destination_id ); - self.forward_packet(packet_to_send); - } + self.forward_packet(packet_to_send); } } + } else { + // unreachable + error!( + "{} [ ChatClient {} ]: Cannot send message, destination client {} is unreachable", + "✗".red(), + self.id, + client_destination + ); + self.controller_send + .send(ChatClientEvent::UnreachableClient(client_destination)) + .unwrap(); } } @@ -1168,14 +1315,14 @@ impl ChatClient { let query = Message::new_client_message( session_id, self.id, - server_id, - messages::high_level_messages::ClientMessage::GetServerType, + *server_id, + ClientMessage::GetServerType, ); - let path = self.network_knowledge.find_paths_to(self.id, server_id)[0]; + let path = self.network_knowledge.find_paths_to(self.id, *server_id)[0].clone(); let routing_header = SourceRoutingHeader { - hop_index: 0, + hop_index: 1, hops: path, }; @@ -1204,37 +1351,35 @@ impl ChatClient { } fn get_client_list(&mut self) { - if self.running - && let Some(server_id) = self.registered - { - let session_id = self.session_id_counter; - self.session_id_counter += 1; + let server_id = self.registered.unwrap(); + let session_id = self.session_id_counter; + self.session_id_counter += 1; - let message = Message::new_client_message( - session_id, - self.id, - server_id, - messages::high_level_messages::ClientMessage::GetClientList, - ); + let message = Message::new_client_message( + session_id, + self.id, + server_id, + ClientMessage::GetClientList, + ); - let path = self.network_knowledge.find_paths_to(self.id, server_id)[0]; + let path = self.network_knowledge.find_paths_to(self.id, server_id)[0].clone(); - let routing_header = SourceRoutingHeader { - hop_index: 0, - hops: path, - }; + let routing_header = SourceRoutingHeader { + hop_index: 1, + hops: path, + }; - if let Ok(fragments) = self.assembler.fragment_message(&message) { - for frag in fragments { - let packet = Packet { - routing_header: routing_header.clone(), - session_id, - pack_type: PacketType::MsgFragment(frag), - }; + if let Ok(fragments) = self.assembler.fragment_message(&message) { + for frag in fragments { + let packet = Packet { + routing_header: routing_header.clone(), + session_id, + pack_type: PacketType::MsgFragment(frag), + }; - self.packet_cache.insert(packet.clone()); + self.packet_cache.insert(packet.clone()); - info!( + info!( "{} [ ChatClient {} ]: Requesting client list from [ Server {} ] with session_id: {}", "ℹ".blue(), self.id, @@ -1242,50 +1387,84 @@ impl ChatClient { session_id ); - self.forward_packet(packet); - } + self.forward_packet(packet); } } } fn register_to(&mut self, server_id: NodeId) { - if self.running && self.communication_server_list.contains(&server_id) { - let session_id = self.session_id_counter; - self.session_id_counter += 1; - let message = Message::new_client_message( - session_id, - self.id, - server_id, - messages::high_level_messages::ClientMessage::RegisterToChat, - ); + let session_id = self.session_id_counter; + self.session_id_counter += 1; + let message = Message::new_client_message( + session_id, + self.id, + server_id, + ClientMessage::RegisterToChat, + ); - let path = self.network_knowledge.find_paths_to(self.id, server_id)[0]; + let path = self.network_knowledge.find_paths_to(self.id, server_id)[0].clone(); - let routing_header = SourceRoutingHeader { - hop_index: 0, - hops: path, - }; + let routing_header = SourceRoutingHeader { + hop_index: 1, + hops: path, + }; - if let Ok(fragments) = self.assembler.fragment_message(&message) { - for frag in fragments { - let packet = Packet { - routing_header: routing_header.clone(), - session_id, - pack_type: PacketType::MsgFragment(frag), - }; + if let Ok(fragments) = self.assembler.fragment_message(&message) { + for frag in fragments { + let packet = Packet { + routing_header: routing_header.clone(), + session_id, + pack_type: PacketType::MsgFragment(frag), + }; - self.packet_cache.insert(packet.clone()); + self.packet_cache.insert(packet.clone()); - info!( - "{} [ ChatClient {} ]: Registering to [ Server {} ] with session_id: {}", + info!( + "{} [ ChatClient {} ]: Registering to [ CommunicationServer {} ] with session_id: {}", "ℹ".blue(), self.id, server_id, session_id ); - self.forward_packet(packet); - } + self.forward_packet(packet); + } + } + } + + fn logout(&mut self) { + let server_id = self.registered.unwrap(); + let session_id = self.session_id_counter; + self.session_id_counter += 1; + let message = + Message::new_client_message(session_id, self.id, server_id, ClientMessage::Logout); + + let path = self.network_knowledge.find_paths_to(self.id, server_id)[0].clone(); + + let routing_header = SourceRoutingHeader { + hop_index: 1, + hops: path, + }; + + if let Ok(fragments) = self.assembler.fragment_message(&message) { + for frag in fragments { + let packet = Packet { + routing_header: routing_header.clone(), + session_id, + pack_type: PacketType::MsgFragment(frag), + }; + + self.packet_cache.insert(packet.clone()); + + info!( + "{} [ ChatClient {} ]: Logging out from [ CommunicationServer {} ] with session_id: {}", + "ℹ".blue(), + self.id, + server_id, + session_id + ); + + self.forward_packet(packet); } } } diff --git a/src/network_structure.rs b/src/network_structure.rs index c6ab59b..1a7439a 100644 --- a/src/network_structure.rs +++ b/src/network_structure.rs @@ -1,7 +1,4 @@ -use std::{ - collections::{HashMap, HashSet}, - thread::current, -}; +use std::collections::{HashMap, HashSet}; use wg_2024::{network::NodeId, packet::NodeType}; diff --git a/src/packet_cache.rs b/src/packet_cache.rs index c6cb772..12be2b8 100644 --- a/src/packet_cache.rs +++ b/src/packet_cache.rs @@ -1,8 +1,8 @@ use std::collections::HashMap; use log::error; -use rand::seq::index; -use wg_2024::packet::{Fragment, Packet, PacketType}; + +use wg_2024::packet::{Packet, PacketType}; pub struct PacketCache { // session_id, fragment_id