diff --git a/.gitignore b/.gitignore index 6adb588..fb99b84 100644 --- a/.gitignore +++ b/.gitignore @@ -31,4 +31,5 @@ log venv/ __pycache__/ -*.pyc \ No newline at end of file +*.pyc +temp \ No newline at end of file diff --git a/network-graph/graph/standard_graph/erdos_reyni.go b/network-graph/graph/standard_graph/erdos_reyni.go index 6102046..a4db56e 100644 --- a/network-graph/graph/standard_graph/erdos_reyni.go +++ b/network-graph/graph/standard_graph/erdos_reyni.go @@ -11,6 +11,10 @@ func ErdosRenyiGraph(n int, p float64, isUndirected bool) *graph.Graph { g := graph.New(isUndirected) + for i := 0; i < n; i++ { + g.AddNode(node.ID(toString(i))) + } + if isUndirected { for i := 0; i < n; i++ { for j := i + 1; j < n; j++ { diff --git a/p2p/helper.go b/p2p/helper.go new file mode 100644 index 0000000..a3f733f --- /dev/null +++ b/p2p/helper.go @@ -0,0 +1,18 @@ +package p2p + +import ( + "math" + "math/rand" +) + +// LogNormalRand generates a log-normally distributed random number +// with given mu and sigma parameters. +func LogNormalRand(mu, sigma float64, src rand.Source) float64 { + r := rand.New(src) + + u1 := r.Float64() + u2 := r.Float64() + z := math.Sqrt(-2.0*math.Log(u1)) * math.Cos(2*math.Pi*u2) + + return math.Exp(mu + sigma*z) +} diff --git a/p2p/network.go b/p2p/network.go new file mode 100644 index 0000000..8700e63 --- /dev/null +++ b/p2p/network.go @@ -0,0 +1,68 @@ +package p2p + +import ( + "strconv" + + "github.com/elecbug/netkit/network-graph/graph" + "github.com/elecbug/netkit/network-graph/node" +) + +// GenerateNetwork creates a P2P network from the given graph. +// nodeLatency and edgeLatency are functions that generate latencies for nodes and edges respectively. +func GenerateNetwork(g *graph.Graph, nodeLatency, edgeLatency func() float64) (map[ID]*Node, error) { + nodes := make(map[ID]*Node) + maps := make(map[node.ID]ID) + + // create nodes + for _, gn := range g.Nodes() { + num, err := strconv.Atoi(gn.String()) + + if err != nil { + return nil, err + } + + n := &Node{ + ID: ID(num), + Latency: nodeLatency(), + Edges: make(map[ID]Edge), + } + + nodes[n.ID] = n + maps[gn] = n.ID + } + + for _, gn := range g.Nodes() { + num, err := strconv.Atoi(gn.String()) + + if err != nil { + return nil, err + } + + n := nodes[ID(num)] + + for _, neighbor := range g.Neighbors(gn) { + j := maps[neighbor] + + edge := Edge{ + TargetID: ID(j), + Latency: edgeLatency(), + } + + n.Edges[edge.TargetID] = edge + } + } + + return nodes, nil +} + +// RunNetworkSimulation starts the message handling routines for all nodes in the network. +func RunNetworkSimulation(nodes map[ID]*Node) { + for _, n := range nodes { + n.eachRun(nodes) + } +} + +// Publish sends a message to the specified node's message queue. +func Publish(node *Node, msg string) { + node.msgQueue <- Message{From: node.ID, Content: msg} +} diff --git a/p2p/node.go b/p2p/node.go new file mode 100644 index 0000000..e8ab8fd --- /dev/null +++ b/p2p/node.go @@ -0,0 +1,112 @@ +package p2p + +import ( + "sync" + "time" +) + +// ID represents a unique identifier for a node in the P2P network. +type ID uint64 + +// Message represents a message sent between nodes in the P2P network. +type Message struct { + From ID + Content string +} + +// Edge represents a connection from one node to another in the P2P network. +type Edge struct { + TargetID ID + Latency float64 // in milliseconds +} + +// Node represents a node in the P2P network. +type Node struct { + ID ID + Latency float64 + Edges map[ID]Edge + + RecvFrom map[string]map[ID]struct{} // content -> set of senders + SentTo map[string]map[ID]struct{} // content -> set of targets + SeenAt map[string]time.Time // content -> first arrival time + + msgQueue chan Message + mu sync.Mutex +} + +// Degree returns the number of edges connected to the node. +func (n *Node) Degree() int { + return len(n.Edges) +} + +// eachRun starts the message handling routine for the node. +func (n *Node) eachRun(network map[ID]*Node) { + go func() { + n.msgQueue = make(chan Message, 1000) + n.RecvFrom = make(map[string]map[ID]struct{}) + n.SentTo = make(map[string]map[ID]struct{}) + n.SeenAt = make(map[string]time.Time) + + for msg := range n.msgQueue { + first := false + var excludeSnapshot map[ID]struct{} + + n.mu.Lock() + if _, ok := n.RecvFrom[msg.Content]; !ok { + n.RecvFrom[msg.Content] = make(map[ID]struct{}) + } + n.RecvFrom[msg.Content][msg.From] = struct{}{} + + if _, ok := n.SeenAt[msg.Content]; !ok { + n.SeenAt[msg.Content] = time.Now() + first = true + excludeSnapshot = copyIDSet(n.RecvFrom[msg.Content]) + } + n.mu.Unlock() + + if first { + go func(content string, exclude map[ID]struct{}) { + time.Sleep(time.Duration(n.Latency) * time.Millisecond) + n.publish(network, content, exclude) + }(msg.Content, excludeSnapshot) + } + } + }() +} + +// copyIDSet creates a shallow copy of a set of IDs. +func copyIDSet(src map[ID]struct{}) map[ID]struct{} { + dst := make(map[ID]struct{}, len(src)) + for k := range src { + dst[k] = struct{}{} + } + return dst +} + +// publish sends the message to neighbors, excluding 'exclude' and already-sent targets. +func (n *Node) publish(network map[ID]*Node, content string, exclude map[ID]struct{}) { + n.mu.Lock() + if _, ok := n.SentTo[content]; !ok { + n.SentTo[content] = make(map[ID]struct{}) + } + + for _, edge := range n.Edges { + if _, wasSender := exclude[edge.TargetID]; wasSender { + continue + } + if _, already := n.SentTo[content][edge.TargetID]; already { + continue + } + if _, received := n.RecvFrom[content][edge.TargetID]; received { + continue + } + n.SentTo[content][edge.TargetID] = struct{}{} + + edgeCopy := edge + go func(e Edge) { + time.Sleep(time.Duration(e.Latency) * time.Millisecond) + network[e.TargetID].msgQueue <- Message{From: n.ID, Content: content} + }(edgeCopy) + } + n.mu.Unlock() +} diff --git a/p2p/p2p_test.go b/p2p/p2p_test.go new file mode 100644 index 0000000..99f44ad --- /dev/null +++ b/p2p/p2p_test.go @@ -0,0 +1,45 @@ +package p2p_test + +import ( + "math/rand" + "testing" + "time" + + "github.com/elecbug/netkit/network-graph/graph/standard_graph" + "github.com/elecbug/netkit/p2p" +) + +func TestGenerateNetwork(t *testing.T) { + g := standard_graph.ErdosRenyiGraph(1000, 0.005, true) + t.Logf("Generated graph with %d nodes and %d edges\n", len(g.Nodes()), g.EdgeCount()) + src := rand.NewSource(time.Now().UnixNano()) + + nodeLatency := func() float64 { return p2p.LogNormalRand(5.704, 0.5, src) } + edgeLatency := func() float64 { return p2p.LogNormalRand(5.704, 0.3, src) } + + nw, _ := p2p.GenerateNetwork(g, nodeLatency, edgeLatency) + t.Logf("Generated network with %d nodes\n", len(nw)) + for id, node := range nw { + t.Logf("Node %d: latency=%.2fms, edges=%v\n", id, node.Latency, node.Edges) + } + + p2p.RunNetworkSimulation(nw) + p2p.Publish(nw[0], "Hello, P2P Network!") + + time.Sleep(5 * time.Second) + + count := 0 + for id, node := range nw { + c := len(node.SentTo["Hello, P2P Network!"]) + t.Logf("Node %d sent %d/%d\n", id, c, len(node.Edges)) + t.Logf("Node %d data: recv: %v, sent: %v, seen: %v\n", + id, + node.RecvFrom["Hello, P2P Network!"], + node.SentTo["Hello, P2P Network!"], + node.SeenAt["Hello, P2P Network!"], + ) + count += c + } + + t.Logf("Total received count: %d\n", count) +}