diff --git a/app/partitions.go b/app/partitions.go index f8c85e2..7b8694e 100644 --- a/app/partitions.go +++ b/app/partitions.go @@ -158,28 +158,29 @@ func (part *Partitions) syncPartitions(cfg *Config, queueName string) { } // Partition Aging logic - // pop a partition - var workingPartition *Partition - poppedPartition, _ := part.partitions.Pop() - if poppedPartition != nil { - workingPartition = poppedPartition.(*Partition) - } else { - // this seems a little scary. we do a similiar thing in getPartitionPosition - return - } - part.partitionCount = part.partitionCount - 1 - - // check if the partition is older than the max age ( but not a fresh partition ) - // if true pop the next partition, continue until this condition - for time.Since(workingPartition.LastUsed).Seconds() > maxPartitionAge && part.partitionCount >= minPartitions { - poppedPartition, _ = part.partitions.Pop() - if poppedPartition != nil { - workingPartition = poppedPartition.(*Partition) + + if part.partitionCount >= minPartitions { + // Begin emptying the partitions + + var partArray = make([]*Partition, 0) + for k := 0; k < part.partitionCount; k++ { + var poppedPartition, _ = part.partitions.Pop() + if poppedPartition != nil { + var checkPart = poppedPartition.(*Partition) + if time.Since(checkPart.LastUsed).Seconds() < maxPartitionAge { + // If it hasn't expired, append to our list of parts + checkPart.Id = len(partArray) + 1 + partArray = append(partArray, checkPart) + } + } } - part.partitionCount = part.partitionCount - 1 + + for i := 0; i < len(partArray); i++ { + part.partitions.Push(partArray[i], partArray[i].LastUsed.UnixNano()) + } + // Adjust part.partitionCount + part.partitionCount = len(partArray) } - //when false push the last popped partition - part.partitions.Push(workingPartition, workingPartition.LastUsed.UnixNano()) - part.partitionCount = part.partitionCount + 1 + part.Unlock() }