27 changes: 20 additions & 7 deletions app/config.go
@@ -5,7 +5,9 @@ import (
 	"errors"
 	"fmt"
 	"github.com/Sirupsen/logrus"
+	"github.com/Tapjoy/dynamiq/app/partitioner"
 	"github.com/Tapjoy/dynamiq/app/stats"
+	"github.com/hashicorp/memberlist"
 	"github.com/tpjg/goriakpbc"
 	"math/rand"
 	"strconv"
@@ -36,6 +38,7 @@ type Config struct {
 	Queues   *Queues
 	RiakPool *riak.Client
 	Topics   *Topics
+	Nodes    *memberlist.Memberlist
 }
 
 type Core struct {
@@ -75,6 +78,8 @@ func GetCoreConfig(config_file *string) (*Config, error) {
 		logrus.Fatal(err)
 	}
 
+	cfg.Nodes = InitMember(&cfg)
+	logrus.Info("Nodes ?", cfg.Nodes)
 	cfg.RiakPool = initRiakPool(&cfg)
 	cfg.Queues = loadQueuesConfig(&cfg)
 	switch cfg.Stats.Type {
@@ -120,11 +125,15 @@ func loadQueuesConfig(cfg *Config) *Queues {
 		name := string(elem[:])
 		// Get the Riak RdtMap of settings for this queue
 		configMap, _ := configBucket.FetchMap(queueConfigRecordName(name))
+		visTimeout, _ := cfg.GetVisibilityTimeout(name)
+		ringCache := partitioner.NewTimedRingCache(time.Second*time.Duration(visTimeout), time.Second*1)
+
 		// Pre-warm the settings object
 		queue := &Queue{
-			Name:   name,
-			Config: configMap,
-			Parts:  InitPartitions(cfg, name),
+			Name:        name,
+			Config:      configMap,
+			Partitioner: partitioner.NewRangePartitioner(cfg.Nodes, ringCache),
+			Parts:       InitPartitions(cfg, name),
 		}
 		// TODO: We should be handling errors here
 		// Set the queue in the queue map
@@ -143,11 +152,15 @@ func (cfg *Config) InitializeQueue(queueName string) error {
 	}
 	// Add to the known set of queues
 	err = cfg.addToKnownQueues(queueName)
+	visTimeout, _ := cfg.GetVisibilityTimeout(queueName)
+	ringCache := partitioner.NewTimedRingCache(time.Second*time.Duration(visTimeout), time.Second*1)
+
 	// Now, add the queue into our memory-cache of data
 	cfg.Queues.QueueMap[queueName] = &Queue{
-		Name:   queueName,
-		Parts:  InitPartitions(cfg, queueName),
-		Config: configMap,
+		Name:        queueName,
+		Parts:       InitPartitions(cfg, queueName),
+		Partitioner: partitioner.NewRangePartitioner(cfg.Nodes, ringCache),
+		Config:      configMap,
 	}
 	return err
 }
@@ -254,7 +267,7 @@ func (cfg *Config) getQueueSetting(paramName string, queueName string) (string,
 			}
 		}
 	}
-
+
 	if value == "" {
 		// Read from riak
 		client := cfg.RiakConnection()
12 changes: 5 additions & 7 deletions app/httpinterface_v1.go
@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"github.com/Sirupsen/logrus"
 	"github.com/go-martini/martini"
-	"github.com/hashicorp/memberlist"
 	"github.com/martini-contrib/binding"
 	"github.com/martini-contrib/render"
 	"net/http"
@@ -67,12 +66,12 @@ func dynamiqMartini(cfg *Config) *martini.ClassicMartini {
 type HTTP_API_V1 struct {
 }
 
-func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) {
+func (h HTTP_API_V1) InitWebserver(cfg *Config) {
 	// tieing our Queue to HTTP interface == bad we should move this somewhere else
 	// Queues.Queues is dumb. Need a better name-chain
 	queues := cfg.Queues
 	// also tieing topics this is next for refactor
-	topics := InitTopics(cfg, queues)
+	topics := InitTopics(cfg, queues)
 	cfg.Topics = &topics
 	m := dynamiqMartini(cfg)
 	m.Use(render.Renderer())
@@ -82,7 +81,7 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) {
 	// STATUS / STATISTICS API BLOCK
 	m.Get("/status/servers", func() string {
 		return_string := ""
-		for _, member := range list.Members() {
+		for _, member := range cfg.Nodes.Members() {
 			return_string += fmt.Sprintf("Member: %s %s\n", member.Name, member.Addr)
 		}
 		return return_string
@@ -102,10 +101,9 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) {
 		}
 	})
 
-
 	m.Delete("/queues/:queue", func(r render.Render, params martini.Params) {
 		var present bool
-		_, present = queues.QueueMap[params["queue"]]
+		_, present = queues.QueueMap[params["queue"]]
 		if present == true {
 			queues.DeleteQueue(params["queue"], cfg)
 			deleted := true
@@ -290,7 +288,7 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) {
 			if batchSize <= 0 {
 				r.JSON(422, fmt.Sprint("Batchsizes must be non-negative integers greater than 0"))
 			}
-			messages, err := queues.QueueMap[params["queue"]].Get(cfg, list, batchSize)
+			messages, err := queues.QueueMap[params["queue"]].Get(cfg, cfg.Nodes, batchSize)
 
 			if err != nil && err.Error() != NOPARTITIONS {
 				// We're choosing to ignore nopartitions issues for now and treat them as normal 200s
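For reference, the /status/servers handler above writes one line per cluster member using the Sprintf format shown. With two hypothetical members named dynamiq-01 and dynamiq-02, the response body would look like:

Member: dynamiq-01 10.0.0.11
Member: dynamiq-02 10.0.0.12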
146 changes: 146 additions & 0 deletions app/partitioner/classic_partitioner.go
@@ -0,0 +1,146 @@
package partitioner

import (
	"errors"
	"github.com/Sirupsen/logrus"
	"github.com/Tapjoy/lane"
	"github.com/hashicorp/memberlist"
	"math"
	"math/rand"
	"sort"
	"sync"
	"time"
)

const NOPARTITIONS string = "no available partitions"

type ClassicPartitioner struct {
	sync.RWMutex

	MinPartitions     int
	MaxPartitions     int
	VisibilityTimeout float64
	Nodes             *memberlist.Memberlist
	partitionCount    int
	partitions        *lane.PQueue
}

type Partition struct {
	Id       int
	LastUsed time.Time
}

func NewClassicPartitioner(list *memberlist.Memberlist, minPartitions int, maxPartitions int, visibilityTimeout float64) *ClassicPartitioner {
	partitioner := &ClassicPartitioner{
		Nodes:             list,
		MinPartitions:     minPartitions,
		MaxPartitions:     maxPartitions,
		VisibilityTimeout: visibilityTimeout,
		partitionCount:    0,
		partitions:        lane.NewPQueue(lane.MINPQ),
	}
	partitioner.makePartitions(minPartitions)
	return partitioner
}

func (r *ClassicPartitioner) makePartitions(partitionsToMake int) {
	var initialTime time.Time
	offset := r.partitionCount
	for partitionId := offset; partitionId < offset+partitionsToMake; partitionId++ {
		if r.MaxPartitions > partitionId {
			partition := new(Partition)
			partition.Id = partitionId
			partition.LastUsed = initialTime
			r.partitions.Push(partition, rand.Int63n(100000))
			r.partitionCount = r.partitionCount + 1
		}
	}
}

func (r *ClassicPartitioner) getMyPosition() (int64, int64) {
	nodes := r.Nodes.Members()
	var nodeNames []string
	for _, node := range nodes {
		nodeNames = append(nodeNames, node.Name)
	}

	sort.Strings(nodeNames)

	myPos := sort.SearchStrings(nodeNames, r.Nodes.LocalNode().Name)
	return int64(myPos), int64(len(nodeNames))
}

func (r *ClassicPartitioner) getPartitionPosition() (int64, *Partition, int64, error) {
	//iterate over the partitions and then increase or decrease the number of partitions

	//TODO move logging out of the sync operation for better throughput
	myPartition := -1

	var err error
	poppedPartition, _ := r.partitions.Pop()
	var workingPartition *Partition
	if poppedPartition != nil {
		workingPartition = poppedPartition.(*Partition)
	} else {
		// this seems a little scary
		return int64(myPartition), workingPartition, int64(r.partitionCount), errors.New(NOPARTITIONS)
	}
	if time.Since(workingPartition.LastUsed).Seconds() > r.VisibilityTimeout {
		myPartition = workingPartition.Id
	} else {
		r.partitions.Push(workingPartition, workingPartition.LastUsed.UnixNano())
		r.Lock()
		defer r.Unlock()
		if r.partitionCount < r.MaxPartitions {
			workingPartition = new(Partition)
			workingPartition.Id = r.partitionCount
			myPartition = workingPartition.Id
			r.partitionCount = r.partitionCount + 1
		} else {
			err = errors.New(NOPARTITIONS)
		}
	}
	return int64(myPartition), workingPartition, int64(r.partitionCount), err
}

func (r *ClassicPartitioner) pushPartition(partition *Partition, lock bool) {
	if lock {
		partition.LastUsed = time.Now()
		r.partitions.Push(partition, partition.LastUsed.UnixNano())
	} else {
		partition.LastUsed = time.Now().Add(-(time.Duration(r.VisibilityTimeout) * time.Second))
		r.partitions.Push(partition, partition.LastUsed.UnixNano())
	}
}

func (r *ClassicPartitioner) GetRange(lockIncrement int64) (int64, int64, int, error) {
	//get the node position and the node count
	myPosition, nodeCount := r.getMyPosition()

	//calculate the range that our node is responsible for
	nodeKeyspaceSize := math.MaxInt64 / nodeCount
	nodeBottom := myPosition * nodeKeyspaceSize
	nodeTop := (myPosition + 1) * nodeKeyspaceSize

	myPartition, partition, totalPartitions, err := r.getPartitionPosition()

	if err != nil && err.Error() != NOPARTITIONS {
		logrus.Error(err)
	}

	// calculate my range for the given number
	node_range := nodeTop - nodeBottom
	nodeStep := node_range / totalPartitions
	partitionBottom := nodeStep*myPartition + nodeBottom
	partitionTop := nodeStep*(myPartition+1) + nodeBottom
	return partitionBottom, partitionTop, partition.Id, err

}

func (r *ClassicPartitioner) ExpireRange(lowerBound int64, upperBound int64, ordinal int) {
	p := &Partition{
		Id:       ordinal,
		LastUsed: time.Time{},
	}
	r.pushPartition(p, false)
}
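To make the keyspace arithmetic in GetRange concrete, here is a small self-contained sketch using hypothetical values (2 nodes, 4 partitions, local node at sorted position 1 holding partition 2); none of these numbers come from the PR:

// Hypothetical worked example of the math in ClassicPartitioner.GetRange.
package main

import (
	"fmt"
	"math"
)

func main() {
	var (
		myPosition      int64 = 1 // sorted index of the local node
		nodeCount       int64 = 2
		myPartition     int64 = 2 // ordinal handed out by the priority queue
		totalPartitions int64 = 4
	)
	// Each node owns an equal slice of [0, MaxInt64).
	nodeKeyspaceSize := math.MaxInt64 / nodeCount
	nodeBottom := myPosition * nodeKeyspaceSize
	nodeTop := (myPosition + 1) * nodeKeyspaceSize
	// Each partition owns an equal slice of its node's range.
	nodeStep := (nodeTop - nodeBottom) / totalPartitions
	partitionBottom := nodeStep*myPartition + nodeBottom
	partitionTop := nodeStep*(myPartition+1) + nodeBottom
	fmt.Printf("node [%d, %d), partition [%d, %d)\n",
		nodeBottom, nodeTop, partitionBottom, partitionTop)
}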
6 changes: 6 additions & 0 deletions app/partitioner/partitioner.go
@@ -0,0 +1,6 @@
package partitioner

type Partitioner interface {
	GetRange(lockIncrement int64) (int64, int64, int, error)
	ExpireRange(lowerBound int64, upperBound int64, ordinal int)
}
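Both implementations in this PR return a partition ordinal from GetRange alongside the bounds, so the interface is shown with all four return values. For orientation, a consumer would presumably reserve a range, read the messages whose keys fall inside it, and expire the range once the batch is handled. A minimal hedged sketch; drainBatch and fetchMessages are hypothetical names, not part of this PR:

// Hypothetical usage sketch of the Partitioner interface above.
func drainBatch(p Partitioner, batchSize int64) ([]string, error) {
	lower, upper, ordinal, err := p.GetRange(batchSize)
	if err != nil {
		return nil, err
	}
	// Fetch messages whose keys fall in [lower, upper).
	msgs := fetchMessages(lower, upper)
	// Hand the range back so it can be reserved again later.
	p.ExpireRange(lower, upper, ordinal)
	return msgs, nil
}

// fetchMessages is a stand-in for the real Riak-backed message read.
func fetchMessages(lower, upper int64) []string { return nil }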
53 changes: 53 additions & 0 deletions app/partitioner/range_partitioner.go
@@ -0,0 +1,53 @@
package partitioner

import (
	"github.com/hashicorp/memberlist"
	"math"
	"sort"
)

type RangePartitioner struct {
	Nodes     *memberlist.Memberlist
	ringCache *TimedRingCache
}

func NewRangePartitioner(list *memberlist.Memberlist, ringCache *TimedRingCache) *RangePartitioner {
	return &RangePartitioner{
		Nodes:     list,
		ringCache: ringCache,
	}
}

func (r *RangePartitioner) GetRange(lockIncrement int64) (int64, int64, int, error) {
	myPosition, nodeCount := r.getMyPosition()

	// Calculate the range that our node is responsible for
	// Do this every time, so that we don't cache bad values if the
	// cluster changes state
	nodeKeyspaceSize := math.MaxInt64 / nodeCount
	nodeBottom := myPosition * nodeKeyspaceSize
	nodeTop := (myPosition + 1) * nodeKeyspaceSize

	// Find the nearest locked range, if one exists
	lowerBound, upperBound := r.ringCache.ReserveRange(nodeBottom, nodeTop, lockIncrement)

	// Return the values
	return lowerBound, upperBound, 0, nil
}

func (r *RangePartitioner) ExpireRange(lowerBound int64, upperBound int64, ordinal int) {
	r.ringCache.ExpireRange(lowerBound, upperBound)
}

func (r *RangePartitioner) getMyPosition() (int64, int64) {
	nodes := r.Nodes.Members()
	var nodeNames []string
	for _, node := range nodes {
		nodeNames = append(nodeNames, node.Name)
	}

	sort.Strings(nodeNames)

	myPos := sort.SearchStrings(nodeNames, r.Nodes.LocalNode().Name)
	return int64(myPos), int64(len(nodeNames))
}
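TimedRingCache itself is not part of this diff; config.go constructs it per queue with NewTimedRingCache, passing the queue's visibility timeout and a one-second interval. A hedged sketch of the contract RangePartitioner appears to rely on, inferred only from the call sites above:

// Inferred, not from the PR: the methods RangePartitioner calls on
// *TimedRingCache, with semantics guessed from the call sites.
type ringCache interface {
	// ReserveRange locks and returns the next free sub-range of size
	// lockIncrement within [nodeBottom, nodeTop); the lock presumably
	// lapses on its own after the cache's TTL (the queue's visibility
	// timeout), so abandoned ranges become reservable again.
	ReserveRange(nodeBottom, nodeTop, lockIncrement int64) (int64, int64)
	// ExpireRange releases a reserved range before its TTL lapses.
	ExpireRange(lowerBound, upperBound int64)
}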