From 550a0099b7af236f4c0fb5c38ac7349d277a3e27 Mon Sep 17 00:00:00 2001 From: Octavian Date: Wed, 17 Dec 2025 19:09:06 +0200 Subject: [PATCH] L2 subnetwork topology + netcfg CLI - API Node builds a low-level L2 network topology plan during prepare_topology call from the solver-level generated topology. It then transmits this plan as JSON routes to workers. - Shard Nodes receives the JSON plan and invokes the new dnet-netcfg tool (bridges, /31 addressing, routes, MTU rate, etc.) - netcfg: new macOS CLI that: - intakes plan from shard node in JSON form and applies it, validates it and optionally creates a new network service to persist interfaces across reboots. - handles virtual bridge creation/destruction, member iface attachment/detachment, changes MTU rate, updates routing table. - tests connections with pings, routes and iperf3 speeds. - netcfg requires sudo for some commands. It fails with permission errors on which the API Node prompts the user for the password of that specific peer. Once a password is given once netcfg registers itself with NOPASSWD via visudo in sudoers. It also moves itself in /opt/homebrew/bin because of the safe path requirement of visudo and modifies it's permissions. - unit test netcfg tool --- pyproject.toml | 3 + src/cli/api.py | 1 + src/dnet/api/http_api.py | 98 ++++ src/dnet/api/models.py | 10 + src/dnet/core/network/create.py | 169 ++++++ src/dnet/core/network/netcfg.py | 956 ++++++++++++++++++++++++++++++++ src/dnet/shard/http_api.py | 172 ++++++ src/dnet/shard/models.py | 31 ++ src/dnet/tui.py | 52 +- tests/test_netcfg.py | 280 ++++++++++ 10 files changed, 1771 insertions(+), 1 deletion(-) create mode 100644 src/dnet/core/network/create.py create mode 100644 src/dnet/core/network/netcfg.py create mode 100644 tests/test_netcfg.py diff --git a/pyproject.toml b/pyproject.toml index 33ba5f4e..12344de9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,6 +38,7 @@ dependencies = [ "dnet-p2p @ file://${PROJECT_ROOT}/lib/dnet-p2p/bindings/py", "rich>=13.0.0", "psutil>=5.9.0", + "pytest>=8.4.2", ] [project.optional-dependencies] @@ -56,6 +57,7 @@ dev = [ [project.scripts] dnet-api = "cli.api:main" dnet-shard = "cli.shard:main" +dnet-netcfg = "dnet.core.network.netcfg:main" [build-system] requires = ["uv_build>=0.8.17,<0.9.0"] @@ -79,6 +81,7 @@ markers = [ "core: tests for core memory/cache/utils not tied to api/shard", "e2e: integration tests requiring live servers or multiple components", "integration: model catalog integration tests for CI (manual trigger)", + "netcfg: test low-level network configuration executable" ] [tool.ruff] diff --git a/src/cli/api.py b/src/cli/api.py index 93ec9ae3..26b48854 100644 --- a/src/cli/api.py +++ b/src/cli/api.py @@ -94,6 +94,7 @@ def update_tui_model_info( inference_manager=inference_manager, model_manager=model_manager, node_id=node_id, + tui=tui, ) tui.update_status("Starting Servers...") diff --git a/src/dnet/api/http_api.py b/src/dnet/api/http_api.py index 1035d00f..b6bc3734 100644 --- a/src/dnet/api/http_api.py +++ b/src/dnet/api/http_api.py @@ -1,4 +1,5 @@ from typing import Optional, Any, List +import httpx import asyncio import os from hypercorn import Config @@ -7,6 +8,7 @@ from fastapi import FastAPI from fastapi.responses import JSONResponse, StreamingResponse from dnet.utils.model import get_model_config_json +from dnet.core.network.create import topo_to_network from distilp.profiler import profile_model from dnet.utils.logger import logger from .models import ( @@ -25,6 +27,8 @@ from .inference import InferenceManager from .model_manager import ModelManager from dnet_p2p import DnetDeviceProperties +import sys +import getpass class HTTPServer: @@ -35,6 +39,7 @@ def __init__( inference_manager: InferenceManager, model_manager: ModelManager, node_id: str, + tui: Optional["DnetTUI"] = None, ): self.http_port = http_port self.cluster_manager = cluster_manager @@ -43,6 +48,7 @@ def __init__( self.node_id = node_id self.app = FastAPI() self.http_server: Optional[asyncio.Task] = None + self.tui = tui async def start(self, shutdown_trigger: Any = lambda: asyncio.Future()) -> None: await self._setup_routes() @@ -251,6 +257,89 @@ async def get_topology(self) -> TopologyInfo: ) return topo + async def _create_subnetworks( + self, + topology: TopologyInfo, + netcfg_password: Optional[str] = None, + netcfg_persist: Optional[bool] = False, + netcfg_user: Optional[str] = None, + ) -> None: + net_topo = topo_to_network(topology) + shards = self.cluster_manager.shards + + # Forward subnetwork config to shards for execution + async with httpx.AsyncClient(trust_env=False) as client: + for instance, plan in net_topo.nodes.items(): + shard = shards.get(instance) + if not shard or shard.is_manager: + continue + url = f"http://{shard.local_ip}:{shard.server_port}/register" + payload = {"mappings": [r.model_dump(exclude_none=True) for r in plan.routes]} + + try: + res = await client.post(url, json=payload, timeout=10.0) + except Exception as e: + logger.warning("Network config failed on %s: %r", instance, e) + continue + logger.error(res) + if res.status_code == 200: + continue + + if res.status_code == 403: + password = netcfg_password + persist = bool(netcfg_persist or False) + user = netcfg_user + if not password and getattr(self, "tui", None) is not None: + persist = await self.tui.prompt_yes_no( + f"Shard {instance} needs sudo. Persist NOPASSWD for future runs?", + default=False, + ) + if persist: + default_user = getpass.getuser() + user = await self.tui.prompt_text( + "User to grant NOPASSWD", default=default_user + ) + password = await self.tui.prompt_password( + f"Enter admin password for {instance}: " + ) + + if password: + body = { + "mappings": payload["mappings"], + "password": password, + "persist": persist, + "user": user, + } + try: + res2 = await client.post( + f"http://{shard.local_ip}:{shard.server_port}/register_with_password", + json=body, + timeout=60.0, + ) + logger.error("=====================") + logger.error(res2) + if res2.status_code == 200: + continue + if res2.status_code == 401: + logger.warning("Wrong password for %s", instance) + continue + except Exception as e: + logger.warning( + "Privileged apply failed on %s: %r", instance, e + ) + else: + logger.warning( + "Network config skipped on %s (password not provided)", + instance, + ) + else: + logger.warning( + "Network config skipped on %s (status %s): %s", + instance, + res.status_code, + (res.text or "")[:120], + ) + async def prepare_topology(self, req: PrepareTopologyRequest) -> TopologyInfo: try: if not self.model_manager.is_model_available(req.model): @@ -292,6 +381,15 @@ async def prepare_topology(self, req: PrepareTopologyRequest) -> TopologyInfo: profiles, model_profile, req.model, num_layers, req.kv_bits ) self.cluster_manager.current_topology = topology + + await self._create_subnetworks( + topology, + netcfg_password=req.netcfg_password, + netcfg_persist=req.netcfg_persist, + netcfg_user=req.netcfg_user, + ) + logger.warning("AWAIT NETWORK DONE") + return topology except Exception as e: logger.exception("Error in prepare_topology: %s", e) diff --git a/src/dnet/api/models.py b/src/dnet/api/models.py index e27681ed..3f3f05a3 100644 --- a/src/dnet/api/models.py +++ b/src/dnet/api/models.py @@ -320,6 +320,16 @@ class PrepareTopologyRequest(BaseModel): max_batch_exp: int = Field( default=2, description="Max batch size as power of 2 exponent" ) + # Optional network config credentials for privileged apply on shards + netcfg_password: Optional[str] = Field( + default=None, description="Admin password on shards for network config" + ) + netcfg_persist: Optional[bool] = Field( + default=False, description="Persist sudoers rule for future runs" + ) + netcfg_user: Optional[str] = Field( + default=None, description="User to grant NOPASSWD to (when persisting)" + ) class ManualDevice(BaseModel): diff --git a/src/dnet/core/network/create.py b/src/dnet/core/network/create.py new file mode 100644 index 00000000..1ee88691 --- /dev/null +++ b/src/dnet/core/network/create.py @@ -0,0 +1,169 @@ + +import ipaddress +from pydantic import BaseModel, Field +from typing import List, Optional, Literal, Dict, Tuple +from dnet.core.types.topology import TopologyInfo +from dnet_p2p import DnetDeviceProperties +from dnet.utils.logger import logger +from dnet.core.types.topology import ( + TopologyInfo as TypesTopologyInfo, +) + +# Globals +TB_MDP_COST = 10 +ETH_MDP_COST = 20 +ETH_FDP_COST = 100 +WIFI_FDB_COST = 200 + +MDP_IPV4_BASE = ipaddress.IPv4Address("10.101.0.0") +FDP_IPV4_BASE = ipaddress.IPv4Address("10.101.128.0") +AP_IPV4_BASE = ipaddress.IPv4Address("10.101.160.0") + +MDP_INC = 0 +FDP_INV = 0 +AP_INV = 0 + +# Models +# NOTE: for now the FDP route is empty +class NetworkRoute(BaseModel): + node: str = Field(..., description="Name of neighbor node") + mdp_self_ipv4: str = Field( ..., description="Primary path self IPv4") + mdp_ipv4: str = Field( ..., description="Primary path IPv4") + mdp_interface: str = Field( ..., description="inet for primary path") + mdp_cost: int = Field(..., description="Routing cost for primary path") + fdp_ipv4: str = Field(..., description="Fallback path IPv4") + fdp_interface: str = Field( ..., description="inet for fallback path") + fdp_cost: int = Field(..., description="Routing cost for fallback path") + +class NodeNetworkPlan(BaseModel): + instance: str = Field(..., description="") + mgmt_ipv4: str = Field(..., description="Management IPv4") + routes: List[NetworkRoute] = Field( + default_factory=list, description="Per-neighbor route plans" + ) + tb_bridges: List[str] = Field( + default_factory=list, + description="per‑port Thunderbolt bridge interface names", + ) + +# L2/L3 topology for orchestration network. +class NetworkTopologyPlan(BaseModel): + nodes: Dict[str, NodeNetworkPlan] = Field( + default_factory=dict, description="Per‑node network plans" + ) + +def _get_tb_link(a: DnetDeviceProperties, b: DnetDeviceProperties): + if not a.thunderbolt and not b.thunderbolt: + return (False, None, None) + for b_host, b_connected in b.thunderbolt.instances: + for b_con in b_connected: + if hasattr(a.thunderbolt, "instances"): + for a_host, _ in a.thunderbolt.instances: + if b_con.uuid == a_host.uuid: + return (True, a_host.uuid, b_host.uuid) + return (False, None, None) + +def topo_to_network(topo: TypesTopologyInfo) -> NetworkTopologyPlan: + _reset_addr_inc() + nodes: Dict[str, NodeNetworkPlan] = {} + dev_by_name = {d.instance: d for d in topo.devices} + br_count: Dict[str, int] = {} + tb_uuid_to_bridge: Dict[str, Dict[str, str]] = {} + links: set[tuple[str, str]] = set() + + for name, dev in dev_by_name.items(): + nodes[name] = NodeNetworkPlan( + instance=name, + mgmt_ipv4="", + routes=[], + tb_bridges=[], + ) + + for ass in topo.assignments: + if not ass.next_instance: continue + link = tuple(sorted([ass.instance, ass.next_instance])) + links.add(link) + + for a_name, b_name in links: + a = dev_by_name.get(a_name) + b = dev_by_name.get(b_name) + connected, uuid_a, uuid_b = _get_tb_link(a, b) + if not connected or not uuid_a or not uuid_b: + # TODO: Fallback to ethernet or wifi + continue + + # map bridges to connections + if a_name not in tb_uuid_to_bridge: tb_uuid_to_bridge[a_name] = {} + if uuid_a not in tb_uuid_to_bridge[a_name]: + idx = br_count.get(a_name, 0) + 1 + br_count[a_name] = idx + br_a = f"bridge{idx}" + tb_uuid_to_bridge[a_name][uuid_a] = br_a + if br_a not in nodes[a_name].tb_bridges: + nodes[a_name].tb_bridges.append(br_a) + else: + br_a = tb_uuid_to_bridge[a_name][uuid_a] + + if b_name not in tb_uuid_to_bridge: tb_uuid_to_bridge[b_name] = {} + if uuid_b not in tb_uuid_to_bridge[b_name]: + idx = br_count.get(b_name, 0) + 1 + br_count[b_name] = idx + br_b = f"bridge{idx}" + tb_uuid_to_bridge[b_name][uuid_b] = br_b + if br_b not in nodes[b_name].tb_bridges: + nodes[b_name].tb_bridges.append(br_b) + else: + br_b = tb_uuid_to_bridge[b_name][uuid_b] + + ip_a, ip_b = _alloc_mdp_link() + nodes[a_name].routes.append( + NetworkRoute( + node=b_name, + mdp_self_ipv4=ip_a, + mdp_ipv4=ip_b, + mdp_interface=br_a, + mdp_cost=TB_MDP_COST, + fdp_ipv4="", + fdp_interface="", + fdp_cost=0, + ) + ) + nodes[b_name].routes.append( + NetworkRoute( + node=a_name, + mdp_self_ipv4=ip_b, + mdp_ipv4=ip_a, + mdp_interface=br_b, + mdp_cost=TB_MDP_COST, + fdp_ipv4="", + fdp_interface="", + fdp_cost=0, + ) + ) + return NetworkTopologyPlan(nodes=nodes) + +def _reset_addr_inc(): + global MDP_INC + global FDP_INC + global AP_INC + MDP_INC = 0 + FDP_INC = 0 + AP_INC = 0 + +def _alloc_mdp_link() -> Tuple[str, str]: + global MDP_INC + root = int(MDP_IPV4_BASE) + MDP_INC + MDP_INC += 2 + return f"{ipaddress.IPv4Address(root)}", f"{ipaddress.IPv4Address(root+1)}" + +def _alloc_fdp_link() -> Tuple[str, str]: + global FDP_INC + root = int(FDP_IPV4_BASE) + FDP_INC + FDP_INC += 2 + return f"{ipaddress.IPv4Address(root)}", f"{ipaddress.IPv4Address(root+1)}" + +def _alloc_ap_link() -> Tuple[str, str]: + global AP_INC; + root = int(AP_IPV4_BASE) + AP_INC + AP_INC += 2 + return f"{ipaddress.IPv4Address(root)}", f"{ipaddress.IPv4Address(root+1)}" diff --git a/src/dnet/core/network/netcfg.py b/src/dnet/core/network/netcfg.py new file mode 100644 index 00000000..4283e469 --- /dev/null +++ b/src/dnet/core/network/netcfg.py @@ -0,0 +1,956 @@ +#!/usr/bin/env python3 + +import os +import getpass +import socket +import re +import sys +import json +import argparse +import ipaddress +import subprocess +import socket +from typing import Optional + + +def is_perm_err(s: str) -> bool: + return ( + "permission denied" in s + or "operation not permitted" in s + or "sioc" in s.lower() + ) + + +# Return: returncode, output, stderr +# NOTE: prints are written in stderr to separate from json output in stdout +def __exec(cmd: list[str]) -> tuple[int, str, str]: + try: + print("$ " + " ".join(cmd), file=sys.stderr) + p = subprocess.run( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + check=False, + text=True, + ) + out = (p.stdout or "").strip() + err = (p.stderr or "").strip() + if p.returncode != 0: + print( + f"WARN: Command failed ({p.returncode}): {' '.join(cmd)}\n{err}", + file=sys.stderr, + ) + return p.returncode, out, err + except Exception as e: + print(f"ERROR: Command execution failed for {cmd}: {e}", file=sys.stderr) + return 127, "", str(e) + + +def _ensure_up(obj: str): + rc, _, err = __exec(["ifconfig", obj, "up"]) + if rc != 0 and is_perm_err(err): + raise PermissionError(err) + + +# Bridges and Interfaces ======= + + +def _bridge_exists(name: str) -> bool: + rc, _, _ = __exec(["ifconfig", name]) + return rc == 0 + + +def _create_bridge(bridge: str) -> bool: + if _bridge_exists(bridge): + return True + rc, _, err = __exec(["ifconfig", bridge, "create"]) + if rc != 0: + if "File exists" not in err: + raise RuntimeError(f"Failed to create {bridge}: {err}") + return rc == 0 + + +def _set_bridge_status(bridge: str, enabled: bool): + op = "up" if enabled else "down" + rc, _, err = __exec(["ifconfig", bridge, op]) + if rc != 0 and is_perm_err(err): + raise PermissionError(err) + return rc == 0 + + +def _ensure_bridge(bridge: str) -> None: + if not _bridge_exists(bridge): + _create_bridge(bridge) + _set_bridge_status(bridge, enabled=True) + + +def _get_bridge_members(name: str) -> list[str]: + rc, out, _ = __exec(["ifconfig", name]) + if rc != 0: + return [] + members: list[str] = [] + for line in out.splitlines(): + m = re.search(r"\bmember:\s+(\w+)\b", line) + if m: + members.append(m.group(1)) + return members + + +def _list_bridges() -> list[str]: + rc, out, _ = __exec(["ifconfig", "-l"]) + if rc == 0 and out: + ifs = out.split() + return [i for i in ifs if i.startswith("bridge")] + return [] + + +def _destroy_bridge(br: Optional[str] = None) -> list[dict[str, str]]: + actions: list[dict[str, str]] = [] + if br == "bridge0": + return actions + rc, _, err = __exec(["ifconfig", br, "destroy"]) + if rc != 0 and is_perm_err(err): + raise PermissionError(err) + actions.append({"destroyed": br}) + return actions + + +def _get_thunderbolt_interfaces() -> list[str]: + rc, out, _ = __exec(["networksetup", "-listallhardwareports"]) + if rc != 0: + return [] + cur_port = None + en_list: list[str] = [] + for line in out.splitlines(): + if line.startswith("Hardware Port:"): + cur_port = line.split(":", 1)[1].strip() + elif line.startswith("Device:") and cur_port: + dev = line.split(":", 1)[1].strip() + if "Thunderbolt" in cur_port and dev.startswith("en"): + en_list.append(dev) + cur_port = None + return en_list + + +def _get_tb_ports() -> list[dict[str, str]]: + rc, out, err = __exec(["system_profiler", "SPThunderboltDataType", "-json"]) + if rc != 0: + raise RuntimeError(err or "system_profiler_failed") + try: + data = json.loads(out or "{}") + except Exception: + data = {} + arr = data.get("SPThunderboltDataType", []) or [] + ports: list[dict[str, str]] = [] + for item in arr: + name = str(item.get("_name", "")) + uuid = str(item.get("domain_uuid_key", "")) + for k, v in item.items(): + if not isinstance(v, dict): + continue + if ( + not (k.startswith("receptacle_") and k.endswith("_tag")) + and k != "receptacle_upstream_ambiguous_tag" + ): + continue + status = str(v.get("receptacle_status_key", "")) + if status and status in [ + "receptacle_connected", + "receptacle_no_devices_connected", + ]: + rec = { + "bus": name, + "uuid": uuid, + "receptacle": str(v.get("receptacle_id_key", "")), + "status": status, + "speed": str(v.get("current_speed_key", "")), + "link": str(v.get("link_status_key", "")), + } + ports.append(rec) + return ports + + +def _find_interface(en: str) -> Optional[str]: + for br in _list_bridges(): + if en in _get_bridge_members(br): + return br + return None + + +# NOTE: +# Interfaces that are up can reject PROMISC mode transition +# forced by addm so we explicitly disable them before every move +def _move_interface(from_br: str, to_br: str, en: str) -> None: + _ensure_bridge(to_br) + _set_interface_status(en, enabled=False) + _detach_interface(en, from_br) + _attach_interface(en, to_br) + _set_interface_status(en, enabled=True) + _set_bridge_status(to_br, enabled=True) + + +def _set_interface_status(en: str, enabled: bool) -> list[dict[str, str]]: + op = "up" if enabled else "down" + rc, _, err = __exec(["ifconfig", en, op]) + if rc != 0 and is_perm_err(err): + raise PermissionError(err) + + +def _set_interface_mtu(en: str, mtu: int) -> list[dict[str, str]]: + actions: list[dict[str, str]] = [] + rc, _, err = __exec(["ifconfig", en, "mtu", mtu]) + if rc != 0 and is_perm_err(err): + raise PermissionError(err) + actions.append({"updated": en, "mtu": mtu}) + return actions + +def _get_interface_mtu(en: str) -> Optional[int]: + rc, out, _ = __exec(["ifconfig", en]) + if rc != 0 or not out: + return None + m = re.search(r"\bmtu\s+(\d+)\b", out) + if not m: + return None + try: + return int(m.group(1)) + except Exception: + return None + + +def _attach_interface(en: str, bridge: str) -> list[dict[str, str]]: + actions: list[dict[str, str]] = [] + _set_interface_status(en, enabled=False) + rc, _, err = __exec(["ifconfig", bridge, "addm", en]) + if rc != 0 and is_perm_err(err): + raise PermissionError(err) + _set_interface_status(en, enabled=True) + actions.append({"attached": en, "to": bridge}) + return actions + + +def _detach_interface(en: str, br: str = None) -> list[dict[str, str]]: + actions: list[dict[str, str]] = [] + if not br: + return actions + rc, _, err = __exec(["ifconfig", br, "deletem", en]) + if rc != 0 and is_perm_err(err): + raise PermissionError(err) + actions.append({"detached": en, "from": br}) + return actions + + +# IPv4 Control ======= + + +def _assign_ip(ifname: str, ip: str, netmask: str) -> None: + rc, _, err = __exec(["ifconfig", ifname, "inet", ip, netmask, "alias"]) + if rc != 0: + if is_perm_err(err): + raise PermissionError(err) + rc2, _, err2 = __exec(["ifconfig", ifname, "inet", ip, netmask]) + if rc2 != 0 and is_perm_err(err2): + raise PermissionError(err2) + + +def _route_to_peer(ifname: str, peer: str, netmask: str) -> int: + rc, _, err = __exec(["route", "change", peer, "-interface", ifname]) + if rc != 0 and is_perm_err(err): + raise PermissionError(err) + return -1 + return 0 + + +def ip_lists(ifname: str) -> list[str]: + rc, out, _ = __exec(["ifconfig", ifname]) + addrs: list[str] = [] + if rc != 0: + return addrs + for line in (out or "").splitlines(): + line = line.strip() + if line.startswith("inet ") and "inet6" not in line: + parts = line.split() + if len(parts) >= 2: + addrs.append(parts[1]) + return addrs + + +def ip_add(ifname: str, ip: str, netmask: str) -> list[dict[str, str]]: + _assign_ip(ifname, ip, netmask) + _set_interface_status(ifname, enabled=True) + return [{"ipv4": ip, "on": ifname, "netmask": netmask}] + + +def ip_del(ifname: str, ip: str) -> list[dict[str, str]]: + rc, _, err = __exec(["ifconfig", ifname, "-alias", ip]) + if rc != 0 and is_perm_err(err): + raise PermissionError(err) + return [{"removed": ip, "on": ifname}] + + +def ip_clear(interfaces: Optional[list[str]] = None) -> list[dict[str, str]]: + actions: list[dict[str, str]] = [] + ifs = interfaces or _list_bridges() + for ifn in ifs: + for ip in ip_lists(ifn): + actions += ip_del(ifn, ip) + return actions + + +# Network ======= + + +def apply_network_mappings(mappings: list[dict]) -> dict: + bridges = {str(m.get("mdp_interface")) for m in mappings if m.get("mdp_interface")} + for br in sorted(bridges): + _ensure_bridge(br) + + assigned: dict[str, str] = {} + used_ens: set[str] = set() + current_b0 = set(_get_bridge_members("bridge0")) + + actions: list[dict[str, str]] = [] + + for m in mappings: + br_obj = m.get("mdp_interface") + if not br_obj: + continue + target_br = str(br_obj) + if not re.match(r"^(en|bridge)\d+$", target_br): + raise ValueError("invalid_interface") + if target_br in assigned: + continue + + current_en = m.get("thunderbolt_en", m.get("tb_self_en")) + if not current_en: + print( + f"WARN: No available Thunderbolt en* port to attach to {target_br}", + file=sys.stderr, + ) + continue + if not re.match(r"^(en|bridge)\d+$", str(current_en)): + raise ValueError("invalid_interface") + if current_en in used_ens: + raise RuntimeError("duplicate_interface") + used_ens.add(current_en) + + from_br = _find_interface(str(current_en)) or "" + _move_interface(from_br, target_br, current_en) + assigned[target_br] = current_en + actions.append( + {"bridge": target_br, "member": current_en, "moved_from": from_br or "none"} + ) + + for m in mappings: + self_ip = m.get("mdp_self_ipv4") + peer_ip = m.get("mdp_ipv4") + iface = m.get("mdp_interface") + if self_ip and peer_ip and iface: + try: + ipaddress.IPv4Address(str(self_ip)) + ipaddress.IPv4Address(str(peer_ip)) + except Exception: + raise ValueError("invalid_ip") + netmask = "255.255.255.254" + _assign_ip(iface, self_ip, netmask) + if self_ip not in ip_lists(iface): + raise RuntimeError("Failed assign IPv4 Address: {self_ip}") + actions.append( + { + "bridge": iface, + "addr": self_ip, + "netmask": netmask, + } + ) + if _route_to_peer(iface, peer_ip, netmask) < 0: + raise RuntimeError( + "Failed to update routing table with peer IP: {peer_ip}" + ) + return { + "status": "ok", + "actions": actions, + "assigned": assigned, + "current_bridge0": sorted(current_b0), + } + +# Sudoers registration ======== + +# visudo rejects unsafe paths and permissions +# add executable here for registration +def register_in_homebrew_bin() -> list[dict[str, str]]: + actions: list[dict[str, str]] = [] + dest = "/opt/homebrew/bin/dnet-netcfg" + src = os.path.abspath(__file__) + with open(src, "r", encoding="utf-8") as f: + content = f.read() + os.makedirs(os.path.dirname(dest), exist_ok=True) + with open(dest, "w", encoding="utf-8") as f: + f.write(content) + os.chmod(dest, 0o755) + actions.append({"installed": dest, "from": src}) + user = os.environ.get("SUDO_USER") or os.environ.get("USER") or getpass.getuser() + actions += register_sudoers(user) + return actions + + +def register_sudoers(user: str) -> list[dict[str, str]]: + bin_path = "/opt/homebrew/bin/dnet-netcfg" + if not os.path.exists(bin_path): + raise FileNotFoundError(bin_path) + st = os.stat(bin_path) + if (st.st_mode & 0o022) != 0: + raise RuntimeError("unsafe_binary_permissions") + line = f"{user} ALL=(root) NOPASSWD: {bin_path}\n" + path = "/etc/sudoers.d/dnet-netcfg" + tmp = path + ".tmp" + with open(tmp, "w") as f: + f.write(line) + os.chmod(tmp, 0o440) + rc, _, err = __exec(["visudo", "-cf", tmp]) + if rc != 0: + try: + os.remove(tmp) + except FileNotFoundError: + pass + raise RuntimeError("sudoers_validation_failed") + os.replace(tmp, path) + return [{"wrote": path, "user": user, "bin": bin_path}] + + +def remove_sudoers() -> list[dict[str, str]]: + path = "/etc/sudoers.d/dnet-netcfg" + try: + os.remove(path) + rc = 0 + except FileNotFoundError: + rc = 0 + except PermissionError as e: + raise PermissionError(str(e)) + return [{"removed": path, "rc": str(rc)}] + + +def reset_network(also_remove_sudoers: bool = False) -> dict: + actions: list[dict[str, str]] = [] + dest = "bridge0" + _ensure_bridge(dest) + _ensure_up(dest) + for br in _list_bridges(): + if br == dest: continue + for en in _get_bridge_members(br): + _move_interface(br, dest, en) + actions.append({"moved": en, "from": br, "to": dest}) + for br in _list_bridges(): + if br == dest: + continue + actions += _destroy_bridge(br) + if also_remove_sudoers: + actions += remove_sudoers() + return {"status": "ok", "reset": True, "actions": actions} + + +def status() -> dict: + brs = _list_bridges() + members = {br: _get_bridge_members(br) for br in brs} + tb = _get_thunderbolt_interfaces() + addrs = {br: ip_lists(br) for br in brs} + return { + "ok": True, + "bridges": brs, + "members": members, + "thunderbolt": tb, + "addresses": addrs, + } + + +# Network plan execution ====== + +def read_plan(path: Optional[str]) -> list[dict]: + if path: + with open(path, "r") as f: + obj = json.load(f) + else: + data = sys.stdin.read() + obj = json.loads(data) + if isinstance(obj, dict) and "mappings" in obj: + return list(obj["mappings"]) + if isinstance(obj, list): + return obj + raise SystemExit(2) + + +def _validate_plan(mappings: list[dict]) -> dict: + errors: list[str] = [] + warnings: list[str] = [] + used_en: set[str] = set() + for i, m in enumerate(mappings): + iface = m.get("mdp_interface") + if not iface or not re.match(r"^(en|bridge)\d+$", str(iface)): + errors.append(f"mapping[{i}]: invalid_interface") + en = m.get("thunderbolt_en") or m.get("tb_self_en") + if en and not re.match(r"^en\d+$", str(en)): + errors.append(f"mapping[{i}]: invalid_member") + if en: + if str(en) in used_en: + errors.append(f"mapping[{i}]: duplicate_interface:{en}") + used_en.add(str(en)) + sip = m.get("mdp_self_ipv4") + pip = m.get("mdp_ipv4") + try: + ipaddress.IPv4Address(str(sip)) + ipaddress.IPv4Address(str(pip)) + except Exception: + errors.append(f"mapping[{i}]: invalid_ip") + ok = len(errors) == 0 + return {"ok": ok, "errors": errors, "warnings": warnings, "count": len(mappings)} + + +def _test_ping(peer: str, count: int = 2) -> dict: + rc, out, err = __exec(["ping", "-n", "-c", str(int(count)), peer]) + return {"ok": rc == 0, "peer": peer, "exit": rc, "stdout": out} + + +def _test_route(peer: str, expect: Optional[str] = None) -> dict: + rc, out, err = __exec(["route", "-n", "get", peer]) + iface = dest = gw = None + for line in (out or "").splitlines(): + line = line.strip() + if line.startswith("interface:"): + iface = line.split(":", 1)[1].strip() + elif line.startswith("destination:"): + dest = line.split(":", 1)[1].strip() + elif line.startswith("gateway:"): + gw = line.split(":", 1)[1].strip() + direct = bool(dest and dest != "default" and gw and gw.startswith("link#")) + ok = bool(rc == 0 and iface and (expect is None or iface == expect)) + return { + "ok": ok, + "peer": peer, + "interface": iface or "", + "expect": expect or "", + "destination": dest or "", + "gateway": gw or "", + "direct": direct, + "exit": rc, + } + + +# ping and iperf3 ====== + +def _test_listen(port: int, token: Optional[str] = None, timeout: float = 5.0) -> dict: + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + s.settimeout(float(timeout)) + s.bind(("0.0.0.0", int(port))) + try: + data, addr = s.recvfrom(2048) + ok = True + text = data.decode(errors="ignore") + if token: + ok = text.strip() == str(token) + try: + s.sendto(data, addr) + except Exception: + pass + return {"ok": ok, "from": f"{addr[0]}:{addr[1]}", "data": text} + except socket.timeout: + return {"ok": False, "error": "timeout"} + finally: + try: + s.close() + except Exception: + pass + + +def _test_iperf3_server(port: int = 5201, bind: Optional[str] = None, one_shot: bool = True) -> dict: + cmd = ["iperf3", "-s"] + if one_shot: + cmd.append("-1") + cmd += ["-p", str(int(port))] + if bind: + cmd += ["-B", str(bind)] + rc, out, err = __exec(cmd) + return {"ok": rc == 0, "port": port, "bind": bind or "", "exit": rc, "stdout": out} + + +def _test_iperf3_client(peer: str, port: int = 5201, time: int = 3) -> dict: + cmd = ["iperf3", "-c", str(peer), "-p", str(int(port)), "-t", str(int(time)), "-J"] + rc, out, err = __exec(cmd) + res: dict = {"ok": rc == 0, "peer": peer, "port": port, "exit": rc} + if out: + try: + data = json.loads(out) + end = data.get("end", {}) if isinstance(data, dict) else {} + sum_r = end.get("sum_received", {}) if isinstance(end, dict) else {} + sum_s = end.get("sum_sent", {}) if isinstance(end, dict) else {} + bps = sum_r.get("bits_per_second") or sum_s.get("bits_per_second") + if bps is not None: + res["bits_per_second"] = bps + res["iperf3"] = data + except Exception: + res["stdout"] = out + return res + + +# persistent service ==== + +def _service_exists(name: str) -> bool: + rc, out, _ = __exec(["networksetup", "-listallnetworkservices"]) + if rc != 0: + return False + for line in (out or "").splitlines(): + line = line.strip() + if not line or line.startswith("An asterisk"): + continue + if line.startswith("*"): + line = line[1:].strip() + if line == name: + return True + return False + + +def _ensure_service_for_iface(service: str, iface: str) -> None: + if not _service_exists(service): + __exec(["networksetup", "-createnetworkservice", service, iface]) + + +def _persist_mappings(mappings: list[dict]) -> dict: + actions: list[dict[str, str]] = [] + for m in mappings: + iface = m.get("mdp_interface") + self_ip = m.get("mdp_self_ipv4") + if not iface or not self_ip: + continue + iface = str(iface) + self_ip = str(self_ip) + service = f"DNET-{iface}" + _ensure_service_for_iface(service, iface) + netmask = "255.255.255.254" + __exec(["ipconfig", "set", iface, "MANUAL", self_ip, netmask]) + actions.append({"persisted": iface, "service": service, "ip": self_ip, "netmask": netmask}) + return {"ok": True, "actions": actions} + + +# argument parsing ==== + +def main() -> None: + ap = argparse.ArgumentParser( + prog="dnet-netcfg", + description="Utility to control bridges and Thunderbolt interfaces on macOS", + ) + ap.add_argument( + "--file", + "-f", + help="Path to JSON plan for apply mode. If omitted, reads stdin.", + ) + + sub = ap.add_subparsers(dest="cmd") + + # APPLY + p_apply = sub.add_parser("apply", help="Apply network mappings from JSON plan") + p_apply.add_argument("--file", "-f", help="Path to JSON plan (else read stdin)") + + # STATUS + sub.add_parser( + "status", help="Show bridges, members, Thunderbolt interfaces, and addresses" + ) + + # NETWORK + p_net = sub.add_parser("network", help="Network operations") + net = p_net.add_subparsers(dest="net_cmd") + + p_net_apply = net.add_parser("apply", help="Apply network mappings from JSON plan") + p_net_apply.add_argument("--file", "-f", help="Path to JSON plan (else read stdin)") + + p_net_validate = net.add_parser("validate", help="Validate plan JSON") + p_net_validate.add_argument("--file", "-f", help="Path to JSON plan (else read stdin)") + + p_net_reset = net.add_parser( + "reset", help="Reset members to bridge0 and destroy extras" + ) + p_net_reset.add_argument( + "--also-remove-sudoers", + action="store_true", + help="Also remove sudoers rule during reset", + ) + + net.add_parser( + "status", help="Show bridges, members, Thunderbolt interfaces, and addresses" + ) + + # TEST COMMANDS (minimal) + p_test = sub.add_parser("test", help="Connectivity tests") + test = p_test.add_subparsers(dest="test_cmd") + p_t_ping = test.add_parser("ping", help="ICMP ping a peer") + p_t_ping.add_argument("peer", help="Peer IPv4") + p_t_ping.add_argument("--count", type=int, default=2) + p_t_route = test.add_parser("route", help="Show route to a peer") + p_t_route.add_argument("peer", help="Peer IPv4") + p_t_route.add_argument("--expect", help="Expected egress interface (e.g. bridge1)") + p_t_is = test.add_parser("iperf3-server", help="Run iperf3 in server mode (one-shot)") + p_t_is.add_argument("--port", type=int, default=5201) + p_t_is.add_argument("--bind", default=None, help="Bind IP") + p_t_ic = test.add_parser("iperf3-client", help="Run iperf3 client to peer") + p_t_ic.add_argument("peer", help="Peer IPv4") + p_t_ic.add_argument("--port", type=int, default=5201) + p_t_ic.add_argument("--time", type=int, default=3) + + # BRIDGE COMMANDS + p_br = sub.add_parser("bridge", help="Bridge operations") + br = p_br.add_subparsers(dest="br_cmd") + p_br_create = br.add_parser("create", help="Create and bring up bridges") + p_br_create.add_argument("name", nargs="+", help="Bridge name(s)") + p_br_destroy = br.add_parser("destroy", help="Destroy bridge") + p_br_destroy.add_argument("bridge", help="Specific bridge to destroy") + + p_br_attach = br.add_parser("attach", help="Attach interface to bridge") + p_br_attach.add_argument("iface", help="Interface (e.g. en3)") + p_br_attach.add_argument("bridge", help="Bridge (e.g. bridge1)") + + p_br_detach = br.add_parser("detach", help="Detach interface from bridge") + p_br_detach.add_argument("iface", help="Interface (e.g., en3)") + p_br_detach.add_argument("bridge", help="Bridge (e.g. bridge0)") + + p_br_move = br.add_parser( + "move", help="Move member interface from source bridges to destination" + ) + p_br_move.add_argument("iface", help="Interface (e.g. en3)") + p_br_move.add_argument("src_br", help="Source bridge") + p_br_move.add_argument("dst_br", default="bridge0", help="Destination bridge") + + # IFACE COMMANDS + p_if = sub.add_parser("iface", help="Interface operations") + iff = p_if.add_subparsers(dest="iface_cmd") + + p_if_add = iff.add_parser("ip-add", help="Add IPv4 to interface") + p_if_add.add_argument("iface", help="Interface") + p_if_add.add_argument("ip", help="IPv4") + p_if_add.add_argument("netmask", help="Netmask") + + p_if_del = iff.add_parser("ip-del", help="Remove IPv4 from interface") + p_if_del.add_argument("iface", help="Interface") + p_if_del.add_argument("ip", help="IPv4") + + p_if_mtu = iff.add_parser("mtu", help="Set MTU value for interface.") + p_if_mtu.add_argument("iface", help="Interface") + p_if_mtu.add_argument("rate", help="MTU Value") + + # THUNDERBOLT COMMANDS + p_tb = sub.add_parser("thunderbolt", help="Thunderbolt operations") + tb = p_tb.add_subparsers(dest="tb_cmd") + tb.add_parser("list", help="List Thunderbolt interfaces") + p_tb_rec = tb.add_parser("ports", help="List Thunderbolt ports with status filtering") + p_tb_rec.add_argument( + "--status", + action="append", + dest="status", + help="Filter by status (repeatable). Defaults to connected/no_devices_connected", + ) + + p_sudo = sub.add_parser("sudoers", help="Sudoers operations") + sd = p_sudo.add_subparsers(dest="sudo_cmd") + p_sudo_reg = sd.add_parser("register", help="Register sudoers rule for dnet-netcfg") + p_sudo_reg.add_argument("--user", required=True, help="Username to grant NOPASSWD") + sd.add_parser("remove", help="Remove sudoers rule for dnet-netcfg") + + p_reset = sub.add_parser( + "reset", help="Reset members back and destroy extra bridges" + ) + p_reset.add_argument( + "--also-remove-sudoers", + action="store_true", + help="Also remove sudoers rule during reset", + ) + + ns = ap.parse_args() + try: + cmd = ns.cmd + + # Read config file and execute + if not cmd: + if not ns.file and sys.stdin.isatty(): + print( + json.dumps( + { + "ok": False, + "error": "usage", + "message": "Use a subcommand or provide --file/STDIN for apply.", + } + ) + ) + sys.exit(2) + plan = read_plan(ns.file) + res = apply_network_mappings(plan) + ok = bool(isinstance(res, dict) and res.get("actions")) + out = {"ok": ok} + if isinstance(res, dict): + out.update(res) + print(json.dumps(out)) + sys.exit(0 if ok else 3) + + if cmd == "apply": + plan = read_plan(ns.file) + res = apply_network_mappings(plan) + ok = bool(isinstance(res, dict) and res.get("actions")) + out = {"ok": ok} + if isinstance(res, dict): + out.update(res) + print(json.dumps(out)) + sys.exit(0 if ok else 3) + + if cmd == "status": + print(json.dumps(status())) + sys.exit(0) + + if cmd == "reset": + r = reset_network( + also_remove_sudoers=bool(getattr(ns, "also_remove_sudoers", False)) + ) + actions = r.get("actions", []) + print(json.dumps({"ok": True, "actions": actions})) + sys.exit(0) + + if cmd == "network": + if not ns.net_cmd: + p_net.print_help() + sys.exit(2) + + match ns.net_cmd: + case "apply": + plan = read_plan(ns.file) + res = apply_network_mappings(plan) + ok = bool(isinstance(res, dict) and res.get("actions")) + out = {"ok": ok} + if isinstance(res, dict): + out.update(res) + actions = out + case "validate": + plan = read_plan(ns.file) + out = _validate_plan(plan) + print(json.dumps(out)) + sys.exit(0 if out.get("ok") else 3) + case "reset": + r = reset_network( + also_remove_sudoers=bool( + getattr(ns, "also_remove_sudoers", False) + ) + ) + actions = r.get("actions", []) + case "status": + print(json.dumps(status())) + sys.exit(0) + + print(json.dumps({"ok": True, "actions": actions})) + sys.exit(0) + + if cmd == "bridge": + if not ns.br_cmd: + p_br.print_help() + sys.exit(2) + + match ns.br_cmd: + case "create": + actions: list[dict[str, str]] = [] + for name in ns.name: + _ensure_bridge(name) + actions.append({"ensured": name}) + case "destroy": + actions = _destroy_bridge(ns.bridge) + case "attach": + actions = _attach_interface(ns.iface, ns.bridge) + case "detach": + actions = _detach_interface(ns.iface, from_bridge=ns.bridge) + case "move": + iface = ns.iface + source = ns.src_br + dest = ns.dst_br or "bridge0" + _ensure_bridge(ns.dst_br) + actions: list[dict[str, str]] = [] + if ns.src_br != ns.dst_br and iface in _get_bridge_members( + ns.src_br + ): + _move_interface(ns.src_br, ns.dst_br, ns.iface) + actions.append( + {"moved": ns.iface, "from": ns.src_br, "to": ns.dst_br} + ) + + print(json.dumps({"ok": True, "actions": actions})) + sys.exit(0) + + if cmd == "iface": + if not ns.iface_cmd: + p_if.print_help() + sys.exit(2) + + match ns.iface_cmd: + case "ip-add": + actions = ip_add(ns.iface, ns.ip, ns.netmask) + case "ip-del": + actions = ip_del(ns.iface, ns.ip) + case "mtu": + actions = _set_interface_mtu(ns.iface, ns.rate) + + print(json.dumps({"ok": True, "actions": actions})) + sys.exit(0) + + if cmd == "test": + if not ns.test_cmd: + p_test.print_help() + sys.exit(2) + if ns.test_cmd == "ping": + out = _test_ping(ns.peer, int(getattr(ns, "count", 2))) + print(json.dumps(out)) + sys.exit(0 if out.get("ok") else 1) + if ns.test_cmd == "route": + out = _test_route(ns.peer, getattr(ns, "expect", None)) + print(json.dumps(out)) + sys.exit(0 if out.get("ok") else 1) + if ns.test_cmd == "iperf3-server": + out = _test_iperf3_server(int(getattr(ns, "port", 5201)), getattr(ns, "bind", None)) + print(json.dumps(out)) + sys.exit(0 if out.get("ok") else 1) + if ns.test_cmd == "iperf3-client": + out = _test_iperf3_client(ns.peer, int(getattr(ns, "port", 5201)), int(getattr(ns, "time", 3))) + print(json.dumps(out)) + sys.exit(0 if out.get("ok") else 1) + + if cmd == "thunderbolt": + if not ns.tb_cmd: + p_tb.print_help() + sys.exit(2) + if ns.tb_cmd == "list": + print( + json.dumps( + {"ok": True, "thunderbolt": _get_thunderbolt_interfaces()} + ) + ) + sys.exit(0) + if ns.tb_cmd == "ports": + print(json.dumps({"ok": True, "ports": _get_tb_ports()})) + sys.exit(0) + + if cmd == "sudoers": + if not ns.sudo_cmd: + p_sudo.print_help() + sys.exit(2) + if ns.sudo_cmd == "register": + actions = register_in_homebrew_bin() + actions += register_sudoers(ns.user) + print(json.dumps({"ok": True, "actions": actions})) + sys.exit(0) + if ns.sudo_cmd == "remove": + actions = remove_sudoers() + print(json.dumps({"ok": True, "actions": actions})) + sys.exit(0) + + print( + json.dumps({"ok": False, "error": "usage", "message": "Unknown subcommand"}) + ) + sys.exit(2) + except PermissionError as e: + print(json.dumps({"ok": False, "error": "permission", "message": str(e)})) + sys.exit(13) + except Exception as e: + print(json.dumps({"ok": False, "error": "exception", "message": str(e)})) + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/src/dnet/shard/http_api.py b/src/dnet/shard/http_api.py index 630fe01c..708306c9 100644 --- a/src/dnet/shard/http_api.py +++ b/src/dnet/shard/http_api.py @@ -21,6 +21,9 @@ ShardProfileRequest, ShardProfileResponse, ShardUnloadModelResponse, + NetworkLink, + NetworkRegistrationRequest, + NetworkRegistrationWithPasswordRequest, ) from dnet_p2p import AsyncDnetP2P from dnet.protos import dnet_ring_pb2 @@ -29,6 +32,8 @@ from dnet.utils.profile_subproc import profile_device_via_subprocess from distilp.common import DeviceProfile from dnet.utils.repack import delete_repacked_layers +import os +from dnet.core.network.netcfg import apply_network_mappings class HTTPServer: @@ -334,3 +339,170 @@ async def cleanup_repacked(request: Request) -> JSONResponse: except Exception as e: logger.error("/cleanup_repacked failed: %s", e) return JSONResponse(status_code=500, content={"error": str(e)}) + + @self.app.post("/register") + async def register_subnet(req: NetworkRegistrationRequest) -> JSONResponse: + """Receive and apply network topology plan from HEAD node""" + if not req.mappings: + return JSONResponse( + status_code=400, content={"error": "No mappings provided"} + ) + logger.info(f"Received /register request: {req}") + plan = [m.model_dump(exclude_none=True) for m in req.mappings] + try: + result = apply_network_mappings(plan) + logger.info(f"RESULT: {result}") + if isinstance(result, dict) and not result.get("actions"): + # TODO: Show actions better + raise PermissionError("No network changes applied") + return JSONResponse(status_code=200, content=result) + except Exception as e: + msg = str(e) + logger.error(f"ERROR RAISED /register: {msg}") + if ( # Try sudo without requesting password + "Operation not permitted" in msg + or "permission denied" in msg + or "SIOCIFCREATE2" in msg + ): + logger.error(f"IN SIOCIFCRATE2 BLOCK") + import subprocess, json, time + + base = os.path.join(os.path.expanduser("~"), ".dria", "dnet") + os.makedirs(base, exist_ok=True) + plan_path = os.path.join( + base, f"plan-{int(time.time() * 1000)}-{os.getpid()}.json" + ) + with open(plan_path, "w") as tf: + json.dump({"mappings": plan}, tf) + logger.error(f"DONE CREATING TEMP FILE") + cmd = ["sudo", "-n", "dnet-netcfg", "--file", plan_path] + p = subprocess.run( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + os.remove(plan_path) + if p.returncode == 0: + data = json.loads(p.stdout or "{}") + return JSONResponse(status_code=200, content=data) + hint = { + "error": "requires_privileged_helper", + "message": "Try the /register_with_password. Or run the command below.", + "command": "sudo dnet-netcfg --file /path/to/plan.json", + } + logger.error(f"DONE RUNNING dnet-netcfg") + return JSONResponse(status_code=403, content=hint) + raise + + @self.app.post("/register_with_password") + async def register_with_password( + req: NetworkRegistrationWithPasswordRequest, + ) -> JSONResponse: + """Receive and apply network topology plan with privilages""" + logger.info(f"Received /register_with_password request: {req}") + import subprocess, json, getpass, time + + if not req.mappings: + return JSONResponse( + status_code=400, + content={"error": "No mappings provided"}, + headers={"Cache-Control": "no-store"}, + ) + plan = [m.model_dump(exclude_none=True) for m in req.mappings] + base = os.path.join(os.path.expanduser("~"), ".dria", "dnet") + os.makedirs(base, exist_ok=True) + plan_path = os.path.join( + base, f"plan-{int(time.time() * 1000)}-{os.getpid()}.json" + ) + with open(plan_path, "w") as tf: + json.dump({"mappings": plan}, tf) + tf.flush() + + # Validate password is correct + pv = subprocess.run( + ["sudo", "-S", "-v"], + input=req.password + "\n", + text=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + if pv.returncode != 0: + os.remove(plan_path) + return JSONResponse( + status_code=401, + content={"error": "Wrong password"}, + headers={"Cache-Control": "no-store"}, + ) + + logger.info("Validated password.") + + # persist sudo by registering sudoers (use cached auth from sudo -v) + if req.persist: + user = req.user or getpass.getuser() + sudoers = f"{user} ALL=(root) NOPASSWD: /usr/local/bin/dnet-netcfg, /sbin/ifconfig, /usr/sbin/networksetup\n" + p = subprocess.run( + ["sudo", "-n", "tee", "/etc/sudoers.d/dnet-netcfg"], + input=sudoers, + text=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + if p.returncode != 0: + return JSONResponse( + status_code=403, + content={"error": "sudoers_write_failed", "stderr": p.stderr}, + headers={"Cache-Control": "no-store"}, + ) + for cmd in ( + ["chmod", "0440", "/etc/sudoers.d/dnet-netcfg"], + ["chown", "root:wheel", "/etc/sudoers.d/dnet-netcfg"], + ["visudo", "-cf", "/etc/sudoers.d/dnet-netcfg"], + ): + p = subprocess.run( + ["sudo", "-n", *cmd], + text=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + if p.returncode != 0: + return JSONResponse( + status_code=403, + content={ + "error": "sudoers_install_failed", + "stderr": p.stderr, + }, + headers={"Cache-Control": "no-store"}, + ) + # Execute plan with privileges + p = subprocess.run( + ["sudo", "-n", "dnet-netcfg", "--file", plan_path], + text=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + if p.returncode != 0: + p = subprocess.run( + ["sudo", "-S", "dnet-netcfg", "--file", plan_path], + input=req.password + "\n", + text=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + rc = p.returncode + out = p.stdout or "{}" + err = p.stderr or "" + try: + os.remove(plan_path) + except Exception: + pass + if rc == 0: + return JSONResponse(status_code=200, content=json.loads(out), headers={"Cache-Control": "no-store"}) + # Non‑zero exit: map 13 (permission) to 403, otherwise 500 + status = 403 if rc == 13 else 500 + try: + body = json.loads(out) + except Exception: + body = {"ok": False, "error": "netcfg_failed", "stderr": err, "exit_code": rc} + body.setdefault("exit_code", rc) + return JSONResponse(status_code=status, content=body, headers={"Cache-Control": "no-store"}) diff --git a/src/dnet/shard/models.py b/src/dnet/shard/models.py index 3b8eed3c..5c81a428 100644 --- a/src/dnet/shard/models.py +++ b/src/dnet/shard/models.py @@ -109,3 +109,34 @@ class HealthResponse(BaseModel): grpc_port: int = Field(..., description="gRPC server port") http_port: int = Field(..., description="HTTP server port") instance: Optional[str] = Field(default=None, description="Shard name") + + +class NetworkLink(BaseModel): + node: str = Field(..., description="Name of neighbouring node") + mdp_ipv4: str = Field(..., description="Main data subnet to neighbour") + mdp_self_ipv4: Optional[str] = Field( + default=None, description="Main data subnet IP on this host" + ) + mdp_interface: str = Field(..., description="Virtual interface of data connection") + mdp_cost: int = Field(..., description="Cost of main data connection") + fdp_ipv4: str = Field(..., description="Fallback subnet to neighbout") + fdp_interface: str = Field(..., description="Virtual interface of fallback connection") + fdp_cost: int = Field(..., description="Cost of fallback connection") + thunderbolt_en: Optional[str] = Field( + default=None, + description="Optional: explicit Thunderbolt en* device name to attach (e.g., 'en6')", + ) + tb_self_uuid: Optional[str] = Field( + default=None, + description="Optional: Thunderbolt port domain UUID for disambiguation", + ) + +class NetworkRegistrationRequest(BaseModel): + mappings: List[NetworkLink] = Field(..., description="List of neighbours") + + +class NetworkRegistrationWithPasswordRequest(BaseModel): + mappings: List[NetworkLink] + password: str + persist: bool = False + user: Optional[str] = None diff --git a/src/dnet/tui.py b/src/dnet/tui.py index 8ad221a1..62574e37 100644 --- a/src/dnet/tui.py +++ b/src/dnet/tui.py @@ -80,6 +80,7 @@ def __init__(self, title: str = "DNET"): # Setup logging self.log_handler = TUILogHandler(self.log_queue) dnet_logger.addHandler(self.log_handler) + self._live: Optional[Live] = None def _generate_header(self) -> Panel: return Panel( @@ -223,7 +224,8 @@ async def run(self, stop_event: asyncio.Event) -> None: """Run the TUI loop until stop_event is set.""" self.is_running = True - with Live(self.layout, console=self.console, refresh_per_second=4, screen=True): + with Live(self.layout, console=self.console, refresh_per_second=4, screen=True) as live: + self._live = live while not stop_event.is_set(): self.layout["header"].update(self._generate_header()) self.layout["body"].update(self._generate_logs()) @@ -234,3 +236,51 @@ async def run(self, stop_event: asyncio.Event) -> None: self.is_running = False dnet_logger.removeHandler(self.log_handler) + self._live = None + + async def prompt_yes_no(self, prompt: str, default: bool = False) -> bool: + def _ask() -> bool: + p = f"{prompt} [{'Y/n' if default else 'y/N'}]: " + ans = input(p).strip().lower() + if not ans: + return default + return ans in ("y", "yes") + + if self._live: + self._live.stop() + try: + return _ask() + finally: + self._live.start() + return _ask() + + async def prompt_text(self, prompt: str, default: Optional[str] = None) -> str: + def _ask() -> str: + p = f"{prompt}" + if default: + p += f" [{default}]" + p += ": " + ans = input(p).strip() + return ans or (default or "") + + if self._live: + self._live.stop() + try: + return _ask() + finally: + self._live.start() + return _ask() + + async def prompt_password(self, prompt: str) -> str: + import getpass as _gp + + def _ask() -> str: + return _gp.getpass(prompt) + + if self._live: + self._live.stop() + try: + return _ask() + finally: + self._live.start() + return _ask() diff --git a/tests/test_netcfg.py b/tests/test_netcfg.py new file mode 100644 index 00000000..3e34de8a --- /dev/null +++ b/tests/test_netcfg.py @@ -0,0 +1,280 @@ +import io +import json +import types +import builtins +import os +import pytest + +import dnet.core.network.netcfg as netcfg + +pytestmark = pytest.mark.netcfg + + +# Fake the output from ifconfig commands +class FakeExec: + def __init__(self, table=None): + self.table = table or {} + self.calls = [] + + def __call__(self, cmd): + key = tuple(cmd) + self.calls.append(key) + rc, out, err = self.table.get(key, (0, "", "")) + return rc, out, err + + +def test_status_parses_members_and_ips(monkeypatch): + fx = FakeExec( + { + ("ifconfig", "-l"): (0, "lo0 en0 en1 bridge0 bridge1", ""), + ("ifconfig", "bridge0"): ( + 0, + "\nmember: en3\ninet 10.0.0.1 netmask 0xffffff00 mtu 1500", + "", + ), + ("ifconfig", "bridge1"): (0, "\nmember: en4\n", ""), + ("ifconfig", "bridge0", "up"): (0, "", ""), + ("networksetup", "-listallhardwareports"): ( + 0, + "Hardware Port: Thunderbolt 1\nDevice: en3\n\nHardware Port: Wi-Fi\nDevice: en1\n", + "", + ), + } + ) + monkeypatch.setattr(netcfg, "__exec", fx) + + out = netcfg.status() + assert out["ok"] is True + assert set(out["bridges"]) >= {"bridge0", "bridge1"} + assert out["members"]["bridge0"] == ["en3"] + assert "en3" in out["thunderbolt"] + + +def test_apply_network_mappings_integration(): + import platform + + if platform.system() != "Darwin": + pytest.skip("macOS only") + + idx = 1 + while netcfg._bridge_exists(f"bridge{idx}"): + idx += 1 + target_br = f"bridge{idx}" + + iface = None + mem0 = netcfg._get_bridge_members("bridge0") + if mem0: + iface = mem0[0] + else: + tbs = netcfg._get_thunderbolt_interfaces() + if tbs: + iface = tbs[0] + + if not iface: + pytest.skip("no target interface to attach") + + plan = [ + { + "mdp_interface": target_br, + "thunderbolt_en": iface, + "mdp_self_ipv4": "10.10.0.0", + "mdp_ipv4": "10.10.0.1", + } + ] + + is_root = hasattr(os, "geteuid") and os.geteuid() == 0 + + if not is_root: + with pytest.raises(Exception) as e: + netcfg.apply_network_mappings(plan) + assert "permit" in str(e.value).lower() or "sioc" in str(e.value).lower() + else: + res = netcfg.apply_network_mappings(plan) + assert res["status"] == "ok" + assert any( + a.get("member") == iface and a.get("bridge") == target_br + for a in res["actions"] + ) + assert any(a.get("addr") == "10.10.0.0" for a in res["actions"]) + + +def test_read_plan_from_file(tmp_path): + p = tmp_path / "plan.json" + plan = { + "mappings": [ + { + "mdp_interface": "bridge3", + "thunderbolt_en": "en4", + "mdp_self_ipv4": "10.0.0.0", + "mdp_ipv4": "10.0.0.1", + } + ] + } + p.write_text(json.dumps(plan)) + + got = netcfg.read_plan(str(p)) + assert isinstance(got, list) and got[0]["mdp_interface"] == "bridge3" + + +def test_register_sudoers_writes_tmp_and_validates(monkeypatch, tmp_path): + files = {} + + def fake_open(path, mode="r", encoding=None): + if "w" in mode: + buf = io.StringIO() + def _close(): + files[path] = buf.getvalue() + buf.close = _close # type: ignore + return buf + return io.StringIO(files.get(path, "")) + + def fake_exists(path): + return path == "/opt/homebrew/bin/dnet-netcfg" + + class Stat: + st_mode = 0o755 + + def fake_stat(path): + return Stat() + + def fake_chmod(path, mode): + return None + + def fake_replace(src, dst): + files[dst] = files.get(src, "") + files.pop(src, None) + + fx = FakeExec({("visudo", "-cf", "/etc/sudoers.d/dnet-netcfg.tmp"): (0, "", "")}) + + monkeypatch.setattr(netcfg, "__exec", fx) + monkeypatch.setattr(netcfg.os.path, "exists", fake_exists) + monkeypatch.setattr(netcfg.os, "stat", fake_stat) + monkeypatch.setattr(netcfg.os, "chmod", fake_chmod) + monkeypatch.setattr(netcfg.os, "replace", fake_replace) + monkeypatch.setattr(builtins, "open", fake_open) + + out = netcfg.register_sudoers("tester") + assert any(a.get("wrote") == "/etc/sudoers.d/dnet-netcfg" for a in out) + assert "/etc/sudoers.d/dnet-netcfg" in files + assert ( + "tester ALL=(root) NOPASSWD: /opt/homebrew/bin/dnet-netcfg" + in files["/etc/sudoers.d/dnet-netcfg"] + ) + + +def test_register_in_homebrew_bin_monkeypatched(monkeypatch): + files = {} + + def fake_open(path, mode="r", encoding=None): + if "w" in mode: + buf = io.StringIO() + + def _close(): + files[path] = buf.getvalue() + + buf.close = _close # type: ignore + return buf + return io.StringIO("print('ok')\n") + + def fake_chmod(path, mode): + return None + + # Prevent actually writing sudoers + monkeypatch.setattr(netcfg, "register_sudoers", lambda user: [{"user": user}]) + monkeypatch.setattr(builtins, "open", fake_open) + monkeypatch.setattr(netcfg.os, "makedirs", lambda d, exist_ok: None) + monkeypatch.setattr(netcfg.os, "chmod", fake_chmod) + monkeypatch.setenv("USER", "tester") + + out = netcfg.register_in_homebrew_bin() + assert any(a.get("installed") == "/opt/homebrew/bin/dnet-netcfg" for a in out) + assert "/opt/homebrew/bin/dnet-netcfg" in files + + +def _require_macos_root(): + import platform + if platform.system() != "Darwin": + pytest.skip("macOS only") + if not (hasattr(os, "geteuid") and os.geteuid() == 0): + pytest.skip("requires root to modify networking") + + +def _next_bridge_name(): + idx = 1 + while netcfg._bridge_exists(f"bridge{idx}"): + idx += 1 + return f"bridge{idx}" + + +def _pick_iface(): + m0 = netcfg._get_bridge_members("bridge0") + if m0: + return m0[0] + tbs = netcfg._get_thunderbolt_interfaces() + if tbs: + return tbs[0] + return None + + +def test_bridge_create_destroy_integration(): + _require_macos_root() + br = _next_bridge_name() + try: + netcfg._ensure_bridge(br) + assert netcfg._bridge_exists(br) + assert netcfg._get_bridge_members(br) == [] + finally: + netcfg._destroy_bridge(br) + assert not netcfg._bridge_exists(br) + + +def test_attach_detach_integration(): + _require_macos_root() + iface = _pick_iface() + if not iface: + pytest.skip("no interface available to move") + src_br = netcfg._find_interface(iface) or "bridge0" + dst_br = _next_bridge_name() + netcfg._ensure_bridge(dst_br) + try: + netcfg._move_interface(src_br, dst_br, iface) + assert iface in netcfg._get_bridge_members(dst_br) + assert iface not in netcfg._get_bridge_members(src_br) + finally: + netcfg._move_interface(dst_br, src_br, iface) + netcfg._destroy_bridge(dst_br) + assert iface in netcfg._get_bridge_members(src_br) + + +def test_ip_assign_and_clear_integration(): + _require_macos_root() + br = _next_bridge_name() + netcfg._ensure_bridge(br) + try: + ip = "10.250.250.0" + mask = "255.255.255.254" + netcfg.ip_add(br, ip, mask) + assert ip in netcfg.ip_lists(br) + netcfg.ip_del(br, ip) + assert ip not in netcfg.ip_lists(br) + finally: + netcfg._destroy_bridge(br) + + +def test_mtu_set_restore_integration(): + _require_macos_root() + iface = _pick_iface() + if not iface: + pytest.skip("no interface available") + orig = netcfg._get_interface_mtu(iface) + assert orig is not None + # Toggle between common safe MTUs on macOS (1500/9000) + new = 9000 if orig != 9000 else 1500 + try: + netcfg._set_interface_mtu(iface, new) + cur = netcfg._get_interface_mtu(iface) + # Some interfaces clamp/ignore non-supported MTUs; accept either target or original + assert cur in (new, orig) + finally: + if orig is not None: + netcfg._set_interface_mtu(iface, orig)