From 067611149ae79ed883f8153496e36ebb4777498d Mon Sep 17 00:00:00 2001 From: elecbug Date: Wed, 15 Oct 2025 10:16:49 +0900 Subject: [PATCH 1/2] feat: update more metric for p2p network --- README.md | 7 ++- .../graph/standard_graph/standard_graph.go | 4 +- p2p/broadcast.go | 9 ---- p2p/broadcast/broadcast.go | 10 ++++ p2p/network.go | 52 ++++++++++++------- p2p/node.go | 7 ++- p2p/p2p_test.go | 40 ++++++++++++-- p2p/type.go | 18 +++++++ 8 files changed, 110 insertions(+), 37 deletions(-) delete mode 100644 p2p/broadcast.go create mode 100644 p2p/broadcast/broadcast.go create mode 100644 p2p/type.go diff --git a/README.md b/README.md index 2afe1f8..1784e7d 100644 --- a/README.md +++ b/README.md @@ -13,12 +13,17 @@ go get github.com/elecbug/netkit@latest ## Packages -### Graph algorithm +### Graph - [`network-graph`](./network-graph/): Unweighted network analysis library. - [`graph`](./network-graph/graph/): Library for creating and building graphs. + - [`standard_graph`](./network-graph/graph/standard_graph/): Library for generating standard graphs like Erdos-Reyni graph. - [`algorithm`](./network-graph/algorithm/): Library containing various graph algorithms. +### P2P + +- [`p2p`](./p2p/): Library that integrates with graph libraries to form networks and enable p2p broadcast experiments. + ### Extensible - [`bimap`](./bimap/): Bidirectional map with O(1) lookups key->value and value->key. diff --git a/network-graph/graph/standard_graph/standard_graph.go b/network-graph/graph/standard_graph/standard_graph.go index ae2bc6f..759126c 100644 --- a/network-graph/graph/standard_graph/standard_graph.go +++ b/network-graph/graph/standard_graph/standard_graph.go @@ -23,7 +23,7 @@ func (s STANDARD_GRAPH_TYPE) String(onlyAlphabet bool) string { return "Erdős-Rényi" } case RANDOM_REGULAR: - return "Random-Regular" + return "Random Regular" case BARABASI_ALBERT: if onlyAlphabet { return "Barabasi-Albert" @@ -33,7 +33,7 @@ func (s STANDARD_GRAPH_TYPE) String(onlyAlphabet bool) string { case WATTS_STROGATZ: return "Watts-Strogatz" case RANDOM_GEOMETRIC: - return "Random-Geometric" + return "Random Geometric" case WAXMAN: return "Waxman" default: diff --git a/p2p/broadcast.go b/p2p/broadcast.go deleted file mode 100644 index a8754c3..0000000 --- a/p2p/broadcast.go +++ /dev/null @@ -1,9 +0,0 @@ -package p2p - -// BroadcastProtocol defines the protocol used for broadcasting messages in the P2P network. -type BroadcastProtocol int - -var ( - Flooding BroadcastProtocol = 0 - Gossiping BroadcastProtocol = 1 -) diff --git a/p2p/broadcast/broadcast.go b/p2p/broadcast/broadcast.go new file mode 100644 index 0000000..912eb2a --- /dev/null +++ b/p2p/broadcast/broadcast.go @@ -0,0 +1,10 @@ +// Package broadcast defines the protocols used for communication in the P2P network. +package broadcast + +// Protocol defines the protocol used for broadcasting messages in the P2P network. +type Protocol int + +var ( + Flooding Protocol = 0 + Gossiping Protocol = 1 +) diff --git a/p2p/network.go b/p2p/network.go index 328a9b5..df0149f 100644 --- a/p2p/network.go +++ b/p2p/network.go @@ -5,23 +5,13 @@ import ( "fmt" "strconv" "sync" + "time" "github.com/elecbug/netkit/network-graph/graph" "github.com/elecbug/netkit/network-graph/node" + "github.com/elecbug/netkit/p2p/broadcast" ) -// Message represents a message sent between nodes in the P2P network. -type Message struct { - From ID - Content string - Protocol BroadcastProtocol -} - -// Config holds configuration parameters for the P2P network. -type Config struct { - GossipFactor float64 // fraction of neighbors to gossip to -} - // Config holds configuration parameters for the P2P network. type Network struct { nodes map[ID]*p2pNode @@ -96,13 +86,8 @@ func (n *Network) NodeIDs() []ID { return ids } -// GetNode retrieves a node by its ID. -func (n *Network) GetNode(id ID) *p2pNode { - return n.nodes[id] -} - // Publish sends a message to the specified node's message queue. -func (n *Network) Publish(nodeID ID, msg string, protocol BroadcastProtocol) error { +func (n *Network) Publish(nodeID ID, msg string, protocol broadcast.Protocol) error { if node, ok := n.nodes[nodeID]; ok { if !node.alive { return fmt.Errorf("node %d is not alive", nodeID) @@ -132,6 +117,37 @@ func (n *Network) Reachability(msg string) float64 { return float64(reached) / float64(total) } +// FirstMessageReceptionTimes returns the first reception times of the specified message across all nodes. +func (n *Network) FirstMessageReceptionTimes(msg string) []time.Time { + firstTimes := make([]time.Time, 0) + + for _, node := range n.nodes { + node.mu.Lock() + if t, ok := node.seenAt[msg]; ok { + firstTimes = append(firstTimes, t) + } + + node.mu.Unlock() + } + + return firstTimes +} + +// NumberOfDuplicateMessages counts how many duplicate messages were received across all nodes. +func (n *Network) NumberOfDuplicateMessages(msg string) int { + dupCount := 0 + + for _, node := range n.nodes { + node.mu.Lock() + if count, ok := node.recvFrom[msg]; ok { + dupCount += len(count) - 1 + } + node.mu.Unlock() + } + + return dupCount +} + // MessageInfo returns a snapshot of the node's message-related information. func (n *Network) MessageInfo(nodeID ID, content string) (map[string]any, error) { node := n.nodes[nodeID] diff --git a/p2p/node.go b/p2p/node.go index 5a562a6..9c64049 100644 --- a/p2p/node.go +++ b/p2p/node.go @@ -4,10 +4,9 @@ import ( "context" "sync" "time" -) -// ID represents a unique identifier for a node in the P2P network. -type ID uint64 + "github.com/elecbug/netkit/p2p/broadcast" +) // p2pNode represents a node in the P2P network. type p2pNode struct { @@ -125,7 +124,7 @@ func (n *p2pNode) publish(network *Network, msg Message, exclude map[ID]struct{} willSendEdges = append(willSendEdges, edge) } - if protocol == Gossiping && len(willSendEdges) > 0 { + if protocol == broadcast.Gossiping && len(willSendEdges) > 0 { k := int(float64(len(willSendEdges)) * network.cfg.GossipFactor) willSendEdges = willSendEdges[:k] } diff --git a/p2p/p2p_test.go b/p2p/p2p_test.go index 3190d41..9806128 100644 --- a/p2p/p2p_test.go +++ b/p2p/p2p_test.go @@ -12,6 +12,7 @@ import ( "github.com/elecbug/netkit/network-graph/graph/standard_graph" "github.com/elecbug/netkit/p2p" + "github.com/elecbug/netkit/p2p/broadcast" ) func TestGenerateNetwork(t *testing.T) { @@ -38,7 +39,7 @@ func TestGenerateNetwork(t *testing.T) { nw.RunNetworkSimulation(ctx) t.Logf("Publishing message '%s' from node %d\n", msg1, nw.NodeIDs()[0]) - err = nw.Publish(nw.NodeIDs()[0], msg1, p2p.Flooding) + err = nw.Publish(nw.NodeIDs()[0], msg1, broadcast.Flooding) if err != nil { t.Fatalf("Failed to publish message: %v", err) } @@ -46,7 +47,7 @@ func TestGenerateNetwork(t *testing.T) { t.Logf("Reachability of message '%s': %f\n", msg1, nw.Reachability(msg1)) t.Logf("Publishing message '%s' from node %d\n", msg2, nw.NodeIDs()[1]) - err = nw.Publish(nw.NodeIDs()[1], msg2, p2p.Gossiping) + err = nw.Publish(nw.NodeIDs()[1], msg2, broadcast.Gossiping) if err != nil { t.Fatalf("Failed to publish message: %v", err) } @@ -57,7 +58,7 @@ func TestGenerateNetwork(t *testing.T) { nw.RunNetworkSimulation(context.Background()) t.Logf("Publishing message '%s' from node %d\n", msg3, nw.NodeIDs()[2]) - err = nw.Publish(nw.NodeIDs()[2], msg3, p2p.Gossiping) + err = nw.Publish(nw.NodeIDs()[2], msg3, broadcast.Gossiping) if err != nil { t.Fatalf("Failed to publish message: %v", err) } @@ -82,3 +83,36 @@ func TestGenerateNetwork(t *testing.T) { os.WriteFile("p2p_result.log", data, 0644) } + +func TestMetrics(t *testing.T) { + g := standard_graph.ErdosRenyiGraph(1000, 50.000/1000, true) + t.Logf("Generated graph with %d nodes and %d edges\n", len(g.Nodes()), g.EdgeCount()) + src := rand.NewSource(time.Now().UnixNano()) + + nodeLatency := func() float64 { return p2p.LogNormalRand(math.Log(100), 0.5, src) } + edgeLatency := func() float64 { return p2p.LogNormalRand(math.Log(100), 0.3, src) } + + nw, err := p2p.GenerateNetwork(g, nodeLatency, edgeLatency, &p2p.Config{GossipFactor: 0.35}) + if err != nil { + t.Fatalf("Failed to generate network: %v", err) + } + + t.Logf("Generated network with %d nodes\n", len(nw.NodeIDs())) + + msg1 := "Hello, P2P World!" + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + nw.RunNetworkSimulation(ctx) + + t.Logf("Publishing message '%s' from node %d\n", msg1, nw.NodeIDs()[0]) + err = nw.Publish(nw.NodeIDs()[0], msg1, broadcast.Flooding) + if err != nil { + t.Fatalf("Failed to publish message: %v", err) + } + time.Sleep(1000 * time.Millisecond) + t.Logf("Number of nodes: %d\n", len(nw.NodeIDs())) + t.Logf("Reachability of message '%s': %f\n", msg1, nw.Reachability(msg1)) + t.Logf("First message reception times of message '%s': %v\n", msg1, nw.FirstMessageReceptionTimes(msg1)) + t.Logf("Number of duplicate messages of message '%s': %d\n", msg1, nw.NumberOfDuplicateMessages(msg1)) +} diff --git a/p2p/type.go b/p2p/type.go new file mode 100644 index 0000000..4a53daf --- /dev/null +++ b/p2p/type.go @@ -0,0 +1,18 @@ +package p2p + +import "github.com/elecbug/netkit/p2p/broadcast" + +// Message represents a message sent between nodes in the P2P network. +type Message struct { + From ID + Content string + Protocol broadcast.Protocol +} + +// Config holds configuration parameters for the P2P network. +type Config struct { + GossipFactor float64 // fraction of neighbors to gossip to +} + +// ID represents a unique identifier for a node in the P2P network. +type ID uint64 From 29fac224e0a8624fbb7a7f67206cffc2e5f63a3e Mon Sep 17 00:00:00 2001 From: elecbug Date: Wed, 15 Oct 2025 10:21:48 +0900 Subject: [PATCH 2/2] temp: update custom --- p2p/broadcast/broadcast.go | 1 + 1 file changed, 1 insertion(+) diff --git a/p2p/broadcast/broadcast.go b/p2p/broadcast/broadcast.go index 912eb2a..4ac3dfb 100644 --- a/p2p/broadcast/broadcast.go +++ b/p2p/broadcast/broadcast.go @@ -7,4 +7,5 @@ type Protocol int var ( Flooding Protocol = 0 Gossiping Protocol = 1 + Custom Protocol = 2 )