From 212ab2e3aae63ad762d5108cb1d2907930d1424f Mon Sep 17 00:00:00 2001 From: StabbyCutyou Date: Sun, 29 Mar 2015 13:37:25 -0400 Subject: [PATCH 1/2] Adding interface for partition strategies, removing the need to use cfg, adding in new RangePartition strategy, new TimedRingCache for handling locked keyspace ranges --- app/partitioner/classic_partitioner.go | 146 ++++++++++++++++++++++ app/partitioner/partitioner.go | 6 + app/partitioner/range_partitioner.go | 53 ++++++++ app/partitioner/ring_cache.go | 161 +++++++++++++++++++++++++ app/partitions.go | 2 + dynamiq.go | 28 +++++ 6 files changed, 396 insertions(+) create mode 100644 app/partitioner/classic_partitioner.go create mode 100644 app/partitioner/partitioner.go create mode 100644 app/partitioner/range_partitioner.go create mode 100644 app/partitioner/ring_cache.go diff --git a/app/partitioner/classic_partitioner.go b/app/partitioner/classic_partitioner.go new file mode 100644 index 0000000..a632124 --- /dev/null +++ b/app/partitioner/classic_partitioner.go @@ -0,0 +1,146 @@ +package partitioner + +import ( + "errors" + "github.com/Sirupsen/logrus" + "github.com/Tapjoy/lane" + "github.com/hashicorp/memberlist" + "math" + "math/rand" + "sort" + "sync" + "time" +) + +const NOPARTITIONS string = "no available partitions" + +type ClassicPartitioner struct { + sync.RWMutex + + MinPartitions int + MaxPartitions int + VisibilityTimeout float64 + Nodes *memberlist.Memberlist + partitionCount int + partitions *lane.PQueue +} + +type Partition struct { + Id int + LastUsed time.Time +} + +func NewClassicPartitioner(list *memberlist.Memberlist, minPartitions int, maxPartitions int, visibilityTimeout float64) *ClassicPartitioner { + partitioner := &ClassicPartitioner{ + Nodes: list, + MinPartitions: minPartitions, + MaxPartitions: maxPartitions, + VisibilityTimeout: visibilityTimeout, + partitionCount: 0, + partitions: lane.NewPQueue(lane.MINPQ), + } + partitioner.makePartitions(minPartitions) + return partitioner +} + +func (r *ClassicPartitioner) makePartitions(partitionsToMake int) { + var initialTime time.Time + offset := r.partitionCount + for partitionId := offset; partitionId < offset+partitionsToMake; partitionId++ { + if r.MaxPartitions > partitionId { + partition := new(Partition) + partition.Id = partitionId + partition.LastUsed = initialTime + r.partitions.Push(partition, rand.Int63n(100000)) + r.partitionCount = r.partitionCount + 1 + } + } +} + +func (r *ClassicPartitioner) getMyPosition() (int64, int64) { + nodes := r.Nodes.Members() + var nodeNames []string + for _, node := range nodes { + nodeNames = append(nodeNames, node.Name) + } + + sort.Strings(nodeNames) + + myPos := sort.SearchStrings(nodeNames, r.Nodes.LocalNode().Name) + return int64(myPos), int64(len(nodeNames)) +} + +func (r *ClassicPartitioner) getPartitionPosition() (int64, *Partition, int64, error) { + //iterate over the partitions and then increase or decrease the number of partitions + + //TODO move loging out of the sync operation for better throughput + myPartition := -1 + + var err error + poppedPartition, _ := r.partitions.Pop() + var workingPartition *Partition + if poppedPartition != nil { + workingPartition = poppedPartition.(*Partition) + } else { + // this seems a little scary + return int64(myPartition), workingPartition, int64(r.partitionCount), errors.New(NOPARTITIONS) + } + if time.Since(workingPartition.LastUsed).Seconds() > r.VisibilityTimeout { + myPartition = workingPartition.Id + } else { + r.partitions.Push(workingPartition, 
workingPartition.LastUsed.UnixNano())
+		r.Lock()
+		defer r.Unlock()
+		if r.partitionCount < r.MaxPartitions {
+			workingPartition = new(Partition)
+			workingPartition.Id = r.partitionCount
+			myPartition = workingPartition.Id
+			r.partitionCount = r.partitionCount + 1
+		} else {
+			err = errors.New(NOPARTITIONS)
+		}
+	}
+	return int64(myPartition), workingPartition, int64(r.partitionCount), err
+}
+
+func (r *ClassicPartitioner) pushPartition(partition *Partition, lock bool) {
+	if lock {
+		partition.LastUsed = time.Now()
+		r.partitions.Push(partition, partition.LastUsed.UnixNano())
+	} else {
+		partition.LastUsed = time.Now().Add(-(time.Duration(r.VisibilityTimeout) * time.Second))
+		r.partitions.Push(partition, partition.LastUsed.UnixNano())
+	}
+}
+
+func (r *ClassicPartitioner) GetRange(lockIncrement int64) (int64, int64, int, error) {
+	//get the node position and the node count
+	myPosition, nodeCount := r.getMyPosition()
+
+	//calculate the range that our node is responsible for
+	nodeKeyspaceSize := math.MaxInt64 / nodeCount
+	nodeBottom := myPosition * nodeKeyspaceSize
+	nodeTop := (myPosition + 1) * nodeKeyspaceSize
+
+	myPartition, partition, totalPartitions, err := r.getPartitionPosition()
+
+	if err != nil && err.Error() != NOPARTITIONS {
+		logrus.Error(err)
+	}
+
+	// calculate my range for the given number
+	node_range := nodeTop - nodeBottom
+	nodeStep := node_range / totalPartitions
+	partitionBottom := nodeStep*myPartition + nodeBottom
+	partitionTop := nodeStep*(myPartition+1) + nodeBottom
+	return partitionBottom, partitionTop, partition.Id, err
+
+}
+
+func (r *ClassicPartitioner) ExpireRange(lowerBound int64, upperBound int64, ordinal int) {
+	p := &Partition{
+		Id:       ordinal,
+		LastUsed: time.Time{},
+	}
+	r.pushPartition(p, false)
+}
diff --git a/app/partitioner/partitioner.go b/app/partitioner/partitioner.go
new file mode 100644
index 0000000..f371339
--- /dev/null
+++ b/app/partitioner/partitioner.go
@@ -0,0 +1,6 @@
+package partitioner
+
+type Partitioner interface {
+	GetRange(lockIncrement int64) (int64, int64, int, error)
+	ExpireRange(lowerBound int64, upperBound int64, ordinal int)
+}
diff --git a/app/partitioner/range_partitioner.go b/app/partitioner/range_partitioner.go
new file mode 100644
index 0000000..78db73c
--- /dev/null
+++ b/app/partitioner/range_partitioner.go
@@ -0,0 +1,53 @@
+package partitioner
+
+import (
+	"github.com/hashicorp/memberlist"
+	"math"
+	"sort"
+)
+
+type RangePartitioner struct {
+	Nodes     *memberlist.Memberlist
+	ringCache *TimedRingCache
+}
+
+func NewRangePartitioner(list *memberlist.Memberlist, ringCache *TimedRingCache) *RangePartitioner {
+	return &RangePartitioner{
+		Nodes:     list,
+		ringCache: ringCache,
+	}
+}
+
+func (r *RangePartitioner) GetRange(lockIncrement int64) (int64, int64, int, error) {
+	myPosition, nodeCount := r.getMyPosition()
+
+	// Calculate the range that our node is responsible for
+	// Do this every time, so that we don't cache bad values if the
+	// cluster changes state
+	nodeKeyspaceSize := math.MaxInt64 / nodeCount
+	nodeBottom := myPosition * nodeKeyspaceSize
+	nodeTop := (myPosition + 1) * nodeKeyspaceSize
+
+	// Find the nearest locked range, if one exists
+	lowerBound, upperBound := r.ringCache.ReserveRange(nodeBottom, nodeTop, lockIncrement)
+
+	// Return the values
+	return lowerBound, upperBound, 0, nil
+}
+
+func (r *RangePartitioner) ExpireRange(lowerBound int64, upperBound int64, ordinal int) {
+	r.ringCache.ExpireRange(lowerBound, upperBound)
+}
+
+func (r *RangePartitioner) getMyPosition() (int64,
int64) { + nodes := r.Nodes.Members() + var nodeNames []string + for _, node := range nodes { + nodeNames = append(nodeNames, node.Name) + } + + sort.Strings(nodeNames) + + myPos := sort.SearchStrings(nodeNames, r.Nodes.LocalNode().Name) + return int64(myPos), int64(len(nodeNames)) +} diff --git a/app/partitioner/ring_cache.go b/app/partitioner/ring_cache.go new file mode 100644 index 0000000..e9ab77c --- /dev/null +++ b/app/partitioner/ring_cache.go @@ -0,0 +1,161 @@ +package partitioner + +import ( + "github.com/Sirupsen/logrus" + "sync" + "time" +) + +type RingEntry struct { + value int64 + timeStamp time.Time +} + +type TimedRingCache struct { + sync.Mutex + ring []RingEntry + expirationDuration time.Duration + gcInterval time.Duration +} + +func NewTimedRingCache(expirationDuration time.Duration, gcInterval time.Duration) *TimedRingCache { + trc := &TimedRingCache{ + expirationDuration: expirationDuration, + gcInterval: gcInterval, + } + trc.beginExpiring() + return trc +} + +func (rc *TimedRingCache) push(value int64, timeStamp time.Time) { + entry := RingEntry{ + value: value, + timeStamp: timeStamp, + } + + rc.ring = append(rc.ring, entry) +} + +func (rc *TimedRingCache) LockedRange() (int64, int64) { + rc.Lock() + defer rc.Unlock() + if len(rc.ring) >= 1 { + return rc.ring[0].value, rc.ring[len(rc.ring)-1].value + } else { + return 0, 0 + } +} + +func (rc *TimedRingCache) ReserveRange(lowerLimit int64, upperLimit int64, rangeIncrease int64) (int64, int64) { + rc.Lock() + defer rc.Unlock() + + var lowerBound int64 = 0 + var upperBound int64 = 0 + + if len(rc.ring) == 0 { + logrus.Info("LEN 0!!!!!!") + // First entry, they get what they want + lowerBound = lowerLimit + upperBound = lowerBound + rangeIncrease + if upperBound > upperLimit { + logrus.Info("UPPERBOUND > UPPERLIMIT!!!!!!") + upperBound = upperLimit + } + logrus.Info("ABOUT TO PUSH", upperBound) + rc.push(upperBound, time.Now()) + return lowerBound, upperBound + } + + head := rc.ring[len(rc.ring)-1].value + tail := rc.ring[0].value + + if len(rc.ring) == 1 { + logrus.Info("LEN 1!!!!!!", head, tail) + lowerBound = head + 1 + // + if head == upperLimit { + lowerBound = lowerLimit + } + upperBound = lowerBound + rangeIncrease + if upperBound > upperLimit { + logrus.Info("UPPERBOUND > UPPERLIMIT!!!!!!") + upperBound = upperLimit + } + } else { + // Tail will likely never be "lowerLimit", but it will possibly be the amount + // of the rangeIncrease +1 due to what happens above + if head == upperLimit && (tail == lowerLimit || tail == rangeIncrease+1) { + // fully locked + logrus.Info("FULLY LOCKED!!!!!!!") + return -1, -1 + } + + if head < tail || (head > tail && head == upperLimit) { + logrus.Info("HEAD < TAIL!!!!!", head, tail) + // We were uppbounded, the tail started to fall off, we can loop the tail ranges + // back onto the head. + // Grow from head + 1 to to tail - 1, by rangeIncrease + + //This isn't working for the following case: we've hit upperLimit + // but haven't yet begun wrapping. 
The head == upperlimit gets it close + // still seeing 0,0 ranges, ranges that aren't right (off by -1) + lowerBound = head + 1 + if head == upperLimit { + lowerBound = lowerLimit + } + upperBound = lowerBound + rangeIncrease + if upperBound > tail-1 { + logrus.Info("UPPERBOUND > TAIL-1!!!!!") + upperBound = tail - 1 + } + } else if head > tail { + logrus.Info("HEAD > TAIL!!!!!", head, tail) + // There was room at the head for future growth + // Grow from head + 1 to upperLimit by rangeIncrease + lowerBound = head + 1 + upperBound = lowerBound + rangeIncrease + if upperBound > upperLimit { + logrus.Info("UPPERBOUND > UPPERLIMIT!!!!!") + upperBound = upperLimit + } + } + } + logrus.Info("ABOUT TO PUSH", upperBound) + rc.push(upperBound, time.Now()) + return lowerBound, upperBound +} + +func (rc *TimedRingCache) ExpireRange(lowerBound int64, upperBound int64) { + rc.Lock() + defer rc.Unlock() + position := -1 + for i, x := range rc.ring { + if x.value == upperBound { + position = i + } + } + + if position != -1 { + rc.ring[position].timeStamp = time.Time{} + } +} + +func (rc *TimedRingCache) beginExpiring() { + go func() { + for _ = range time.Tick(rc.gcInterval) { + logrus.Info("Checking expiration...", time.Now(), len(rc.ring)) + if len(rc.ring) > 0 && time.Since(rc.ring[0].timeStamp) >= rc.expirationDuration { + rc.Lock() + // This is effectively a pop off of the head, pos 0, of a slice + for len(rc.ring) > 0 && time.Since(rc.ring[0].timeStamp) >= rc.expirationDuration { + // If the expirationDuration has expired, pop it + // Keep going until nothing is expired or we're at 0 + rc.ring = append(rc.ring[:0], rc.ring[1:]...) + logrus.Info("Expiration Occured!") + } + rc.Unlock() + } + } + }() +} diff --git a/app/partitions.go b/app/partitions.go index f8c85e2..f71f7f4 100644 --- a/app/partitions.go +++ b/app/partitions.go @@ -41,6 +41,7 @@ func InitPartitions(cfg *Config, queueName string) *Partitions { func (part *Partitions) PartitionCount() int { return part.partitionCount } + func (part *Partitions) GetPartition(cfg *Config, queueName string, list *memberlist.Memberlist) (int, int, *Partition, error) { //get the node position and the node count nodePosition, nodeCount := getNodePosition(list) @@ -114,6 +115,7 @@ func (part *Partitions) getPartitionPosition(cfg *Config, queueName string) (int } return myPartition, workingPartition, part.partitionCount, err } + func (part *Partitions) PushPartition(cfg *Config, queueName string, partition *Partition, lock bool) { if lock { partition.LastUsed = time.Now() diff --git a/dynamiq.go b/dynamiq.go index 35bdd0f..d2aa487 100644 --- a/dynamiq.go +++ b/dynamiq.go @@ -4,6 +4,8 @@ import ( "flag" "github.com/Sirupsen/logrus" "github.com/Tapjoy/dynamiq/app" + "github.com/Tapjoy/dynamiq/app/partitioner" + "time" ) func main() { @@ -22,5 +24,31 @@ func main() { http_api := app.HTTP_API_V1{} + timedRingCache := partitioner.NewTimedRingCache(time.Second*1, time.Millisecond*1000) + + go func() { + for i := 0; i < 35; i++ { + x, y := timedRingCache.ReserveRange(1, 200, 7) + logrus.Info("AAAAAAAAA Range is ", x, y) + time.Sleep(time.Millisecond * 100) + } + }() + + go func() { + for i := 0; i < 35; i++ { + x, y := timedRingCache.ReserveRange(1, 200, 7) + logrus.Info("BBBBBBBBBB Range is ", x, y) + time.Sleep(time.Millisecond * 100) + } + }() + + go func() { + for i := 0; i < 35; i++ { + x, y := timedRingCache.ReserveRange(1, 200, 7) + logrus.Info("CCCCCCCCCCCC Range is ", x, y) + time.Sleep(time.Millisecond * 100) + } + }() + http_api.InitWebserver(list, cfg) 
} From 64cdd6d892be778d6ef868f61b8583c1dafeb746 Mon Sep 17 00:00:00 2001 From: StabbyCutyou Date: Mon, 30 Mar 2015 08:56:59 -0400 Subject: [PATCH 2/2] More work --- app/config.go | 27 ++++++++++++----- app/httpinterface_v1.go | 12 ++++---- app/partitioner/ring_cache.go | 2 +- app/queue.go | 26 +++++++++------- dynamiq.go | 56 ++++++++++++++++------------------- 5 files changed, 67 insertions(+), 56 deletions(-) diff --git a/app/config.go b/app/config.go index f969848..1cf7e81 100644 --- a/app/config.go +++ b/app/config.go @@ -5,7 +5,9 @@ import ( "errors" "fmt" "github.com/Sirupsen/logrus" + "github.com/Tapjoy/dynamiq/app/partitioner" "github.com/Tapjoy/dynamiq/app/stats" + "github.com/hashicorp/memberlist" "github.com/tpjg/goriakpbc" "math/rand" "strconv" @@ -36,6 +38,7 @@ type Config struct { Queues *Queues RiakPool *riak.Client Topics *Topics + Nodes *memberlist.Memberlist } type Core struct { @@ -75,6 +78,8 @@ func GetCoreConfig(config_file *string) (*Config, error) { logrus.Fatal(err) } + cfg.Nodes = InitMember(&cfg) + logrus.Info("Nodes ?", cfg.Nodes) cfg.RiakPool = initRiakPool(&cfg) cfg.Queues = loadQueuesConfig(&cfg) switch cfg.Stats.Type { @@ -120,11 +125,15 @@ func loadQueuesConfig(cfg *Config) *Queues { name := string(elem[:]) // Get the Riak RdtMap of settings for this queue configMap, _ := configBucket.FetchMap(queueConfigRecordName(name)) + visTimeout, _ := cfg.GetVisibilityTimeout(name) + ringCache := partitioner.NewTimedRingCache(time.Second*time.Duration(visTimeout), time.Second*1) + // Pre-warm the settings object queue := &Queue{ - Name: name, - Config: configMap, - Parts: InitPartitions(cfg, name), + Name: name, + Config: configMap, + Partitioner: partitioner.NewRangePartitioner(cfg.Nodes, ringCache), + Parts: InitPartitions(cfg, name), } // TODO: We should be handling errors here // Set the queue in the queue map @@ -143,11 +152,15 @@ func (cfg *Config) InitializeQueue(queueName string) error { } // Add to the known set of queues err = cfg.addToKnownQueues(queueName) + visTimeout, _ := cfg.GetVisibilityTimeout(queueName) + ringCache := partitioner.NewTimedRingCache(time.Second*time.Duration(visTimeout), time.Second*1) + // Now, add the queue into our memory-cache of data cfg.Queues.QueueMap[queueName] = &Queue{ - Name: queueName, - Parts: InitPartitions(cfg, queueName), - Config: configMap, + Name: queueName, + Parts: InitPartitions(cfg, queueName), + Partitioner: partitioner.NewRangePartitioner(cfg.Nodes, ringCache), + Config: configMap, } return err } @@ -254,7 +267,7 @@ func (cfg *Config) getQueueSetting(paramName string, queueName string) (string, } } } - + if value == "" { // Read from riak client := cfg.RiakConnection() diff --git a/app/httpinterface_v1.go b/app/httpinterface_v1.go index d9d6688..bc4be73 100644 --- a/app/httpinterface_v1.go +++ b/app/httpinterface_v1.go @@ -5,7 +5,6 @@ import ( "fmt" "github.com/Sirupsen/logrus" "github.com/go-martini/martini" - "github.com/hashicorp/memberlist" "github.com/martini-contrib/binding" "github.com/martini-contrib/render" "net/http" @@ -67,12 +66,12 @@ func dynamiqMartini(cfg *Config) *martini.ClassicMartini { type HTTP_API_V1 struct { } -func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) { +func (h HTTP_API_V1) InitWebserver(cfg *Config) { // tieing our Queue to HTTP interface == bad we should move this somewhere else // Queues.Queues is dumb. 
Need a better name-chain queues := cfg.Queues // also tieing topics this is next for refactor - topics := InitTopics(cfg, queues) + topics := InitTopics(cfg, queues) cfg.Topics = &topics m := dynamiqMartini(cfg) m.Use(render.Renderer()) @@ -82,7 +81,7 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) { // STATUS / STATISTICS API BLOCK m.Get("/status/servers", func() string { return_string := "" - for _, member := range list.Members() { + for _, member := range cfg.Nodes.Members() { return_string += fmt.Sprintf("Member: %s %s\n", member.Name, member.Addr) } return return_string @@ -102,10 +101,9 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) { } }) - m.Delete("/queues/:queue", func(r render.Render, params martini.Params) { var present bool - _, present = queues.QueueMap[params["queue"]] + _, present = queues.QueueMap[params["queue"]] if present == true { queues.DeleteQueue(params["queue"], cfg) deleted := true @@ -290,7 +288,7 @@ func (h HTTP_API_V1) InitWebserver(list *memberlist.Memberlist, cfg *Config) { if batchSize <= 0 { r.JSON(422, fmt.Sprint("Batchsizes must be non-negative integers greater than 0")) } - messages, err := queues.QueueMap[params["queue"]].Get(cfg, list, batchSize) + messages, err := queues.QueueMap[params["queue"]].Get(cfg, cfg.Nodes, batchSize) if err != nil && err.Error() != NOPARTITIONS { // We're choosing to ignore nopartitions issues for now and treat them as normal 200s diff --git a/app/partitioner/ring_cache.go b/app/partitioner/ring_cache.go index e9ab77c..74b643a 100644 --- a/app/partitioner/ring_cache.go +++ b/app/partitioner/ring_cache.go @@ -144,7 +144,7 @@ func (rc *TimedRingCache) ExpireRange(lowerBound int64, upperBound int64) { func (rc *TimedRingCache) beginExpiring() { go func() { for _ = range time.Tick(rc.gcInterval) { - logrus.Info("Checking expiration...", time.Now(), len(rc.ring)) + //logrus.Info("Checking expiration...", time.Now(), len(rc.ring)) if len(rc.ring) > 0 && time.Since(rc.ring[0].timeStamp) >= rc.expirationDuration { rc.Lock() // This is effectively a pop off of the head, pos 0, of a slice diff --git a/app/queue.go b/app/queue.go index 98c634b..c871d23 100644 --- a/app/queue.go +++ b/app/queue.go @@ -4,6 +4,7 @@ import ( "crypto/rand" "fmt" "github.com/Sirupsen/logrus" + "github.com/Tapjoy/dynamiq/app/partitioner" "github.com/Tapjoy/dynamiq/app/stats" "github.com/hashicorp/memberlist" "github.com/tpjg/goriakpbc" @@ -39,7 +40,8 @@ type Queue struct { // name of the queue Name string // the partitions of the queue - Parts *Partitions + Parts *Partitions + Partitioner *partitioner.RangePartitioner // Individual settings for the queue Config *riak.RDtMap // Mutex for protecting rw access to the Config object @@ -148,13 +150,13 @@ func (queue *Queue) Get(cfg *Config, list *memberlist.Memberlist, batchsize int6 } // get the top and bottom partitions - partBottom, partTop, partition, err := queue.Parts.GetPartition(cfg, queue.Name, list) - + //partBottom, partTop, partition, err := queue.Parts.GetPartition(cfg, queue.Name, list) + partBottom, partTop, partition, err := queue.Partitioner.GetRange(5000000) if err != nil { return nil, err } //get a list of batchsize message ids - messageIds, _, err := bucket.IndexQueryRangePage("id_int", strconv.Itoa(partBottom), strconv.Itoa(partTop), uint32(batchsize), "") + messageIds, _, err := bucket.IndexQueryRangePage("id_int", strconv.FormatInt(partBottom, 10), strconv.FormatInt(partTop, 10), uint32(batchsize), "") defer 
queue.setQueueDepthApr(cfg.Stats.Client, list, queue.Name, messageIds) if err != nil { @@ -165,9 +167,10 @@ func (queue *Queue) Get(cfg *Config, list *memberlist.Memberlist, batchsize int6 // return the partition to the parts heap, but only lock it when we have messages if messageCount > 0 { - defer queue.Parts.PushPartition(cfg, queue.Name, partition, true) + //defer queue.Parts.PushPartition(cfg, queue.Name, partition, true) } else { - defer queue.Parts.PushPartition(cfg, queue.Name, partition, false) + //defer queue.Parts.PushPartition(cfg, queue.Name, partition, false) + defer queue.Partitioner.ExpireRange(partBottom, partTop, partition) } defer incrementReceiveCount(cfg.Stats.Client, queue.Name, messageCount) defer recordFillRatio(cfg.Stats.Client, queue.Name, batchsize, messageCount) @@ -380,11 +383,14 @@ func initQueueFromRiak(cfg *Config, queueName string) { bucket, _ := client.NewBucketType("maps", CONFIGURATION_BUCKET) config, _ := bucket.FetchMap(queueConfigRecordName(queueName)) - + visTimeout, _ := cfg.GetVisibilityTimeout(queueName) + ringCache := partitioner.NewTimedRingCache(time.Second*time.Duration(visTimeout), time.Second*1) + logrus.Info("Nodes??", cfg.Nodes) queue := Queue{ - Name: queueName, - Parts: InitPartitions(cfg, queueName), - Config: config, + Name: queueName, + Parts: InitPartitions(cfg, queueName), + Partitioner: partitioner.NewRangePartitioner(cfg.Nodes, ringCache), + Config: config, } // This is adding a new member to the collection, it shouldn't need a lock? diff --git a/dynamiq.go b/dynamiq.go index d2aa487..c2e3c9e 100644 --- a/dynamiq.go +++ b/dynamiq.go @@ -4,8 +4,6 @@ import ( "flag" "github.com/Sirupsen/logrus" "github.com/Tapjoy/dynamiq/app" - "github.com/Tapjoy/dynamiq/app/partitioner" - "time" ) func main() { @@ -20,35 +18,31 @@ func main() { } logrus.SetLevel(cfg.Core.LogLevel) - list := app.InitMember(cfg) - http_api := app.HTTP_API_V1{} - timedRingCache := partitioner.NewTimedRingCache(time.Second*1, time.Millisecond*1000) - - go func() { - for i := 0; i < 35; i++ { - x, y := timedRingCache.ReserveRange(1, 200, 7) - logrus.Info("AAAAAAAAA Range is ", x, y) - time.Sleep(time.Millisecond * 100) - } - }() - - go func() { - for i := 0; i < 35; i++ { - x, y := timedRingCache.ReserveRange(1, 200, 7) - logrus.Info("BBBBBBBBBB Range is ", x, y) - time.Sleep(time.Millisecond * 100) - } - }() - - go func() { - for i := 0; i < 35; i++ { - x, y := timedRingCache.ReserveRange(1, 200, 7) - logrus.Info("CCCCCCCCCCCC Range is ", x, y) - time.Sleep(time.Millisecond * 100) - } - }() - - http_api.InitWebserver(list, cfg) + //go func() { + // for i := 0; i < 35; i++ { + // x, y := timedRingCache.ReserveRange(1, 200, 7) + // logrus.Info("AAAAAAAAA Range is ", x, y) + // time.Sleep(time.Millisecond * 100) + // } + //}() + // + //go func() { + // for i := 0; i < 35; i++ { + // x, y := timedRingCache.ReserveRange(1, 200, 7) + // logrus.Info("BBBBBBBBBB Range is ", x, y) + // time.Sleep(time.Millisecond * 100) + // } + //}() + // + //go func() { + // for i := 0; i < 35; i++ { + // x, y := timedRingCache.ReserveRange(1, 200, 7) + // logrus.Info("CCCCCCCCCCCC Range is ", x, y) + // time.Sleep(time.Millisecond * 100) + // } + //}() + + http_api.InitWebserver(cfg) }
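
For reference, a minimal sketch (not part of either patch) of how the pieces introduced above fit together: one TimedRingCache per queue, expiring reservations after the queue's visibility timeout, feeding a RangePartitioner that hands out slices of the node's keyspace. It mirrors the wiring now done in config.go and queue.go; the memberlist setup, the 30-second timeout, and the error handling are illustrative stand-ins, while the 5000000 lock increment is the value queue.go currently passes.

package main

import (
	"time"

	"github.com/Tapjoy/dynamiq/app/partitioner"
	"github.com/hashicorp/memberlist"
)

func main() {
	// Cluster membership; in dynamiq this comes from app.InitMember(cfg)
	list, err := memberlist.Create(memberlist.DefaultLocalConfig())
	if err != nil {
		panic(err)
	}

	// One ring cache per queue: reservations expire after the queue's
	// visibility timeout (30s is a placeholder) and are swept every second
	ringCache := partitioner.NewTimedRingCache(30*time.Second, time.Second)
	part := partitioner.NewRangePartitioner(list, ringCache)

	// Reserve a slice of this node's keyspace before querying Riak,
	// using the same lock increment queue.go passes today
	lower, upper, ordinal, err := part.GetRange(5000000)
	if err != nil {
		// RangePartitioner.GetRange currently always returns a nil error
		panic(err)
	}

	// If the read produced no messages, release the range immediately
	part.ExpireRange(lower, upper, ordinal)
}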