Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion apps/freenet-ping/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 13 additions & 1 deletion crates/core/src/client_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,8 @@ async fn register_subscription_listener(
operation = operation_type,
"Subscriber listener registered successfully"
);
// Register client subscription to prevent upstream unsubscription while this client is active
op_manager.ring.add_client_subscription(&key, client_id);
Ok(())
}
_ => {
Expand Down Expand Up @@ -1397,6 +1399,8 @@ async fn process_open_request(
phase = "listener_registered",
"Subscriber listener registered successfully"
);
// Register client subscription to enable subscription tree pruning on disconnect
op_manager.ring.add_client_subscription(&key, client_id);
}
_ => {
tracing::error!(
Expand Down Expand Up @@ -1596,8 +1600,16 @@ async fn process_open_request(
tracing::debug!(
client_id = %client_id,
request_id = %request_id,
"Received disconnect request from client"
"Received disconnect request from client, triggering subscription cleanup"
);

// Notify the node to clean up this client's subscriptions and trigger tree pruning
if let Err(err) = op_manager
.notify_node_event(NodeEvent::ClientDisconnected { client_id })
.await
{
tracing::error!(%client_id, "Failed to notify node of client disconnect: {}", err);
}
}
ClientRequest::NodeQueries(query) => {
tracing::debug!(
Expand Down
22 changes: 20 additions & 2 deletions crates/core/src/client_events/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,12 +419,30 @@ async fn websocket_interface(
}
Ok(None) => continue,
Err(None) => {
tracing::debug!("client channel closed on request");
tracing::debug!(%client_id, "Client channel closed, notifying node for subscription cleanup");
// Notify node about client disconnect to trigger subscription cleanup
let _ = request_sender
.send(ClientConnection::Request {
client_id,
req: Box::new(ClientRequest::Disconnect { cause: None }),
auth_token: auth_token.as_ref().map(|t| t.0.clone()),
attested_contract: auth_token.as_ref().map(|t| t.1),
})
.await;
let _ = server_sink.send(Message::Close(None)).await;
return Ok(())
},
Err(Some(err)) => {
tracing::debug!(err = %err, "client channel error on request");
tracing::debug!(%client_id, err = %err, "Client channel error, notifying node for subscription cleanup");
// Notify node about client disconnect to trigger subscription cleanup even on error
let _ = request_sender
.send(ClientConnection::Request {
client_id,
req: Box::new(ClientRequest::Disconnect { cause: None }),
auth_token: auth_token.as_ref().map(|t| t.0.clone()),
attested_contract: auth_token.as_ref().map(|t| t.1),
})
.await;
return Err(err)
},
}
Expand Down
20 changes: 13 additions & 7 deletions crates/core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,18 +430,17 @@ pub(crate) enum NodeEvent {
TransactionTimedOut(Transaction),
/// Transaction completed successfully - cleanup client subscription
TransactionCompleted(Transaction),
/// Local subscription completed - deliver SubscribeResponse to client via result router.
/// **Standalone** subscription completed - deliver SubscribeResponse to client via result router.
///
/// **IMPORTANT:** This event is ONLY used for standalone subscriptions (no remote peers available).
/// Normal network subscriptions go through `handle_op_result`, which sends results via
/// `result_router_tx` directly without needing this event.
///
/// **Architecture Note (Issue #2075):**
/// This event is part of the decoupled subscription architecture. Local client subscriptions
/// are handled separately from network peer subscriptions:
/// - This event notifies the client layer that a subscription request has been processed
/// Local client subscriptions are handled separately from network peer subscriptions:
/// - Subsequent contract updates are delivered via the executor's `update_notifications`
/// channels (see `send_update_notification` in runtime.rs)
/// - Network peer subscriptions use the `seeding_manager.subscribers` for UPDATE propagation
///
/// This separation keeps the ops/ module (network operations) independent from the
/// client_events/ module (local WebSocket client handling).
LocalSubscribeComplete {
tx: Transaction,
key: ContractKey,
Expand All @@ -455,6 +454,10 @@ pub(crate) enum NodeEvent {
BroadcastProximityCache {
message: ProximityCacheMessage,
},
/// A WebSocket client disconnected - clean up its subscriptions and trigger tree pruning.
ClientDisconnected {
client_id: ClientId,
},
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -537,6 +540,9 @@ impl Display for NodeEvent {
NodeEvent::BroadcastProximityCache { message } => {
write!(f, "BroadcastProximityCache ({message:?})")
}
NodeEvent::ClientDisconnected { client_id } => {
write!(f, "ClientDisconnected (client: {client_id})")
}
}
}
}
Expand Down
127 changes: 84 additions & 43 deletions crates/core/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,34 +425,43 @@ async fn report_result(
);
}

// Send to result router
// Send to result router (skip for sub-operations - parent handles notification)
if let Some(transaction) = tx {
let host_result = op_res.to_host_result();
let router_tx_clone = op_manager.result_router_tx.clone();
let event_notifier = op_manager.to_event_listener.clone();

// Spawn fire-and-forget task to avoid blocking report_result()
// while still guaranteeing message delivery
tokio::spawn(async move {
if let Err(e) = router_tx_clone.send((transaction, host_result)).await {
tracing::error!(
"CRITICAL: Result router channel closed - dual-path delivery broken. \
Router or session actor has crashed. Transaction: {}. Error: {}. \
Consider restarting node.",
transaction,
e
);
// TODO: Consider implementing circuit breaker or automatic recovery
} else {
// Transaction completed successfully, notify to clean up subscriptions
use crate::message::NodeEvent;
use either::Either;
let _ = event_notifier
.notifications_sender
.send(Either::Right(NodeEvent::TransactionCompleted(transaction)))
.await;
}
});
// Sub-operations (e.g., Subscribe spawned by PUT) don't notify clients directly;
// the parent operation handles the client response.
if op_manager.is_sub_operation(transaction) {
tracing::debug!(
tx = %transaction,
"Skipping client notification for sub-operation"
);
} else {
let host_result = op_res.to_host_result();
let router_tx_clone = op_manager.result_router_tx.clone();
let event_notifier = op_manager.to_event_listener.clone();

// Spawn fire-and-forget task to avoid blocking report_result()
// while still guaranteeing message delivery
tokio::spawn(async move {
if let Err(e) = router_tx_clone.send((transaction, host_result)).await {
tracing::error!(
"CRITICAL: Result router channel closed - dual-path delivery broken. \
Router or session actor has crashed. Transaction: {}. Error: {}. \
Consider restarting node.",
transaction,
e
);
// TODO: Consider implementing circuit breaker or automatic recovery
} else {
// Transaction completed successfully, notify to clean up subscriptions
use crate::message::NodeEvent;
use either::Either;
let _ = event_notifier
.notifications_sender
.send(Either::Right(NodeEvent::TransactionCompleted(transaction)))
.await;
}
});
}
}

// check operations.rs:handle_op_result to see what's the meaning of each state
Expand Down Expand Up @@ -848,21 +857,37 @@ async fn process_message_v1<CB>(
.await;
}
NetMessageV1::Unsubscribed {
ref key, ref from, ..
ref key,
ref from,
ref transaction,
} => {
tracing::debug!(
"Received Unsubscribed message for contract {} from peer {}",
key,
from
);
// Convert PeerKeyLocation to PeerId for remove_subscriber
tracing::debug!(%key, %from, "Received Unsubscribed");
let peer_id = PeerId {
addr: from
.socket_addr()
.expect("from peer should have socket address"),
pub_key: from.pub_key().clone(),
};
op_manager.ring.remove_subscriber(key, &peer_id);

let result = op_manager.ring.remove_subscriber(key, &peer_id);

if let Some(upstream) = result.notify_upstream {
let upstream_addr = upstream
.socket_addr()
.expect("upstream must have socket address");
tracing::debug!(%key, %upstream_addr, "Propagating Unsubscribed");

let own_location = op_manager.ring.connection_manager.own_location();
let unsubscribe_msg = NetMessage::V1(NetMessageV1::Unsubscribed {
transaction: *transaction,
key: *key,
from: own_location,
});

if let Err(e) = conn_manager.send(upstream_addr, unsubscribe_msg).await {
tracing::warn!(%key, %upstream_addr, error = %e, "Failed to propagate Unsubscribed");
}
}
break;
}
NetMessageV1::ProximityCache { .. } => {
Expand Down Expand Up @@ -1096,21 +1121,37 @@ where
.await;
}
NetMessageV1::Unsubscribed {
ref key, ref from, ..
ref key,
ref from,
ref transaction,
} => {
tracing::debug!(
"Received Unsubscribed message for contract {} from peer {}",
key,
from
);
// Convert PeerKeyLocation to PeerId for remove_subscriber
tracing::debug!(%key, %from, "Received Unsubscribed");
let peer_id = PeerId {
addr: from
.socket_addr()
.expect("from peer should have socket address"),
pub_key: from.pub_key().clone(),
};
op_manager.ring.remove_subscriber(key, &peer_id);

let result = op_manager.ring.remove_subscriber(key, &peer_id);

if let Some(upstream) = result.notify_upstream {
let upstream_addr = upstream
.socket_addr()
.expect("upstream must have socket address");
tracing::debug!(%key, %upstream_addr, "Propagating Unsubscribed");

let own_location = op_manager.ring.connection_manager.own_location();
let unsubscribe_msg = NetMessage::V1(NetMessageV1::Unsubscribed {
transaction: *transaction,
key: *key,
from: own_location,
});

if let Err(e) = conn_manager.send(upstream_addr, unsubscribe_msg).await {
tracing::warn!(%key, %upstream_addr, error = %e, "Failed to propagate Unsubscribed");
}
}
break;
}
NetMessageV1::ProximityCache { ref message } => {
Expand Down
Loading