diff --git a/README.md b/README.md index e5c0d2c..6aa6e5b 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ go get github.com/elecbug/netkit@latest ### P2P -- [`p2p`](./p2p/): Library that integrates with graph libraries to form networks and enable p2p broadcast experiments. +- [`p2p`](./p2p/): Library that integrates with graph libraries to form networks and enable P2P broadcast experiments. ### Extensible diff --git a/p2p/broadcast.go b/p2p/broadcast.go index bf70916..9f89d56 100644 --- a/p2p/broadcast.go +++ b/p2p/broadcast.go @@ -3,8 +3,8 @@ package p2p // BroadcastProtocol defines the protocol used for broadcasting messages in the P2P network. type BroadcastProtocol int -var ( - Flooding BroadcastProtocol = 0 - Gossiping BroadcastProtocol = 1 - Custom BroadcastProtocol = 2 +const ( + Flooding BroadcastProtocol = iota + Gossiping + Custom ) diff --git a/p2p/network.go b/p2p/network.go deleted file mode 100644 index cd62498..0000000 --- a/p2p/network.go +++ /dev/null @@ -1,175 +0,0 @@ -package p2p - -import ( - "context" - "fmt" - "strconv" - "sync" - "time" - - "github.com/elecbug/netkit/graph" -) - -// Config holds configuration parameters for the P2P network. -type Network struct { - nodes map[PeerID]*p2pNode - cfg *Config -} - -// GenerateNetwork creates a P2P network from the given graph. -// nodeLatency and edgeLatency are functions that generate latencies for nodes and edges respectively. -func GenerateNetwork(g *graph.Graph, nodeLatency, edgeLatency func() float64, cfg *Config) (*Network, error) { - nodes := make(map[PeerID]*p2pNode) - maps := make(map[graph.NodeID]PeerID) - - // create nodes - for _, gn := range g.Nodes() { - num, err := strconv.Atoi(gn.String()) - - if err != nil { - return nil, err - } - - n := newNode(PeerID(num), nodeLatency()) - n.edges = make(map[PeerID]p2pEdge) - - nodes[n.id] = n - maps[gn] = n.id - } - - for _, gn := range g.Nodes() { - num, err := strconv.Atoi(gn.String()) - - if err != nil { - return nil, err - } - - n := nodes[PeerID(num)] - - for _, neighbor := range g.Neighbors(gn) { - j := maps[neighbor] - - edge := p2pEdge{ - targetID: PeerID(j), - edgeLatency: edgeLatency(), - } - - n.edges[edge.targetID] = edge - } - } - - return &Network{nodes: nodes, cfg: cfg}, nil -} - -// RunNetworkSimulation starts the message handling routines for all nodes in the network. -func (n *Network) RunNetworkSimulation(ctx context.Context) { - wg := &sync.WaitGroup{} - wg.Add(len(n.nodes)) - - for _, node := range n.nodes { - node.eachRun(n, wg, ctx) - } - - wg.Wait() -} - -// PeerIDs returns a slice of all node IDs in the network. -func (n *Network) PeerIDs() []PeerID { - ids := make([]PeerID, 0, len(n.nodes)) - - for id := range n.nodes { - ids = append(ids, id) - } - - return ids -} - -// Publish sends a message to the specified node's message queue. -func (n *Network) Publish(nodeID PeerID, msg string, protocol BroadcastProtocol, customProtocol func(msg Message, known []PeerID, sent []PeerID, received []PeerID, params map[string]any) *[]PeerID) error { - if node, ok := n.nodes[nodeID]; ok { - if !node.alive { - return fmt.Errorf("node %d is not alive", nodeID) - } - - node.msgQueue <- Message{From: nodeID, Content: msg, Protocol: protocol, HopCount: 0, CustomProtocol: customProtocol} - return nil - } - - return fmt.Errorf("node %d not found", nodeID) -} - -// Reachability calculates the fraction of nodes that have received the specified message. -func (n *Network) Reachability(msg string) float64 { - total := 0 - reached := 0 - - for _, node := range n.nodes { - total++ - node.mu.Lock() - if _, ok := node.seenAt[msg]; ok { - reached++ - } - node.mu.Unlock() - } - - return float64(reached) / float64(total) -} - -// FirstMessageReceptionTimes returns the first reception times of the specified message across all nodes. -func (n *Network) FirstMessageReceptionTimes(msg string) []time.Time { - firstTimes := make([]time.Time, 0) - - for _, node := range n.nodes { - node.mu.Lock() - if t, ok := node.seenAt[msg]; ok { - firstTimes = append(firstTimes, t) - } - - node.mu.Unlock() - } - - return firstTimes -} - -// NumberOfDuplicateMessages counts how many duplicate messages were received across all nodes. -func (n *Network) NumberOfDuplicateMessages(msg string) int { - dupCount := 0 - - for _, node := range n.nodes { - node.mu.Lock() - if count, ok := node.recvFrom[msg]; ok { - dupCount += len(count) - 1 - } - node.mu.Unlock() - } - - return dupCount -} - -// MessageInfo returns a snapshot of the node's message-related information. -func (n *Network) MessageInfo(nodeID PeerID, content string) (map[string]any, error) { - node := n.nodes[nodeID] - - if node == nil { - return nil, fmt.Errorf("node %d not found", nodeID) - } - - node.mu.Lock() - defer node.mu.Unlock() - - info := make(map[string]any) - - info["recv"] = make([]PeerID, 0) - for k := range node.recvFrom[content] { - info["recv"] = append(info["recv"].([]PeerID), k) - } - - info["sent"] = make([]PeerID, 0) - for k := range node.sentTo[content] { - info["sent"] = append(info["sent"].([]PeerID), k) - } - - info["seen"] = node.seenAt[content].String() - - return info, nil -} diff --git a/p2p/node.go b/p2p/node.go index e311c5c..725857c 100644 --- a/p2p/node.go +++ b/p2p/node.go @@ -46,7 +46,7 @@ func newNode(id PeerID, nodeLatency float64) *p2pNode { } // eachRun starts the message handling routine for the node. -func (n *p2pNode) eachRun(network *Network, wg *sync.WaitGroup, ctx context.Context) { +func (n *p2pNode) eachRun(network *P2P, wg *sync.WaitGroup, ctx context.Context) { go func(ctx context.Context, wg *sync.WaitGroup) { n.alive = true wg.Done() @@ -74,7 +74,7 @@ func (n *p2pNode) eachRun(network *Network, wg *sync.WaitGroup, ctx context.Cont if first { go func(msg Message) { time.Sleep(time.Duration(n.nodeLatency) * time.Millisecond) - n.publish(network, msg) + n.eachPublish(network, msg) }(msg) } } @@ -82,17 +82,8 @@ func (n *p2pNode) eachRun(network *Network, wg *sync.WaitGroup, ctx context.Cont }(ctx, wg) } -// // copyIDSet creates a shallow copy of a set of IDs. -// func copyIDSet(src map[PeerID]struct{}) map[PeerID]struct{} { -// dst := make(map[PeerID]struct{}, len(src)) -// for k := range src { -// dst[k] = struct{}{} -// } -// return dst -// } - -// publish sends the message to neighbors, excluding 'exclude' and already-sent targets. -func (n *p2pNode) publish(network *Network, msg Message) { +// eachPublish sends the message to neighbors, excluding 'exclude' and already-sent targets. +func (n *p2pNode) eachPublish(network *P2P, msg Message) { content := msg.Content protocol := msg.Protocol hopCount := msg.HopCount diff --git a/p2p/p2p.go b/p2p/p2p.go index 0402fe1..f355da0 100644 --- a/p2p/p2p.go +++ b/p2p/p2p.go @@ -1,20 +1,182 @@ // Package p2p provides types and interfaces for a peer-to-peer networking simulation. package p2p -// PeerID represents a unique identifier for a node in the P2P network. -type PeerID uint64 +import ( + "context" + "fmt" + "strconv" + "sync" + "time" -// Message represents a message sent between nodes in the P2P network. -type Message struct { - From PeerID - Content string - Protocol BroadcastProtocol - HopCount int - CustomProtocol func(Message, []PeerID, []PeerID, []PeerID, map[string]any) *[]PeerID -} + "github.com/elecbug/netkit/graph" +) // Config holds configuration parameters for the P2P network. -type Config struct { - GossipFactor float64 // fraction of neighbors to gossip to - CustomParams map[string]any // parameters for custom protocols +type P2P struct { + nodes map[PeerID]*p2pNode + cfg *Config +} + +// GenerateP2P creates a P2P network from the given graph. +// nodeLatency and edgeLatency are functions that generate latencies for nodes and edges respectively. +func GenerateP2P(g *graph.Graph, nodeLatency, edgeLatency func() float64, cfg *Config) (*P2P, error) { + nodes := make(map[PeerID]*p2pNode) + maps := make(map[graph.NodeID]PeerID) + + // create nodes + for _, gn := range g.Nodes() { + num, err := strconv.Atoi(gn.String()) + + if err != nil { + return nil, err + } + + n := newNode(PeerID(num), nodeLatency()) + n.edges = make(map[PeerID]p2pEdge) + + nodes[n.id] = n + maps[gn] = n.id + } + + for _, gn := range g.Nodes() { + num, err := strconv.Atoi(gn.String()) + + if err != nil { + return nil, err + } + + n := nodes[PeerID(num)] + + for _, neighbor := range g.Neighbors(gn) { + j := maps[neighbor] + + edge := p2pEdge{ + targetID: PeerID(j), + edgeLatency: edgeLatency(), + } + + n.edges[edge.targetID] = edge + } + } + + return &P2P{nodes: nodes, cfg: cfg}, nil +} + +// SimulateP2P starts the message handling routines for all nodes in the network. +func (p *P2P) SimulateP2P(ctx context.Context) { + wg := &sync.WaitGroup{} + wg.Add(len(p.nodes)) + + for _, node := range p.nodes { + node.eachRun(p, wg, ctx) + } + + wg.Wait() +} + +// PeerIDs returns a slice of all node IDs in the network. +func (p *P2P) PeerIDs() []PeerID { + ids := make([]PeerID, 0, len(p.nodes)) + + for id := range p.nodes { + ids = append(ids, id) + } + + return ids +} + +// Publish sends a message to the specified node's message queue. +func (p *P2P) Publish(nodeID PeerID, msg string, protocol BroadcastProtocol, customProtocol CustomProtocolFunc) error { + if node, ok := p.nodes[nodeID]; ok { + if !node.alive { + return fmt.Errorf("node %d is not alive", nodeID) + } + + node.msgQueue <- Message{ + From: nodeID, + Content: msg, + Protocol: protocol, + HopCount: 0, + CustomProtocol: customProtocol, + } + return nil + } + + return fmt.Errorf("node %d not found", nodeID) +} + +// Reachability calculates the fraction of nodes that have received the specified message. +func (p *P2P) Reachability(msg string) float64 { + total := 0 + reached := 0 + + for _, node := range p.nodes { + total++ + node.mu.Lock() + if _, ok := node.seenAt[msg]; ok { + reached++ + } + node.mu.Unlock() + } + + return float64(reached) / float64(total) +} + +// FirstMessageReceptionTimes returns the first reception times of the specified message across all nodes. +func (p *P2P) FirstMessageReceptionTimes(msg string) []time.Time { + firstTimes := make([]time.Time, 0) + + for _, node := range p.nodes { + node.mu.Lock() + if t, ok := node.seenAt[msg]; ok { + firstTimes = append(firstTimes, t) + } + + node.mu.Unlock() + } + + return firstTimes +} + +// NumberOfDuplicateMessages counts how many duplicate messages were received across all nodes. +func (p *P2P) NumberOfDuplicateMessages(msg string) int { + dupCount := 0 + + for _, node := range p.nodes { + node.mu.Lock() + if count, ok := node.recvFrom[msg]; ok { + dupCount += len(count) - 1 + } + node.mu.Unlock() + } + + return dupCount +} + +// MessageInfo returns a snapshot of the node's message-related information. +func (p *P2P) MessageInfo(nodeID PeerID, content string) (map[string]any, error) { + node := p.nodes[nodeID] + + if node == nil { + return nil, fmt.Errorf("node %d not found", nodeID) + } + + node.mu.Lock() + defer node.mu.Unlock() + + info := make(map[string]any) + + info["recv"] = make([]PeerID, 0) + for k := range node.recvFrom[content] { + info["recv"] = append(info["recv"].([]PeerID), k) + } + + info["sent"] = make([]PeerID, 0) + for k := range node.sentTo[content] { + info["sent"] = append(info["sent"].([]PeerID), k) + } + + info["seen"] = node.seenAt[content].String() + + return info, nil } diff --git a/p2p/p2p_test.go b/p2p/p2p_test.go index 004f484..55b05ec 100644 --- a/p2p/p2p_test.go +++ b/p2p/p2p_test.go @@ -21,7 +21,7 @@ func TestGenerateNetwork(t *testing.T) { nodeLatency := func() float64 { return p2p.LogNormalRand(math.Log(100), 0.5) } edgeLatency := func() float64 { return p2p.LogNormalRand(math.Log(100), 0.3) } - nw, err := p2p.GenerateNetwork(g, nodeLatency, edgeLatency, &p2p.Config{GossipFactor: 0.35}) + nw, err := p2p.GenerateP2P(g, nodeLatency, edgeLatency, &p2p.Config{GossipFactor: 0.35}) if err != nil { t.Fatalf("Failed to generate network: %v", err) } @@ -34,7 +34,7 @@ func TestGenerateNetwork(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - nw.RunNetworkSimulation(ctx) + nw.SimulateP2P(ctx) t.Logf("Publishing message '%s' from node %d\n", msg1, nw.PeerIDs()[0]) err = nw.Publish(nw.PeerIDs()[0], msg1, p2p.Flooding, nil) @@ -54,7 +54,7 @@ func TestGenerateNetwork(t *testing.T) { time.Sleep(700 * time.Millisecond) t.Logf("Reachability of message '%s': %f\n", msg2, nw.Reachability(msg2)) - nw.RunNetworkSimulation(context.Background()) + nw.SimulateP2P(context.Background()) t.Logf("Publishing message '%s' from node %d\n", msg3, nw.PeerIDs()[2]) err = nw.Publish(nw.PeerIDs()[2], msg3, p2p.Gossiping, nil) if err != nil { @@ -90,7 +90,7 @@ func TestMetrics(t *testing.T) { nodeLatency := func() float64 { return p2p.LogNormalRand(math.Log(100), 0.5) } edgeLatency := func() float64 { return p2p.LogNormalRand(math.Log(100), 0.3) } - nw, err := p2p.GenerateNetwork(g, nodeLatency, edgeLatency, &p2p.Config{GossipFactor: 0.35}) + nw, err := p2p.GenerateP2P(g, nodeLatency, edgeLatency, &p2p.Config{GossipFactor: 0.35}) if err != nil { t.Fatalf("Failed to generate network: %v", err) } @@ -101,7 +101,7 @@ func TestMetrics(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - nw.RunNetworkSimulation(ctx) + nw.SimulateP2P(ctx) t.Logf("Publishing message '%s' from node %d\n", msg1, nw.PeerIDs()[0]) err = nw.Publish(nw.PeerIDs()[0], msg1, p2p.Flooding, nil) diff --git a/p2p/type.go b/p2p/type.go new file mode 100644 index 0000000..8751e42 --- /dev/null +++ b/p2p/type.go @@ -0,0 +1,21 @@ +package p2p + +// PeerID represents a unique identifier for a node in the P2P network. +type PeerID uint64 + +// Message represents a message sent between nodes in the P2P network. +type Message struct { + From PeerID + Content string + Protocol BroadcastProtocol + HopCount int + CustomProtocol CustomProtocolFunc +} + +// Config holds configuration parameters for the P2P network. +type Config struct { + GossipFactor float64 // fraction of neighbors to gossip to + CustomParams map[string]any // parameters for custom protocols +} + +type CustomProtocolFunc func(msg Message, neighbours []PeerID, sentPeers []PeerID, receivedPeers []PeerID, customParams map[string]any) *[]PeerID diff --git a/test.sh b/test.sh index 3d1fb7b..80aea3d 100755 --- a/test.sh +++ b/test.sh @@ -1,8 +1,8 @@ -go test -v ./network-graph/ +go test -v ./graph/ cd script -python3 compare_metrics.py -i ../network-graph/directional.graph.log -c ../network-graph/directional.metrics.log -r ./directional.report.log -o ./directional.out.log -python3 compare_metrics.py -i ../network-graph/bidirectional.graph.log -c ../network-graph/bidirectional.metrics.log -r ./bidirectional.report.log -o ./bidirectional.out.log +python3 compare_metrics.py -i ../graph/directional.graph.log -c ../graph/directional.metrics.log -r ./directional.report.log -o ./directional.out.log +python3 compare_metrics.py -i ../graph/bidirectional.graph.log -c ../graph/bidirectional.metrics.log -r ./bidirectional.report.log -o ./bidirectional.out.log cd .. \ No newline at end of file