From 79dee21a2e04ff3bbd033c2f43000a075545af95 Mon Sep 17 00:00:00 2001 From: Mindaugas Vinkelis Date: Mon, 14 Oct 2019 10:40:04 +0300 Subject: [PATCH] WIP connection factory --- examples/udp.rs | 3 +- src/net.rs | 7 +- src/net/connection.rs | 108 +++++++++++------ src/net/connection_impl.rs | 141 +++++++++++----------- src/net/connection_manager.rs | 172 ++++++++++++++------------- src/net/factory_impl.rs | 181 +++++++++++++++++++++++++++++ src/net/socket.rs | 92 +++------------ src/net/socket_with_conditioner.rs | 62 ++++++++++ src/net/virtual_connection.rs | 6 + src/packet/packet_structure.rs | 2 +- src/test_utils/fake_socket.rs | 10 +- 11 files changed, 512 insertions(+), 272 deletions(-) create mode 100644 src/net/factory_impl.rs create mode 100644 src/net/socket_with_conditioner.rs diff --git a/examples/udp.rs b/examples/udp.rs index 8d59a9c0..68505fb7 100644 --- a/examples/udp.rs +++ b/examples/udp.rs @@ -40,14 +40,13 @@ pub fn receive_data() { if let Some(result) = socket.recv() { match result { SocketEvent::Packet(packet) => { - let endpoint: SocketAddr = packet.addr(); let received_data: &[u8] = packet.payload(); // you can here deserialize your bytes into the data you have passed it when sending. println!( "Received packet from: {:?} with length {}", - endpoint, + packet.addr(), received_data.len() ); } diff --git a/src/net.rs b/src/net.rs index 7b7a5949..eae3d65e 100644 --- a/src/net.rs +++ b/src/net.rs @@ -1,21 +1,26 @@ //! This module provides the logic between the low-level abstract types and the types that the user will be interacting with. //! You can think of the socket, connection management, congestion control. -pub use self::connection::{Connection, ConnectionEventAddress, ConnectionMessenger}; +pub use self::connection::{Connection, ConnectionFactory, ConnectionMessenger}; +pub use self::connection_impl::ConnectionImpl; pub use self::connection_manager::{ConnectionManager, DatagramSocket}; pub use self::events::SocketEvent; +pub use self::factory_impl::FactoryImpl; pub use self::link_conditioner::LinkConditioner; pub use self::quality::{NetworkQuality, RttMeasurer}; pub use self::socket::Socket; +pub use self::socket_with_conditioner::SocketWithConditioner; pub use self::virtual_connection::VirtualConnection; mod connection; mod connection_impl; mod connection_manager; mod events; +mod factory_impl; mod link_conditioner; mod quality; mod socket; +mod socket_with_conditioner; mod virtual_connection; pub mod constants; diff --git a/src/net/connection.rs b/src/net/connection.rs index fedcd25b..16678a95 100644 --- a/src/net/connection.rs +++ b/src/net/connection.rs @@ -1,73 +1,113 @@ -use std::{self, fmt::Debug, net::SocketAddr, time::Instant}; - use crate::config::Config; +use std::{self, collections::HashMap, fmt::Debug, net::SocketAddr, time::Instant}; + /// Allows connection to send packet, send event and get global configuration. -pub trait ConnectionMessenger { +pub trait ConnectionMessenger { /// Returns global configuration. fn config(&self) -> &Config; - /// Sends a connection event. - fn send_event(&mut self, address: &SocketAddr, event: ReceiveEvent); + fn send_event(&mut self, event: ConnectionEvent); /// Sends a packet. fn send_packet(&mut self, address: &SocketAddr, payload: &[u8]); } -/// Returns an address of an event. -/// This is used by a `ConnectionManager`, because it doesn't know anything about connection events. -pub trait ConnectionEventAddress { - /// Returns event address - fn address(&self) -> SocketAddr; -} - /// Allows to implement actual connection. -/// Defines a type of `Send` and `Receive` events, that will be used by a connection. +/// Defines types of user and connection events that will be used by a connection. pub trait Connection: Debug { /// Defines a user event type. - type SendEvent: Debug + ConnectionEventAddress; + type UserEvent: Debug; /// Defines a connection event type. - type ReceiveEvent: Debug + ConnectionEventAddress; + type ConnectionEvent: Debug; - /// Creates new connection and initialize it by sending an connection event to the user. - /// * messenger - allows to send packets and events, also provides a config. - /// * address - defines a address that connection is associated with. - /// * time - creation time, used by connection, so that it doesn't get dropped immediately or send heartbeat packet. - /// * initial_data - if initiated by remote host, this will hold that a packet data. - fn create_connection( - messenger: &mut impl ConnectionMessenger, - address: SocketAddr, + /// Initial call with a payload, when connection is created by accepting remote packet. + fn after_remote_accepted( + &mut self, time: Instant, - initial_data: Option<&[u8]>, - ) -> Self; + messenger: &mut impl ConnectionMessenger, + payload: &[u8], + ); - /// Determines if the connection should be dropped due to its state. - fn should_drop( + /// Initial call with a event, when connection is created by accepting user event. + fn after_local_accepted( &mut self, - messenger: &mut impl ConnectionMessenger, time: Instant, - ) -> bool; + messenger: &mut impl ConnectionMessenger, + event: Self::UserEvent, + ); /// Processes a received packet: parse it and emit an event. fn process_packet( &mut self, - messenger: &mut impl ConnectionMessenger, - payload: &[u8], time: Instant, + messenger: &mut impl ConnectionMessenger, + payload: &[u8], ); /// Processes a received event and send a packet. fn process_event( &mut self, - messenger: &mut impl ConnectionMessenger, - event: Self::SendEvent, time: Instant, + messenger: &mut impl ConnectionMessenger, + event: Self::UserEvent, ); /// Processes various connection-related tasks: resend dropped packets, send heartbeat packet, etc... /// This function gets called frequently. fn update( &mut self, - messenger: &mut impl ConnectionMessenger, time: Instant, + messenger: &mut impl ConnectionMessenger, ); + + /// Last call before connection is destroyed. + fn before_discarded( + &mut self, + time: Instant, + messenger: &mut impl ConnectionMessenger, + ); +} + +/// Decides when to create and destroy connections, and provides a way for `ConnectionManager` to get connection from an user event. +pub trait ConnectionFactory: Debug { + /// An actual connection type that is created by a factory. + type Connection: Connection; + + /// Provides a mapping from user event to an actual physical address. + /// If `None` is returned, event is ignored. If address doesn't exists in the active connections list, then `should_accept_local` will be invoked. + /// Being factory method, it supports connections that are not necessary identified by `SocketAddr`. + /// E.g. QUIC use ConnectionId to identify the connection. + fn address_from_user_event<'s, 'a>( + &'s self, + event: &'a ::UserEvent, + ) -> Option<&'a SocketAddr> + where + 's: 'a; + + /// Determines if remote connection can be accepted. + /// If connection is accepted, then `after_remote_accepted` will be invoked on it. + fn should_accept_remote( + &mut self, + time: Instant, + address: SocketAddr, + data: &[u8], + ) -> Option; + + /// Determines if local connection can be accepted. + /// If connection is accepted, then `after_remote_accepted` will be invoked on it. + fn should_accept_local( + &mut self, + time: Instant, + address: SocketAddr, + event: &::UserEvent, + ) -> Option; + + /// This allows to implement all sorts of things, a few examples include: + /// * Banning a connection. + /// * Disconnect a connection, if there are too many connections in "connecting" state. + fn update(&mut self, time: Instant, connections: &mut HashMap); + + /// Determines if connection should be discarded. + /// If connection is discarded, then `before_discarded` will be invoked on it. + fn should_discard(&mut self, time: Instant, connection: &Self::Connection) -> bool; } diff --git a/src/net/connection_impl.rs b/src/net/connection_impl.rs index 4cc0ebc9..a776b8fc 100644 --- a/src/net/connection_impl.rs +++ b/src/net/connection_impl.rs @@ -1,87 +1,72 @@ +use crate::{ + error::{ErrorKind, Result}, + net::{Connection, ConnectionMessenger, VirtualConnection}, + packet::{DeliveryGuarantee, OutgoingPackets, Packet, PacketInfo}, +}; + +use super::events::SocketEvent; + use std::net::SocketAddr; use std::time::Instant; use log::error; -use crate::error::{ErrorKind, Result}; -use crate::packet::{DeliveryGuarantee, OutgoingPackets, Packet, PacketInfo}; - -use super::{ - events::SocketEvent, Connection, ConnectionEventAddress, ConnectionMessenger, VirtualConnection, -}; - -/// Required by `ConnectionManager` to properly handle connection event. -impl ConnectionEventAddress for SocketEvent { - /// Returns event address. - fn address(&self) -> SocketAddr { - match self { - SocketEvent::Packet(packet) => packet.addr(), - SocketEvent::Connect(addr) => *addr, - SocketEvent::Timeout(addr) => *addr, - } - } +pub struct ConnectionImpl { + pub non_accepted_timeout: Option, + pub conn: VirtualConnection, } -/// Required by `ConnectionManager` to properly handle user event. -impl ConnectionEventAddress for Packet { - /// Returns event address. - fn address(&self) -> SocketAddr { - self.addr() +impl std::fmt::Debug for ConnectionImpl { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!( + f, + "{}:{}", + self.conn.remote_address.ip(), + self.conn.remote_address.port() + ) } } -impl Connection for VirtualConnection { +impl Connection for ConnectionImpl { /// Defines a user event type. - type SendEvent = Packet; + type UserEvent = Packet; /// Defines a connection event type. - type ReceiveEvent = SocketEvent; + type ConnectionEvent = SocketEvent; - /// Creates new connection and initialize it by sending an connection event to the user. - /// * address - defines a address that connection is associated with. - /// * time - creation time, used by connection, so that it doesn't get dropped immediately or send heartbeat packet. - /// * initial_data - if initiated by remote host, this will hold that a packet data. - fn create_connection( - messenger: &mut impl ConnectionMessenger, - address: SocketAddr, + /// Initial call with a payload, when connection is created by accepting remote packet. + fn after_remote_accepted( + &mut self, time: Instant, - initial_data: Option<&[u8]>, - ) -> VirtualConnection { - // emit connect event if this is initiated by the remote host. - if initial_data.is_some() { - messenger.send_event(&address, SocketEvent::Connect(address)); - } - VirtualConnection::new(address, messenger.config(), time) + messenger: &mut impl ConnectionMessenger, + payload: &[u8], + ) { + // emit connect event, for remote connection + messenger.send_event(SocketEvent::Connect(self.conn.remote_address)); + self.process_packet(time, messenger, payload); } - /// Determines if the given `Connection` should be dropped due to its state. - fn should_drop( + /// Initial call with a event, when connection is created by accepting user event. + fn after_local_accepted( &mut self, - messenger: &mut impl ConnectionMessenger, time: Instant, - ) -> bool { - let should_drop = self.packets_in_flight() > messenger.config().max_packets_in_flight - || self.last_heard(time) >= messenger.config().idle_connection_timeout; - if should_drop { - messenger.send_event( - &self.remote_address, - SocketEvent::Timeout(self.remote_address), - ); - } - should_drop + messenger: &mut impl ConnectionMessenger, + event: Self::UserEvent, + ) { + self.process_event(time, messenger, event); } - /// Processes a received packet: parse it and emit an event. + /// Processes a received packet: parse it and emit an connection event. fn process_packet( &mut self, - messenger: &mut impl ConnectionMessenger, - payload: &[u8], time: Instant, + messenger: &mut impl ConnectionMessenger, + payload: &[u8], ) { if !payload.is_empty() { - match self.process_incoming(payload, time) { + match self.conn.process_incoming(payload, time) { Ok(packets) => { for incoming in packets { - messenger.send_event(&self.remote_address, SocketEvent::Packet(incoming.0)); + messenger.send_event(SocketEvent::Packet(incoming.0)); } } Err(err) => error!("Error occured processing incomming packet: {:?}", err), @@ -94,18 +79,19 @@ impl Connection for VirtualConnection { } } - /// Processes a received event and send a packet. + /// Processes an user event and send a packet. fn process_event( &mut self, - messenger: &mut impl ConnectionMessenger, - event: Self::SendEvent, time: Instant, + messenger: &mut impl ConnectionMessenger, + event: Self::UserEvent, ) { - let addr = self.remote_address; + self.non_accepted_timeout = None; + let addr = self.conn.remote_address; send_packets( messenger, &addr, - self.process_outgoing( + self.conn.process_outgoing( PacketInfo::user_packet( event.payload(), event.delivery_guarantee(), @@ -119,15 +105,15 @@ impl Connection for VirtualConnection { } /// Processes various connection-related tasks: resend dropped packets, send heartbeat packet, etc... - /// This function gets called very frequently. + /// This function gets called frequently. fn update( &mut self, - messenger: &mut impl ConnectionMessenger, time: Instant, + messenger: &mut impl ConnectionMessenger, ) { // resend dropped packets - for dropped in self.gather_dropped_packets() { - let packets = self.process_outgoing( + for dropped in self.conn.gather_dropped_packets() { + let packets = self.conn.process_outgoing( PacketInfo { packet_type: dropped.packet_type, payload: &dropped.payload, @@ -139,22 +125,37 @@ impl Connection for VirtualConnection { dropped.item_identifier, time, ); - send_packets(messenger, &self.remote_address, packets, "dropped packets"); + send_packets( + messenger, + &self.conn.remote_address, + packets, + "dropped packets", + ); } // send heartbeat packets if required if let Some(heartbeat_interval) = messenger.config().heartbeat_interval { - let addr = self.remote_address; - if self.last_sent(time) >= heartbeat_interval { + let addr = self.conn.remote_address; + if self.conn.last_sent(time) >= heartbeat_interval { send_packets( messenger, &addr, - self.process_outgoing(PacketInfo::heartbeat_packet(&[]), None, time), + self.conn + .process_outgoing(PacketInfo::heartbeat_packet(&[]), None, time), "heatbeat packet", ); } } } + + /// Last call before connection is destroyed. + fn before_discarded( + &mut self, + _time: Instant, + messenger: &mut impl ConnectionMessenger, + ) { + messenger.send_event(SocketEvent::Timeout(self.conn.remote_address)); + } } // Sends multiple outgoing packets. diff --git a/src/net/connection_manager.rs b/src/net/connection_manager.rs index 60adef47..5c13be1d 100644 --- a/src/net/connection_manager.rs +++ b/src/net/connection_manager.rs @@ -1,15 +1,10 @@ -use std::{self, collections::HashMap, fmt::Debug, io::Result, net::SocketAddr, time::Instant}; - -use crossbeam_channel::{self, unbounded, Receiver, Sender}; -use log::error; - use crate::{ - config::Config, net::Connection, net::ConnectionEventAddress, net::ConnectionMessenger, + config::Config, + net::{Connection, ConnectionFactory, ConnectionMessenger}, }; - -// TODO: maybe we can make a breaking change and use this instead of `ConnectionEventAddress` trait? -// #[derive(Debug)] -// pub struct ConnectionEvent(pub SocketAddr, pub Event); +use crossbeam_channel::{self, unbounded, Receiver, Sender}; +use log::error; +use std::{self, collections::HashMap, fmt::Debug, io::Result, net::SocketAddr, time::Instant}; /// A datagram socket is a type of network socket which provides a connectionless point for sending or receiving data packets. pub trait DatagramSocket: Debug { @@ -28,16 +23,16 @@ pub trait DatagramSocket: Debug { // This will be used by a `Connection`. #[derive(Debug)] -struct SocketEventSenderAndConfig { +struct SocketEventSenderAndConfig { config: Config, socket: TSocket, - event_sender: Sender, + event_sender: Sender, } -impl - SocketEventSenderAndConfig +impl + SocketEventSenderAndConfig { - fn new(config: Config, socket: TSocket, event_sender: Sender) -> Self { + fn new(config: Config, socket: TSocket, event_sender: Sender) -> Self { Self { config, socket, @@ -46,14 +41,14 @@ impl } } -impl ConnectionMessenger - for SocketEventSenderAndConfig +impl ConnectionMessenger + for SocketEventSenderAndConfig { fn config(&self) -> &Config { &self.config } - fn send_event(&mut self, _address: &SocketAddr, event: ReceiveEvent) { + fn send_event(&mut self, event: ConnectionEvent) { self.event_sender.send(event).expect("Receiver must exists"); } @@ -68,27 +63,35 @@ impl ConnectionMessenger { - connections: HashMap, +pub struct ConnectionManager { + factory: TConnectionFactory, + connections: HashMap, receive_buffer: Vec, - user_event_receiver: Receiver, - messenger: SocketEventSenderAndConfig, - event_receiver: Receiver, - user_event_sender: Sender, + user_event_receiver: Receiver<::UserEvent>, + messenger: SocketEventSenderAndConfig< + TSocket, + ::ConnectionEvent, + >, + connection_event_receiver: + Receiver<::ConnectionEvent>, + user_event_sender: Sender<::UserEvent>, } -impl ConnectionManager { +impl + ConnectionManager +{ /// Creates an instance of `ConnectionManager` by passing a socket and config. - pub fn new(socket: TSocket, config: Config) -> Self { - let (event_sender, event_receiver) = unbounded(); + pub fn new(socket: TSocket, factory: TConnectionFactory, config: Config) -> Self { + let (connection_event_sender, connection_event_receiver) = unbounded(); let (user_event_sender, user_event_receiver) = unbounded(); ConnectionManager { + factory, receive_buffer: vec![0; config.receive_buffer_max_size], connections: Default::default(), user_event_receiver, - messenger: SocketEventSenderAndConfig::new(config, socket, event_sender), + messenger: SocketEventSenderAndConfig::new(config, socket, connection_event_sender), user_event_sender, - event_receiver, + connection_event_receiver, } } @@ -97,6 +100,7 @@ impl ConnectionManager ConnectionManager { if let Some(conn) = self.connections.get_mut(&address) { - conn.process_packet(messenger, payload, time); - } else { - // create connection, but do not add to active connections list - let mut conn = - TConnection::create_connection(messenger, address, time, Some(payload)); - conn.process_packet(messenger, payload, time); + conn.process_packet(time, messenger, payload); + } else if let Some(mut conn) = + factory.should_accept_remote(time, address, payload) + { + conn.after_remote_accepted(time, messenger, payload); + self.connections.insert(address, conn); } } Err(e) => { @@ -129,47 +133,78 @@ impl ConnectionManager &Sender { + pub fn event_sender( + &self, + ) -> &Sender<::UserEvent> { &self.user_event_sender } /// Returns a handle to the event receiver which provides a thread-safe way to retrieve events /// from the connections. This should be used when the socket is busy running its polling loop in /// a separate thread. - pub fn event_receiver(&self) -> &Receiver { - &self.event_receiver + pub fn event_receiver( + &self, + ) -> &Receiver<::ConnectionEvent> { + &self.connection_event_receiver } - /// Returns socket reference. + /// Returns a socket reference. pub fn socket(&self) -> &TSocket { &self.messenger.socket } - /// Returns socket mutable reference. + /// Returns a mutable socket reference. #[allow(dead_code)] pub fn socket_mut(&mut self) -> &mut TSocket { &mut self.messenger.socket } + /// Returns a connection factory reference. + #[allow(dead_code)] + pub fn factory(&self) -> &TConnectionFactory { + &self.factory + } + + /// Returns a mutable connection factory reference. + #[allow(dead_code)] + pub fn factory_mut(&mut self) -> &mut TConnectionFactory { + &mut self.factory + } + /// Returns a number of active connections. #[cfg(test)] pub fn connections_count(&self) -> usize { @@ -283,43 +318,6 @@ mod tests { panic!["Did not receive the ignored packet"]; } - #[test] - fn receiving_does_not_allow_denial_of_service() { - let (mut server, mut client, _) = create_server_client_network(); - // send a bunch of packets to a server - for _ in 0..3 { - client - .send(Packet::unreliable( - server_address(), - vec![1, 2, 3, 4, 5, 6, 7, 8, 9], - )) - .unwrap(); - } - - let time = Instant::now(); - - client.manual_poll(time); - server.manual_poll(time); - - for _ in 0..6 { - assert![server.recv().is_some()]; - } - assert![server.recv().is_none()]; - - // the server shall not have any connection in its connection table even though it received - // packets - assert_eq![0, server.connection_count()]; - - server - .send(Packet::unreliable(client_address(), vec![1])) - .unwrap(); - - server.manual_poll(time); - - // the server only adds to its table after having sent explicitly - assert_eq![1, server.connection_count()]; - } - #[test] fn initial_sequenced_is_resent() { let (mut server, mut client, network) = create_server_client_network(); @@ -704,8 +702,8 @@ mod tests { } // ensure that we get the correct number of events to the server. - // 35 connect events plus the 35 messages - assert_eq!(events.len(), 70); + // 35 connect events plus the 1 connect message + assert_eq!(events.len(), 36); // finally the server decides to send us a message back. This necessarily will include // the ack information for 33 of the sent 35 packets. diff --git a/src/net/factory_impl.rs b/src/net/factory_impl.rs new file mode 100644 index 00000000..6db476de --- /dev/null +++ b/src/net/factory_impl.rs @@ -0,0 +1,181 @@ +use crate::config::Config; +use crate::net::{Connection, ConnectionFactory}; +use crate::packet::Packet; + +use super::{ConnectionImpl, VirtualConnection}; + +use std::collections::HashMap; +use std::net::SocketAddr; +use std::time::{Duration, Instant}; + +/// Interval before connection is banned if not accepted +const NON_ACCEPT_TIMEOUT: Duration = Duration::from_secs(1); +/// Interval how long connection is banned, after it was not accepted +const TEMPORARY_BAN_TIMEOUT: Duration = Duration::from_secs(60); + +/// Provides simple DDoS protection by auto disconnecting non accepted connections, and ban them for some time. +#[derive(Debug)] +pub struct FactoryImpl { + config: Config, + temporary_banned: HashMap, +} + +impl FactoryImpl { + pub fn new(config: Config) -> Self { + FactoryImpl { + config, + temporary_banned: Default::default(), + } + } + + fn should_accept( + &mut self, + time: Instant, + address: SocketAddr, + non_accepted_timeout: Option, + ) -> Option { + if let Some(banned_until) = self.temporary_banned.get(&address).copied() { + if non_accepted_timeout.is_none() { + self.temporary_banned.remove(&address); + } else if banned_until > time { + return None; + } + } + Some(ConnectionImpl { + non_accepted_timeout, + conn: VirtualConnection::new(address, &self.config, time), + }) + } +} + +impl ConnectionFactory for FactoryImpl { + type Connection = ConnectionImpl; + + fn address_from_user_event<'s, 'a>(&'s self, event: &'a Packet) -> Option<&'a SocketAddr> + where + 's: 'a, + { + Some(&event.addr) + } + + /// Accepts connection if not banned. + fn should_accept_remote( + &mut self, + time: Instant, + address: SocketAddr, + _data: &[u8], + ) -> Option { + self.should_accept(time, address, Some(time + NON_ACCEPT_TIMEOUT)) + } + + /// Accepts connection if not banned. + fn should_accept_local( + &mut self, + time: Instant, + address: SocketAddr, + _event: &::UserEvent, + ) -> Option { + self.should_accept(time, address, None) + } + + /// Removes addresses from ban list. + fn update(&mut self, time: Instant, _connections: &mut HashMap) { + self.temporary_banned + .retain(|_, banned_until| *banned_until > time); + } + + /// Discards connection and ban it if it was banned due to non accepted timeout. + fn should_discard(&mut self, time: Instant, connection: &Self::Connection) -> bool { + if connection + .non_accepted_timeout + .map_or(false, |timeout| timeout < time) + { + self.temporary_banned + .insert(connection.conn.remote_address, time + TEMPORARY_BAN_TIMEOUT); + return true; + } + connection.conn.should_drop(time) + } +} + +#[cfg(test)] +mod tests { + use super::{ConnectionFactory, FactoryImpl, NON_ACCEPT_TIMEOUT, TEMPORARY_BAN_TIMEOUT}; + use crate::packet::Packet; + use std::net::SocketAddr; + use std::time::{Duration, Instant}; + + /// The socket address of where the server is located. + const ADDR: &str = "127.0.0.1:10001"; + + fn address() -> SocketAddr { + ADDR.parse().unwrap() + } + + fn user_event() -> Packet { + Packet::unreliable(address(), Default::default()) + } + + #[test] + fn accepting_local_connection_do_not_set_accept_timeout() { + let mut factory = FactoryImpl::new(Default::default()); + + let conn = factory + .should_accept_local(Instant::now(), address(), &user_event()) + .unwrap(); + + assert_eq!(conn.non_accepted_timeout, None); + } + + #[test] + fn accepting_remote_connection_sets_accept_timeout() { + let time = Instant::now(); + let mut factory = FactoryImpl::new(Default::default()); + + let conn = factory.should_accept_remote(time, address(), &[]).unwrap(); + + assert_eq!(conn.non_accepted_timeout, Some(time + NON_ACCEPT_TIMEOUT)); + } + + #[test] + fn when_non_accept_timeout_expires_connection_is_discarded_and_banned() { + let time = Instant::now(); + let mut factory = FactoryImpl::new(Default::default()); + + let conn = factory.should_accept_remote(time, address(), &[]).unwrap(); + let time = time + NON_ACCEPT_TIMEOUT + Duration::from_nanos(1); + let is_discarded = factory.should_discard(time, &conn); + let banned = factory.temporary_banned.iter().nth(0).unwrap(); + + assert_eq!(is_discarded, true); + assert_eq!(*banned.0, address()); + assert_eq!(*banned.1, time + TEMPORARY_BAN_TIMEOUT); + } + + #[test] + fn accepting_remote_while_banned_returns_none() { + let time = Instant::now(); + let mut factory = FactoryImpl::new(Default::default()); + + factory + .temporary_banned + .insert(address(), time + Duration::from_secs(100)); + let conn = factory.should_accept_remote(time, address(), &[]); + + assert_eq!(conn.is_none(), true); + } + + #[test] + fn accepting_local_while_banned_removes_ban() { + let time = Instant::now(); + let mut factory = FactoryImpl::new(Default::default()); + + factory + .temporary_banned + .insert(address(), time + Duration::from_secs(100)); + let conn = factory.should_accept_local(time, address(), &user_event()); + + assert_eq!(conn.is_some(), true); + assert_eq!(factory.temporary_banned.is_empty(), true); + } +} diff --git a/src/net/socket.rs b/src/net/socket.rs index 77281477..7b7d8326 100644 --- a/src/net/socket.rs +++ b/src/net/socket.rs @@ -1,3 +1,4 @@ +use crossbeam_channel::{self, Receiver, Sender, TryRecvError}; use std::{ self, net::{Ipv4Addr, SocketAddr, SocketAddrV4, ToSocketAddrs, UdpSocket}, @@ -5,96 +6,38 @@ use std::{ time::{Duration, Instant}, }; -use crossbeam_channel::{self, Receiver, Sender, TryRecvError}; - use crate::{ config::Config, error::Result, - net::{ - events::SocketEvent, ConnectionManager, DatagramSocket, LinkConditioner, VirtualConnection, - }, + net::{ConnectionManager, DatagramSocket, SocketWithConditioner}, packet::Packet, }; -// Wraps `LinkConditioner` and `UdpSocket` together. LinkConditioner is enabled when building with a "tester" feature. -#[derive(Debug)] -struct SocketWithConditioner { - is_blocking_mode: bool, - socket: UdpSocket, - link_conditioner: Option, -} - -impl SocketWithConditioner { - pub fn new(socket: UdpSocket, is_blocking_mode: bool) -> Result { - socket.set_nonblocking(!is_blocking_mode)?; - Ok(SocketWithConditioner { - is_blocking_mode, - socket, - link_conditioner: None, - }) - } +#[cfg(feature = "tester")] +use crate::net::LinkConditioner; - #[cfg(feature = "tester")] - pub fn set_link_conditioner(&mut self, link_conditioner: Option) { - self.link_conditioner = link_conditioner; - } -} - -/// Provides a `DatagramSocket` implementation for `SocketWithConditioner` -impl DatagramSocket for SocketWithConditioner { - // Determinate whether packet will be sent or not based on `LinkConditioner` if enabled. - fn send_packet(&mut self, addr: &SocketAddr, payload: &[u8]) -> std::io::Result { - if cfg!(feature = "tester") { - if let Some(ref mut link) = &mut self.link_conditioner { - if !link.should_send() { - return Ok(0); - } - } - } - self.socket.send_to(payload, addr) - } - - /// Receives a single packet from UDP socket. - fn receive_packet<'a>( - &mut self, - buffer: &'a mut [u8], - ) -> std::io::Result<(&'a [u8], SocketAddr)> { - self.socket - .recv_from(buffer) - .map(move |(recv_len, address)| (&buffer[..recv_len], address)) - } - - /// Returns the socket address that this socket was created from. - fn local_addr(&self) -> std::io::Result { - self.socket.local_addr() - } - - /// Returns whether socket operates in blocking or non-blocking mode. - fn is_blocking_mode(&self) -> bool { - self.is_blocking_mode - } -} +use super::{FactoryImpl, SocketEvent}; /// A reliable UDP socket implementation with configurable reliability and ordering guarantees. #[derive(Debug)] pub struct Socket { - handler: ConnectionManager, + handler: ConnectionManager, } impl Socket { /// Binds to the socket and then sets up `ActiveConnections` to manage the "connections". /// Because UDP connections are not persistent, we can only infer the status of the remote - /// endpoint by looking to see if they are still sending packets or not + /// endpoint by looking to see if they are still sending packets or not. pub fn bind(addresses: A) -> Result { Self::bind_with_config(addresses, Config::default()) } - /// Binds to any local port on the system, if available + /// Binds to any local port on the system, if available. pub fn bind_any() -> Result { Self::bind_any_with_config(Config::default()) } - /// Binds to any local port on the system, if available, with a given config + /// Binds to any local port on the system, if available, with a given config. pub fn bind_any_with_config(config: Config) -> Result { let loopback = Ipv4Addr::new(127, 0, 0, 1); let address = SocketAddrV4::new(loopback, 0); @@ -116,6 +59,7 @@ impl Socket { Ok(Socket { handler: ConnectionManager::new( SocketWithConditioner::new(socket, config.blocking_mode)?, + FactoryImpl::new(config.clone()), config, ), }) @@ -135,19 +79,19 @@ impl Socket { self.handler.event_receiver().clone() } - /// Sends a single packet - pub fn send(&mut self, packet: Packet) -> Result<()> { + /// Sends a single user event. + pub fn send(&mut self, event: Packet) -> Result<()> { self.handler .event_sender() - .send(packet) + .send(event) .expect("Receiver must exists."); Ok(()) } - /// Receives a single packet + /// Receives a single connection event. pub fn recv(&mut self) -> Option { match self.handler.event_receiver().try_recv() { - Ok(pkt) => Some(pkt), + Ok(event) => Some(event), Err(TryRecvError::Empty) => None, Err(TryRecvError::Disconnected) => panic!["This can never happen"], } @@ -162,9 +106,9 @@ impl Socket { /// Runs the polling loop with a specified sleep duration. This should run in a spawned thread /// since calls to `self.manual_poll` are blocking. pub fn start_polling_with_duration(&mut self, sleep_duration: Option) { - // nothing should break out of this loop! + // Nothing should break out of this loop! loop { - self.manual_poll(Instant::now()); + self.handler.manual_poll(Instant::now()); match sleep_duration { None => yield_now(), Some(duration) => sleep(duration), @@ -172,7 +116,7 @@ impl Socket { } } - /// Processes any inbound/outbound packets and handle idle clients + /// Processes any inbound/outbound packets and handle idle clients. pub fn manual_poll(&mut self, time: Instant) { self.handler.manual_poll(time); } diff --git a/src/net/socket_with_conditioner.rs b/src/net/socket_with_conditioner.rs new file mode 100644 index 00000000..9594f624 --- /dev/null +++ b/src/net/socket_with_conditioner.rs @@ -0,0 +1,62 @@ +use super::{DatagramSocket, LinkConditioner}; +use crate::error::Result; +use std::net::{SocketAddr, UdpSocket}; + +// Wrap `LinkConditioner` and `UdpSocket` together. LinkConditioner is enabled when building with a "tester" feature. +#[derive(Debug)] +pub struct SocketWithConditioner { + is_blocking_mode: bool, + socket: UdpSocket, + link_conditioner: Option, +} + +impl SocketWithConditioner { + pub fn new(socket: UdpSocket, is_blocking_mode: bool) -> Result { + socket.set_nonblocking(!is_blocking_mode)?; + Ok(SocketWithConditioner { + is_blocking_mode, + socket, + link_conditioner: None, + }) + } + + #[cfg(feature = "tester")] + pub fn set_link_conditioner(&mut self, link_conditioner: Option) { + self.link_conditioner = link_conditioner; + } +} + +/// Provides a `DatagramSocket` implementation for `SocketWithConditioner`. +impl DatagramSocket for SocketWithConditioner { + // Determinates whether packet will be sent or not based on `LinkConditioner` if enabled. + fn send_packet(&mut self, addr: &SocketAddr, payload: &[u8]) -> std::io::Result { + if cfg!(feature = "tester") { + if let Some(ref mut link) = &mut self.link_conditioner { + if !link.should_send() { + return Ok(0); + } + } + } + self.socket.send_to(payload, addr) + } + + /// Receives a single packet from UDP socket. + fn receive_packet<'a>( + &mut self, + buffer: &'a mut [u8], + ) -> std::io::Result<(&'a [u8], SocketAddr)> { + self.socket + .recv_from(buffer) + .map(move |(recv_len, address)| (&buffer[..recv_len], address)) + } + + /// Returns the socket address that this socket was created from. + fn local_addr(&self) -> std::io::Result { + self.socket.local_addr() + } + + /// Returns whether socket operates in blocking or non-blocking mode. + fn is_blocking_mode(&self) -> bool { + self.is_blocking_mode + } +} diff --git a/src/net/virtual_connection.rs b/src/net/virtual_connection.rs index 0f363a11..4eafd80b 100644 --- a/src/net/virtual_connection.rs +++ b/src/net/virtual_connection.rs @@ -404,6 +404,12 @@ impl VirtualConnection { pub fn gather_dropped_packets(&mut self) -> Vec { self.acknowledge_handler.dropped_packets() } + + /// Determines if the connection should be dropped due to its state. + pub fn should_drop(&self, time: Instant) -> bool { + self.packets_in_flight() > self.config.max_packets_in_flight + || self.last_heard(time) >= self.config.idle_connection_timeout + } } impl fmt::Debug for VirtualConnection { diff --git a/src/packet/packet_structure.rs b/src/packet/packet_structure.rs index b9605ab3..50724e9f 100644 --- a/src/packet/packet_structure.rs +++ b/src/packet/packet_structure.rs @@ -17,7 +17,7 @@ use crate::packet::{DeliveryGuarantee, OrderingGuarantee, PacketType}; /// You are able to send packets with the above reliability types. pub struct Packet { /// The endpoint from where it came. - addr: SocketAddr, + pub(crate) addr: SocketAddr, /// The raw payload of the packet. payload: Box<[u8]>, /// Defines on how the packet will be delivered. diff --git a/src/test_utils/fake_socket.rs b/src/test_utils/fake_socket.rs index 5a897add..dbe4d6d9 100644 --- a/src/test_utils/fake_socket.rs +++ b/src/test_utils/fake_socket.rs @@ -2,20 +2,24 @@ use std::{net::SocketAddr, time::Instant}; use crossbeam_channel::{Receiver, Sender}; -use crate::net::{ConnectionManager, LinkConditioner, VirtualConnection}; +use crate::net::{ConnectionManager, FactoryImpl, LinkConditioner}; use crate::test_utils::*; use crate::{error::Result, Config, Packet, SocketEvent}; /// Provides a similar to the real a `Socket`, but with emulated socket implementation. pub struct FakeSocket { - handler: ConnectionManager, + handler: ConnectionManager, } impl FakeSocket { /// Binds to the socket. pub fn bind(network: &NetworkEmulator, addr: SocketAddr, config: Config) -> Result { Ok(Self { - handler: ConnectionManager::new(network.new_socket(addr)?, config), + handler: ConnectionManager::new( + network.new_socket(addr)?, + FactoryImpl::new(config.clone()), + config, + ), }) }