Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,17 @@ go get github.com/elecbug/netkit@latest

## Packages

### Graph algorithm
### Graph

- [`network-graph`](./network-graph/): Unweighted network analysis library.
- [`graph`](./network-graph/graph/): Library for creating and building graphs.
- [`standard_graph`](./network-graph/graph/standard_graph/): Library for generating standard graphs like Erdos-Reyni graph.
- [`algorithm`](./network-graph/algorithm/): Library containing various graph algorithms.

### P2P

- [`p2p`](./p2p/): Library that integrates with graph libraries to form networks and enable p2p broadcast experiments.

### Extensible

- [`bimap`](./bimap/): Bidirectional map with O(1) lookups key->value and value->key.
Expand Down
4 changes: 2 additions & 2 deletions network-graph/graph/standard_graph/standard_graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (s STANDARD_GRAPH_TYPE) String(onlyAlphabet bool) string {
return "Erdős-Rényi"
}
case RANDOM_REGULAR:
return "Random-Regular"
return "Random Regular"
case BARABASI_ALBERT:
if onlyAlphabet {
return "Barabasi-Albert"
Expand All @@ -33,7 +33,7 @@ func (s STANDARD_GRAPH_TYPE) String(onlyAlphabet bool) string {
case WATTS_STROGATZ:
return "Watts-Strogatz"
case RANDOM_GEOMETRIC:
return "Random-Geometric"
return "Random Geometric"
case WAXMAN:
return "Waxman"
default:
Expand Down
9 changes: 0 additions & 9 deletions p2p/broadcast.go

This file was deleted.

11 changes: 11 additions & 0 deletions p2p/broadcast/broadcast.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Package broadcast defines the protocols used for communication in the P2P network.
package broadcast

// Protocol defines the protocol used for broadcasting messages in the P2P network.
type Protocol int

var (
Flooding Protocol = 0
Gossiping Protocol = 1
Custom Protocol = 2
)
52 changes: 34 additions & 18 deletions p2p/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,13 @@ import (
"fmt"
"strconv"
"sync"
"time"

"github.com/elecbug/netkit/network-graph/graph"
"github.com/elecbug/netkit/network-graph/node"
"github.com/elecbug/netkit/p2p/broadcast"
)

// Message represents a message sent between nodes in the P2P network.
type Message struct {
From ID
Content string
Protocol BroadcastProtocol
}

// Config holds configuration parameters for the P2P network.
type Config struct {
GossipFactor float64 // fraction of neighbors to gossip to
}

// Config holds configuration parameters for the P2P network.
type Network struct {
nodes map[ID]*p2pNode
Expand Down Expand Up @@ -96,13 +86,8 @@ func (n *Network) NodeIDs() []ID {
return ids
}

// GetNode retrieves a node by its ID.
func (n *Network) GetNode(id ID) *p2pNode {
return n.nodes[id]
}

// Publish sends a message to the specified node's message queue.
func (n *Network) Publish(nodeID ID, msg string, protocol BroadcastProtocol) error {
func (n *Network) Publish(nodeID ID, msg string, protocol broadcast.Protocol) error {
if node, ok := n.nodes[nodeID]; ok {
if !node.alive {
return fmt.Errorf("node %d is not alive", nodeID)
Expand Down Expand Up @@ -132,6 +117,37 @@ func (n *Network) Reachability(msg string) float64 {
return float64(reached) / float64(total)
}

// FirstMessageReceptionTimes returns the first reception times of the specified message across all nodes.
func (n *Network) FirstMessageReceptionTimes(msg string) []time.Time {
firstTimes := make([]time.Time, 0)

for _, node := range n.nodes {
node.mu.Lock()
if t, ok := node.seenAt[msg]; ok {
firstTimes = append(firstTimes, t)
}

node.mu.Unlock()
}

return firstTimes
}

// NumberOfDuplicateMessages counts how many duplicate messages were received across all nodes.
func (n *Network) NumberOfDuplicateMessages(msg string) int {
dupCount := 0

for _, node := range n.nodes {
node.mu.Lock()
if count, ok := node.recvFrom[msg]; ok {
dupCount += len(count) - 1
}
node.mu.Unlock()
}

return dupCount
}

// MessageInfo returns a snapshot of the node's message-related information.
func (n *Network) MessageInfo(nodeID ID, content string) (map[string]any, error) {
node := n.nodes[nodeID]
Expand Down
7 changes: 3 additions & 4 deletions p2p/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@ import (
"context"
"sync"
"time"
)

// ID represents a unique identifier for a node in the P2P network.
type ID uint64
"github.com/elecbug/netkit/p2p/broadcast"
)

// p2pNode represents a node in the P2P network.
type p2pNode struct {
Expand Down Expand Up @@ -125,7 +124,7 @@ func (n *p2pNode) publish(network *Network, msg Message, exclude map[ID]struct{}
willSendEdges = append(willSendEdges, edge)
}

if protocol == Gossiping && len(willSendEdges) > 0 {
if protocol == broadcast.Gossiping && len(willSendEdges) > 0 {
k := int(float64(len(willSendEdges)) * network.cfg.GossipFactor)
willSendEdges = willSendEdges[:k]
}
Expand Down
40 changes: 37 additions & 3 deletions p2p/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/elecbug/netkit/network-graph/graph/standard_graph"
"github.com/elecbug/netkit/p2p"
"github.com/elecbug/netkit/p2p/broadcast"
)

func TestGenerateNetwork(t *testing.T) {
Expand All @@ -38,15 +39,15 @@ func TestGenerateNetwork(t *testing.T) {
nw.RunNetworkSimulation(ctx)

t.Logf("Publishing message '%s' from node %d\n", msg1, nw.NodeIDs()[0])
err = nw.Publish(nw.NodeIDs()[0], msg1, p2p.Flooding)
err = nw.Publish(nw.NodeIDs()[0], msg1, broadcast.Flooding)
if err != nil {
t.Fatalf("Failed to publish message: %v", err)
}
time.Sleep(1000 * time.Millisecond)
t.Logf("Reachability of message '%s': %f\n", msg1, nw.Reachability(msg1))

t.Logf("Publishing message '%s' from node %d\n", msg2, nw.NodeIDs()[1])
err = nw.Publish(nw.NodeIDs()[1], msg2, p2p.Gossiping)
err = nw.Publish(nw.NodeIDs()[1], msg2, broadcast.Gossiping)
if err != nil {
t.Fatalf("Failed to publish message: %v", err)
}
Expand All @@ -57,7 +58,7 @@ func TestGenerateNetwork(t *testing.T) {

nw.RunNetworkSimulation(context.Background())
t.Logf("Publishing message '%s' from node %d\n", msg3, nw.NodeIDs()[2])
err = nw.Publish(nw.NodeIDs()[2], msg3, p2p.Gossiping)
err = nw.Publish(nw.NodeIDs()[2], msg3, broadcast.Gossiping)
if err != nil {
t.Fatalf("Failed to publish message: %v", err)
}
Expand All @@ -82,3 +83,36 @@ func TestGenerateNetwork(t *testing.T) {

os.WriteFile("p2p_result.log", data, 0644)
}

func TestMetrics(t *testing.T) {
g := standard_graph.ErdosRenyiGraph(1000, 50.000/1000, true)
t.Logf("Generated graph with %d nodes and %d edges\n", len(g.Nodes()), g.EdgeCount())
src := rand.NewSource(time.Now().UnixNano())

nodeLatency := func() float64 { return p2p.LogNormalRand(math.Log(100), 0.5, src) }
edgeLatency := func() float64 { return p2p.LogNormalRand(math.Log(100), 0.3, src) }

nw, err := p2p.GenerateNetwork(g, nodeLatency, edgeLatency, &p2p.Config{GossipFactor: 0.35})
if err != nil {
t.Fatalf("Failed to generate network: %v", err)
}

t.Logf("Generated network with %d nodes\n", len(nw.NodeIDs()))

msg1 := "Hello, P2P World!"

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
nw.RunNetworkSimulation(ctx)

t.Logf("Publishing message '%s' from node %d\n", msg1, nw.NodeIDs()[0])
err = nw.Publish(nw.NodeIDs()[0], msg1, broadcast.Flooding)
if err != nil {
t.Fatalf("Failed to publish message: %v", err)
}
time.Sleep(1000 * time.Millisecond)
t.Logf("Number of nodes: %d\n", len(nw.NodeIDs()))
t.Logf("Reachability of message '%s': %f\n", msg1, nw.Reachability(msg1))
t.Logf("First message reception times of message '%s': %v\n", msg1, nw.FirstMessageReceptionTimes(msg1))
t.Logf("Number of duplicate messages of message '%s': %d\n", msg1, nw.NumberOfDuplicateMessages(msg1))
}
18 changes: 18 additions & 0 deletions p2p/type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package p2p

import "github.com/elecbug/netkit/p2p/broadcast"

// Message represents a message sent between nodes in the P2P network.
type Message struct {
From ID
Content string
Protocol broadcast.Protocol
}

// Config holds configuration parameters for the P2P network.
type Config struct {
GossipFactor float64 // fraction of neighbors to gossip to
}

// ID represents a unique identifier for a node in the P2P network.
type ID uint64