From aa39eab76ab36e16b6369297c4585b0611adbe9e Mon Sep 17 00:00:00 2001 From: SimonXming Date: Tue, 1 Aug 2017 18:32:36 +0800 Subject: [PATCH 1/4] Add basic support for redis queue. --- redis/opts.go | 37 +++++++++++ redis/redis.go | 173 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 210 insertions(+) create mode 100644 redis/opts.go create mode 100644 redis/redis.go diff --git a/redis/opts.go b/redis/opts.go new file mode 100644 index 0000000..d6b9edc --- /dev/null +++ b/redis/opts.go @@ -0,0 +1,37 @@ +package redis + +// Options defines Google Cloud Pubsub options. +type Options struct { + addr string + password string + db int + queueName string +} + +// Option configures the Google Cloud pubsub client. +type Option func(*Options) + +// WithProject configures the Pubsub client with the named project. +func WithRedisAddr(addr string) Option { + return func(opts *Options) { + opts.addr = addr + } +} + +func WithRedisPassword(password string) Option { + return func(opts *Options) { + opts.password = password + } +} + +func WithRedisDB(db int) Option { + return func(opts *Options) { + opts.db = db + } +} + +func WithRedisQueueName(queueName string) Option { + return func(opts *Options) { + opts.queueName = queueName + } +} diff --git a/redis/redis.go b/redis/redis.go new file mode 100644 index 0000000..de55acb --- /dev/null +++ b/redis/redis.go @@ -0,0 +1,173 @@ +package redis + +import ( + "context" + "encoding/json" + "errors" + "github.com/go-redis/redis" + "sync" + "time" + + "github.com/cncd/queue" +) + +const POP_TIMEOUT = 0 // 0 == Blocking forever + +var ErrDeadLine = errors.New("queue: deadline received") + +type entry struct { + item *queue.Task + done chan bool + retry int + error error + deadline time.Time +} + +type conn struct { + sync.Mutex + + opts *Options + client *redis.Client + running map[string]*entry + extension time.Duration +} + +func New(opts ...Option) (queue.Queue, error) { + conn := new(conn) + // init running + conn.running = map[string]*entry{} + conn.extension = time.Minute * 10 + // conn.extension = time.Second * 10 + + conn.opts = new(Options) + conn.opts.queueName = "task-queue" + for _, opt := range opts { + opt(conn.opts) + } + + conn.client = redis.NewClient(&redis.Options{ + Addr: conn.opts.addr, + Password: conn.opts.password, // no password set + DB: conn.opts.db, // use default DB + }) + + return conn, nil +} + +// Push pushes an task to the tail of this queue. +// 1. Task => Undone list(Redis) +func (c *conn) Push(ctx context.Context, task *queue.Task) error { + taskRaw, err := json.Marshal(task) + if err != nil { + return err + } + err = c.client.LPush(c.opts.queueName, taskRaw).Err() + if err != nil { + return err + } + go c.tracking() + return nil +} + +// 2. Undone list(Redis) => Task +func (c *conn) Poll(ctx context.Context, f queue.Filter) (*queue.Task, error) { + result, err := c.client.BLPop(POP_TIMEOUT, c.opts.queueName).Result() + if err != nil { + return nil, err + } + taskRawData := result[1] + + task := new(queue.Task) + err = json.Unmarshal([]byte(taskRawData), task) + if err != nil { + return nil, err + } + c.running[task.ID] = &entry{ + item: task, + done: make(chan bool), + deadline: time.Now().Add(c.extension), + } + + go c.tracking() + return task, nil +} + +// Extend extends the deadline for a task. +func (c *conn) Extend(ctx context.Context, id string) error { + c.Lock() + defer c.Unlock() + + task, ok := c.running[id] + if ok { + task.deadline = time.Now().Add(c.extension) + return nil + } + return queue.ErrNotFound +} + +// Done signals the task is complete. +func (c *conn) Done(ctx context.Context, id string) error { + return c.Error(ctx, id, nil) +} + +// Error signals the task is complete with errors. +func (c *conn) Error(ctx context.Context, id string, err error) error { + c.Lock() + task, ok := c.running[id] + if ok { + task.error = err + close(task.done) + delete(c.running, id) + } + c.Unlock() + return nil +} + +// Evict removes a pending task from the queue. +func (c *conn) Evict(ctx context.Context, id string) error { + return nil +} + +// Wait waits until the task is complete. +// 3. Return error when task is done +func (c *conn) Wait(ctx context.Context, id string) error { + c.Lock() + task, ok := c.running[id] + c.Unlock() + if ok { + select { + case <-ctx.Done(): + case <-task.done: + return task.error + } + } + return nil +} + +// Info returns internal queue information. +func (c *conn) Info(ctx context.Context) queue.InfoT { + c.Lock() + stats := queue.InfoT{} + stats.Stats.Running = len(c.running) + for _, entry := range c.running { + stats.Running = append(stats.Running, entry.item) + } + c.Unlock() + return stats +} + +// every call this method will checking if task.deadline is arrived. +func (c *conn) tracking() { + c.Lock() + defer c.Unlock() + + // TODO(bradrydzewski) move this to a helper function + // push items to the front of the queue if the item expires. + for id, task := range c.running { + if time.Now().After(task.deadline) { + task.error = ErrDeadLine + delete(c.running, id) + close(task.done) + } + } +} From 0d72213e3e263ee0373d22a5e6668317b844550c Mon Sep 17 00:00:00 2001 From: SimonXming Date: Thu, 3 Aug 2017 13:00:02 +0800 Subject: [PATCH 2/4] [feature] sync running state in redis. --- redis/opts.go | 29 +++++++++++------- redis/redis.go | 79 +++++++++++++++++++++++++++++++++++++------------- 2 files changed, 77 insertions(+), 31 deletions(-) diff --git a/redis/opts.go b/redis/opts.go index d6b9edc..df30fdf 100644 --- a/redis/opts.go +++ b/redis/opts.go @@ -1,37 +1,44 @@ package redis -// Options defines Google Cloud Pubsub options. +// Options defines Redis queue options. type Options struct { - addr string - password string - db int - queueName string + addr string + password string + db int + penddingQueueName string + hostIdentity string // For saving multiple-server running state in different redis key } -// Option configures the Google Cloud pubsub client. +// Option configures the Redis client. type Option func(*Options) // WithProject configures the Pubsub client with the named project. -func WithRedisAddr(addr string) Option { +func WithAddr(addr string) Option { return func(opts *Options) { opts.addr = addr } } -func WithRedisPassword(password string) Option { +func WithPassword(password string) Option { return func(opts *Options) { opts.password = password } } -func WithRedisDB(db int) Option { +func WithDB(db int) Option { return func(opts *Options) { opts.db = db } } -func WithRedisQueueName(queueName string) Option { +func WithPenddingQueueName(queueName string) Option { return func(opts *Options) { - opts.queueName = queueName + opts.penddingQueueName = queueName + } +} + +func WithHostIdentity(identity string) Option { + return func(opts *Options) { + opts.hostIdentity = identity } } diff --git a/redis/redis.go b/redis/redis.go index de55acb..d0255f7 100644 --- a/redis/redis.go +++ b/redis/redis.go @@ -4,7 +4,9 @@ import ( "context" "encoding/json" "errors" + "fmt" "github.com/go-redis/redis" + "log" "sync" "time" @@ -16,11 +18,11 @@ const POP_TIMEOUT = 0 // 0 == Blocking forever var ErrDeadLine = errors.New("queue: deadline received") type entry struct { - item *queue.Task - done chan bool - retry int - error error - deadline time.Time + Item *queue.Task `json:"item"` + done chan bool `json:"done"` + Retry int `json:"retry"` + Error error `json:"error"` + Deadline time.Time `json:"deadline"` } type conn struct { @@ -37,20 +39,35 @@ func New(opts ...Option) (queue.Queue, error) { // init running conn.running = map[string]*entry{} conn.extension = time.Minute * 10 - // conn.extension = time.Second * 10 conn.opts = new(Options) - conn.opts.queueName = "task-queue" + conn.opts.hostIdentity = "host01" + conn.opts.penddingQueueName = "pendding-queue" for _, opt := range opts { opt(conn.opts) } conn.client = redis.NewClient(&redis.Options{ Addr: conn.opts.addr, - Password: conn.opts.password, // no password set - DB: conn.opts.db, // use default DB + Password: conn.opts.password, + DB: conn.opts.db, }) + runningQueueKey := fmt.Sprintf("%s:running:queue", conn.opts.hostIdentity) + + allRunning, err := conn.client.HGetAll(runningQueueKey).Result() + if err != nil { + return nil, err + } + for _, taskRaw := range allRunning { + runningTask := new(entry) + err = json.Unmarshal([]byte(taskRaw), runningTask) + if err != nil { + return nil, err + } + runningTask.done = make(chan bool) + conn.running[runningTask.Item.ID] = runningTask + } return conn, nil } @@ -61,7 +78,7 @@ func (c *conn) Push(ctx context.Context, task *queue.Task) error { if err != nil { return err } - err = c.client.LPush(c.opts.queueName, taskRaw).Err() + err = c.client.LPush(c.opts.penddingQueueName, taskRaw).Err() if err != nil { return err } @@ -71,7 +88,7 @@ func (c *conn) Push(ctx context.Context, task *queue.Task) error { // 2. Undone list(Redis) => Task func (c *conn) Poll(ctx context.Context, f queue.Filter) (*queue.Task, error) { - result, err := c.client.BLPop(POP_TIMEOUT, c.opts.queueName).Result() + result, err := c.client.BRPop(POP_TIMEOUT, c.opts.penddingQueueName).Result() if err != nil { return nil, err } @@ -83,9 +100,20 @@ func (c *conn) Poll(ctx context.Context, f queue.Filter) (*queue.Task, error) { return nil, err } c.running[task.ID] = &entry{ - item: task, + Item: task, done: make(chan bool), - deadline: time.Now().Add(c.extension), + Deadline: time.Now().Add(c.extension), + } + + taskRaw, err := json.Marshal(c.running[task.ID]) + if err != nil { + return nil, err + } + runningQueueKey := fmt.Sprintf("%s:running:queue", c.opts.hostIdentity) + runningTaskKey := fmt.Sprintf("running:task:%s", task.ID) + err = c.client.HSet(runningQueueKey, runningTaskKey, taskRaw).Err() + if err != nil { + return nil, err } go c.tracking() @@ -99,7 +127,7 @@ func (c *conn) Extend(ctx context.Context, id string) error { task, ok := c.running[id] if ok { - task.deadline = time.Now().Add(c.extension) + task.Deadline = time.Now().Add(c.extension) return nil } return queue.ErrNotFound @@ -115,9 +143,10 @@ func (c *conn) Error(ctx context.Context, id string, err error) error { c.Lock() task, ok := c.running[id] if ok { - task.error = err + task.Error = err close(task.done) delete(c.running, id) + c.deleteTaskFromRunningQueue(id) } c.Unlock() return nil @@ -138,7 +167,7 @@ func (c *conn) Wait(ctx context.Context, id string) error { select { case <-ctx.Done(): case <-task.done: - return task.error + return task.Error } } return nil @@ -150,7 +179,7 @@ func (c *conn) Info(ctx context.Context) queue.InfoT { stats := queue.InfoT{} stats.Stats.Running = len(c.running) for _, entry := range c.running { - stats.Running = append(stats.Running, entry.item) + stats.Running = append(stats.Running, entry.Item) } c.Unlock() return stats @@ -164,10 +193,20 @@ func (c *conn) tracking() { // TODO(bradrydzewski) move this to a helper function // push items to the front of the queue if the item expires. for id, task := range c.running { - if time.Now().After(task.deadline) { - task.error = ErrDeadLine - delete(c.running, id) + if time.Now().After(task.Deadline) { + task.Error = ErrDeadLine close(task.done) + delete(c.running, id) + c.deleteTaskFromRunningQueue(id) } } } + +func (c *conn) deleteTaskFromRunningQueue(taskID string) { + runningQueueKey := fmt.Sprintf("%s:running:queue", c.opts.hostIdentity) + runningTaskKey := fmt.Sprintf("running:task:%s", taskID) + err := c.client.HDel(runningQueueKey, runningTaskKey).Err() + if err != nil { + log.Printf("queue: delete %s key %s error: %v\n", runningQueueKey, runningTaskKey, err) + } +} From a9b4046ab76d5bb8cea2b6dfeb12b4bb6f061569 Mon Sep 17 00:00:00 2001 From: SimonXming Date: Thu, 3 Aug 2017 15:03:50 +0800 Subject: [PATCH 3/4] Add UnitTest. --- redis/redis.go | 15 +++-- redis/redis_test.go | 139 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 150 insertions(+), 4 deletions(-) create mode 100644 redis/redis_test.go diff --git a/redis/redis.go b/redis/redis.go index d0255f7..6c9769b 100644 --- a/redis/redis.go +++ b/redis/redis.go @@ -3,7 +3,6 @@ package redis import ( "context" "encoding/json" - "errors" "fmt" "github.com/go-redis/redis" "log" @@ -15,8 +14,6 @@ import ( const POP_TIMEOUT = 0 // 0 == Blocking forever -var ErrDeadLine = errors.New("queue: deadline received") - type entry struct { Item *queue.Task `json:"item"` done chan bool `json:"done"` @@ -178,6 +175,8 @@ func (c *conn) Info(ctx context.Context) queue.InfoT { c.Lock() stats := queue.InfoT{} stats.Stats.Running = len(c.running) + penddingLength, _ := c.client.LLen(c.opts.penddingQueueName).Result() + stats.Stats.Pending = int(penddingLength) for _, entry := range c.running { stats.Running = append(stats.Running, entry.Item) } @@ -194,7 +193,15 @@ func (c *conn) tracking() { // push items to the front of the queue if the item expires. for id, task := range c.running { if time.Now().After(task.Deadline) { - task.Error = ErrDeadLine + taskRaw, err := json.Marshal(task.Item) + if err != nil { + log.Printf("re-added to pending queue error: %v \n", err) + } + err = c.client.LPush(c.opts.penddingQueueName, taskRaw).Err() + if err != nil { + log.Printf("re-added to pending queue error: %v \n", err) + } + close(task.done) delete(c.running, id) c.deleteTaskFromRunningQueue(id) diff --git a/redis/redis_test.go b/redis/redis_test.go new file mode 100644 index 0000000..159c8f2 --- /dev/null +++ b/redis/redis_test.go @@ -0,0 +1,139 @@ +package redis + +import ( + "context" + "github.com/alicebob/miniredis" + "github.com/cncd/queue" + "sync" + "testing" + "time" +) + +var noContext = context.Background() + +func TestRedisQueue(t *testing.T) { + want := &queue.Task{ID: "1"} + + s, err := miniredis.Run() + if err != nil { + panic(err) + } + defer s.Close() + + q, err := New( + WithAddr(s.Addr()), + WithPassword(""), + WithDB(0), + WithPenddingQueueName("pending-queue"), + ) + + q.Push(noContext, want) + info := q.Info(noContext) + if info.Stats.Pending != 1 { + t.Errorf("expect task in pending queue") + return + } + + if !s.Exists("pending-queue") { + t.Fatal("'pending-queue' should not have existed anymore") + } + + got, _ := q.Poll(noContext, func(*queue.Task) bool { return true }) + if got.ID != want.ID { + t.Errorf("expect task returned form queue") + return + } + info = q.Info(noContext) + if info.Stats.Pending != 0 { + t.Errorf("expect task removed from pending queue") + return + } + if info.Stats.Running != 1 { + t.Errorf("expect task in running queue") + return + } + + q.Done(noContext, got.ID) + info = q.Info(noContext) + if info.Stats.Pending != 0 { + t.Errorf("expect task removed from pending queue") + return + } + if info.Stats.Running != 0 { + t.Errorf("expect task removed from running queue") + return + } + +} + +func TestRedisQueueExpire(t *testing.T) { + want := &queue.Task{ID: "1"} + + s, err := miniredis.Run() + if err != nil { + panic(err) + } + defer s.Close() + + qQ, err := New( + WithAddr(s.Addr()), + WithPassword(""), + WithDB(0), + WithPenddingQueueName("pending-queue"), + ) + q := qQ.(*conn) + q.extension = 0 + q.Push(noContext, want) + info := q.Info(noContext) + if info.Stats.Pending != 1 { + t.Errorf("expect task in pending queue") + return + } + + got, _ := q.Poll(noContext, func(*queue.Task) bool { return true }) + if got.ID != want.ID { + t.Errorf("expect task returned form queue") + return + } + + q.tracking() + if info.Stats.Pending != 1 { + t.Errorf("expect task re-added to pending queue") + return + } +} + +func TestRedisQueueWait(t *testing.T) { + want := &queue.Task{ID: "1"} + s, err := miniredis.Run() + if err != nil { + panic(err) + } + defer s.Close() + + qQ, err := New( + WithAddr(s.Addr()), + WithPassword(""), + WithDB(0), + WithPenddingQueueName("pending-queue"), + ) + q := qQ.(*conn) + q.Push(noContext, want) + + got, _ := q.Poll(noContext, func(*queue.Task) bool { return true }) + if got.ID != want.ID { + t.Errorf("expect task returned form queue") + return + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + q.Wait(noContext, got.ID) + wg.Done() + }() + + <-time.After(time.Millisecond) + q.Done(noContext, got.ID) + wg.Wait() +} From d56b306445bc06afebcb79215e3a1534a1351be1 Mon Sep 17 00:00:00 2001 From: SimonXming Date: Thu, 3 Aug 2017 17:59:11 +0800 Subject: [PATCH 4/4] Update redis/opts.go comments. --- redis/opts.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/redis/opts.go b/redis/opts.go index df30fdf..c4e528e 100644 --- a/redis/opts.go +++ b/redis/opts.go @@ -6,37 +6,41 @@ type Options struct { password string db int penddingQueueName string - hostIdentity string // For saving multiple-server running state in different redis key + hostIdentity string } // Option configures the Redis client. type Option func(*Options) -// WithProject configures the Pubsub client with the named project. +// WithAddr configures Redis address. func WithAddr(addr string) Option { return func(opts *Options) { opts.addr = addr } } +// WithPassword configures Redis password. func WithPassword(password string) Option { return func(opts *Options) { opts.password = password } } +// WithDB configures Redis DB. func WithDB(db int) Option { return func(opts *Options) { opts.db = db } } +// WithPenddingQueueName configures Redis key where saveing pendding tasks. func WithPenddingQueueName(queueName string) Option { return func(opts *Options) { opts.penddingQueueName = queueName } } +// WithHostIdentity saving multiple-server running task in different redis key. func WithHostIdentity(identity string) Option { return func(opts *Options) { opts.hostIdentity = identity