From 9b6b7f2587fed3dfa0a50eb031adeedd615551c4 Mon Sep 17 00:00:00 2001 From: Mike Crowe Date: Tue, 14 Jun 2016 00:05:56 -0400 Subject: [PATCH 1/5] Adding metadata support (data attached to job excluded from remove search) Added replace function to update future job --- index.js | 28 ++++++++++++++++++++++++++-- package.json | 2 +- readme.md | 45 ++++++++++++++++++++++++++++++++++++++++----- test/put.js | 14 ++++++++++++++ test/remove.js | 14 ++++++++++++++ test/replace.js | 22 ++++++++++++++++++++++ test/take.js | 21 +++++++++++++++++++++ 7 files changed, 138 insertions(+), 8 deletions(-) create mode 100644 test/replace.js diff --git a/index.js b/index.js index 24d3a71..53af79a 100644 --- a/index.js +++ b/index.js @@ -45,6 +45,7 @@ Yajob.prototype.put = function (attrs, opts) { opts = opts || {}; opts.schedule = opts.schedule || new Date(Date.now() + this._delay); opts.priority = opts.priority || 0; + opts.meta = opts.meta || {}; if (!Array.isArray(attrs)) { attrs = [attrs]; @@ -56,7 +57,8 @@ Yajob.prototype.put = function (attrs, opts) { attempts: 0, attrs, scheduledAt: opts.schedule, - priority: opts.priority + priority: opts.priority, + meta: opts.meta }; } @@ -65,6 +67,28 @@ Yajob.prototype.put = function (attrs, opts) { return jobs.then(c => c.insert(attrs.map(attrsToJob))); }; +Yajob.prototype.replace = function (attrs, opts) { + opts = opts || {}; + opts.schedule = opts.schedule || new Date(Date.now() + this._delay); + opts.priority = opts.priority || 0; + opts.meta = opts.meta || {}; + + function attrsToJob(attrs) { + return { + status: Yajob.status.new, + attempts: 0, + attrs, + scheduledAt: opts.schedule, + priority: opts.priority, + meta: opts.meta + }; + } + + const jobs = this._db.then(db => db.collection(this._tag)); + + return jobs.then(c => c.update({status: Yajob.status.new, attrs}, attrsToJob(attrs))); +}; + Yajob.prototype.take = function (count) { count = count || 1; @@ -111,7 +135,7 @@ Yajob.prototype.take = function (count) { try { for (let i = 0; i < batch.length; i++) { const job = batch[i]; - const done = yield job.attrs; + const done = yield Object.assign(job.attrs, job.meta); if (done === false) { const status = job.attempts < maxTrys ? Yajob.status.new : Yajob.status.failed; diff --git a/package.json b/package.json index ba7e106..971f952 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "yajob", - "version": "2.1.2", + "version": "2.1.3", "description": "Yet another job queue", "main": "index.js", "scripts": { diff --git a/readme.md b/readme.md index f8f97d5..3188b29 100644 --- a/readme.md +++ b/readme.md @@ -28,6 +28,41 @@ for (var mail of yield mails.take(100)) { Processed jobs removed from queue, when for-loop is ended or broken (either with `break` or exception). +### Updating pending events with metadata + +You may also attach metadata to future job and update as follows: + +```js +const yajob = require('yajob'); +const mails = yajob('localhost/queuedb') + .tag('mails'); + +var d = new Date(); +d.setHours(24,0,0,0); + +mails.put({ + from: 'floatdrop@gmail.com', + to: 'nodejs-dev@dev-null.com', +}, { + meta: {body: 'You have 1 new notification'}, + schedule: d +}); +// => Promise + +// Meanwhile, a new notification comes in + +mails.replace({ + from: 'floatdrop@gmail.com', + to: 'nodejs-dev@dev-null.com' +}, { + meta: {body: 'You have 2 new notification'}, + schedule: d +}); + +``` + +This will only send out a single email with the new body. + ### Skip jobs In some cases you will need to skip taken job. To do this pass into generator `false` value: @@ -55,13 +90,13 @@ const important = queue.tag('mail').sort({priority: -1}); Returns instance of queue, that stores data in MongoDB. -##### uri -Type: `String` +##### uri +Type: `String` MongoDB URI string. -##### options -Type: `Object` +##### options +Type: `Object` MongoDB [MongoClient.connect options](http://mongodb.github.io/node-mongodb-native/2.1/api/MongoClient.html). @@ -90,7 +125,7 @@ Returns `Promise` that resolves to a `Generator`, that will emit jobs one by one After all jobs are taken from batch - they are considered `done` and removed from queue. ##### count -Type: `Number` +Type: `Number` Default: `1` Maximum number of jobs to take from one batch request. diff --git a/test/put.js b/test/put.js index f7c8054..8d4cedf 100644 --- a/test/put.js +++ b/test/put.js @@ -15,6 +15,20 @@ test('put should add job to queue', async t => { } }); +test('put should add job to queue with meta', async t => { + const queueDb = await new QueueDb(); + const queue = yajob(queueDb.uri); + + try { + await queue.put({test: 'message'}, {meta: {param: 1}}); + const job = await queueDb.db.collection('default').find().toArray(); + t.same(job[0].attrs, {test: 'message'}); + t.same(job[0].meta, {param: 1}); + } finally { + await queueDb.close(); + } +}); + test('put take an Array as argument', async t => { const queueDb = await new QueueDb(); const queue = yajob(queueDb.uri); diff --git a/test/remove.js b/test/remove.js index 7685d15..06d4e29 100644 --- a/test/remove.js +++ b/test/remove.js @@ -15,3 +15,17 @@ test('removes job', async t => { await queueDb.close(); } }); + +test('removes job with meta', async t => { + const queueDb = await new QueueDb(); + const queue = yajob(queueDb.uri); + + try { + await queue.put({test: 'wow'}, {meta: {param: 1}}); + await queue.remove({test: 'wow'}); + const jobs = await queueDb.db.collection('default').find().toArray(); + t.is(jobs.length, 0, 'should remove job from queue'); + } finally { + await queueDb.close(); + } +}); diff --git a/test/replace.js b/test/replace.js new file mode 100644 index 0000000..915445e --- /dev/null +++ b/test/replace.js @@ -0,0 +1,22 @@ +import test from 'ava'; +import yajob from '..'; +import {QueueDb} from './_utils'; + +test('replace should add job to queue then update it', async t => { + const queueDb = await new QueueDb(); + const queue = yajob(queueDb.uri); + + try { + await queue.put({test: 'message'}, {meta: {param: 1}}); + const job = await queueDb.db.collection('default').find().toArray(); + t.same(job[0].attrs, {test: 'message'}); + t.same(job[0].meta, {param: 1}); + await queue.replace({test: 'message'}, {meta: {param: 2}}); + const job2 = await queueDb.db.collection('default').find().toArray(); + t.same(job2[0].attrs, {test: 'message'}); + t.same(job2[0].meta, {param: 2}); + } finally { + await queueDb.close(); + } +}); + diff --git a/test/take.js b/test/take.js index f8ec1a7..aeec23d 100644 --- a/test/take.js +++ b/test/take.js @@ -22,6 +22,27 @@ test('take one', async t => { } }); +test('take one with meta', async t => { + const queueDb = await new QueueDb(); + const queue = yajob(queueDb.uri); + + try { + await queue.put({test: 'wow'}, {meta: {param: 1}}); + + const promise = queue.take(); + t.is(typeof promise.then, 'function', 'should return a Promise'); + + const taken = Array.from(await promise); + console.log('TAKEN: ', taken); + t.same(taken, [{test: 'wow', param: 1}]); + + const jobs = Array.from(queue.take()); + t.is(jobs.length, 0, 'should remove job from queue'); + } finally { + await queueDb.close(); + } +}); + test('take two', async t => { const queueDb = await new QueueDb(); const queue = yajob(queueDb.uri); From 3d486752534269a732d84ff362aefc81aad40177 Mon Sep 17 00:00:00 2001 From: Mike Crowe Date: Tue, 14 Jun 2016 06:41:50 -0400 Subject: [PATCH 2/5] Reverted package.json and updated README to include replace API function and metadata --- package.json | 2 +- readme.md | 17 +++++++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/package.json b/package.json index 971f952..ba7e106 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "yajob", - "version": "2.1.3", + "version": "2.1.2", "description": "Yet another job queue", "main": "index.js", "scripts": { diff --git a/readme.md b/readme.md index 3188b29..210a0ca 100644 --- a/readme.md +++ b/readme.md @@ -117,6 +117,23 @@ Type: `Object` * `schedule` - `Date`, when job should be available to `take` * `priority` - `Number`, that represents priority of job + * `meta` - `Object`, optional metadata attached to job and returned in taken object + +### replace(attrs, [options]) + +Update a pending job in the queue. Returns `Promise`. + +##### attrs +Type: `Object` + +Data, that will be attached to job. + +##### options +Type: `Object` + + * `schedule` - `Date`, when job should be available to `take` + * `priority` - `Number`, that represents priority of job + * `meta` - `Object`, optional metadata attached to job and returned in taken object ### take([count]) From ddbfd23e77b9e81d9a8d012cd280018fbaba73b7 Mon Sep 17 00:00:00 2001 From: Mike Crowe Date: Tue, 14 Jun 2016 06:48:34 -0400 Subject: [PATCH 3/5] Updated README to include replace API function and metadata --- readme.md | 15 ++++++++++++++- test/take.js | 1 - 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/readme.md b/readme.md index 210a0ca..f60f3d6 100644 --- a/readme.md +++ b/readme.md @@ -42,7 +42,7 @@ d.setHours(24,0,0,0); mails.put({ from: 'floatdrop@gmail.com', - to: 'nodejs-dev@dev-null.com', + to: 'nodejs-dev@dev-null.com' }, { meta: {body: 'You have 1 new notification'}, schedule: d @@ -59,6 +59,19 @@ mails.replace({ schedule: d }); +// Now, when you take the job in the future: + +let job = yield mails.take(); +console.log(job); +``` + +This would print out: +``` +{ + from: 'floatdrop@gmail.com', + to: 'nodejs-dev@dev-null.com', + body: 'You have 2 new notification' +} ``` This will only send out a single email with the new body. diff --git a/test/take.js b/test/take.js index aeec23d..cb0b9b1 100644 --- a/test/take.js +++ b/test/take.js @@ -33,7 +33,6 @@ test('take one with meta', async t => { t.is(typeof promise.then, 'function', 'should return a Promise'); const taken = Array.from(await promise); - console.log('TAKEN: ', taken); t.same(taken, [{test: 'wow', param: 1}]); const jobs = Array.from(queue.take()); From aa62e8daa8cf6901f01160572ee190f68a48565b Mon Sep 17 00:00:00 2001 From: Mike Crowe Date: Wed, 15 Jun 2016 13:08:15 -0400 Subject: [PATCH 4/5] Adding {upsert: true} to replace to insert record if it doesn't exist --- index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/index.js b/index.js index 53af79a..4b2a212 100644 --- a/index.js +++ b/index.js @@ -86,7 +86,7 @@ Yajob.prototype.replace = function (attrs, opts) { const jobs = this._db.then(db => db.collection(this._tag)); - return jobs.then(c => c.update({status: Yajob.status.new, attrs}, attrsToJob(attrs))); + return jobs.then(c => c.update({status: Yajob.status.new, attrs}, attrsToJob(attrs)), {upsert: true}); }; Yajob.prototype.take = function (count) { From bdafb276a9074c09af8c6d02cdbabd506259abcc Mon Sep 17 00:00:00 2001 From: Mike Crowe Date: Wed, 15 Jun 2016 16:05:39 -0400 Subject: [PATCH 5/5] Updated replace method to fix upsert issue (replace creates job if it doesn't exist) --- index.js | 2 +- test/replace.js | 13 +++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/index.js b/index.js index 4b2a212..9d2bb43 100644 --- a/index.js +++ b/index.js @@ -86,7 +86,7 @@ Yajob.prototype.replace = function (attrs, opts) { const jobs = this._db.then(db => db.collection(this._tag)); - return jobs.then(c => c.update({status: Yajob.status.new, attrs}, attrsToJob(attrs)), {upsert: true}); + return jobs.then(c => c.update({status: Yajob.status.new, attrs}, attrsToJob(attrs), {upsert: true, w: 1})); }; Yajob.prototype.take = function (count) { diff --git a/test/replace.js b/test/replace.js index 915445e..6080e0c 100644 --- a/test/replace.js +++ b/test/replace.js @@ -20,3 +20,16 @@ test('replace should add job to queue then update it', async t => { } }); +test('replace should add job to queue if it does not exist', async t => { + const queueDb = await new QueueDb(); + const queue = yajob(queueDb.uri); + + try { + await queue.replace({test: 'message'}, {meta: {param: 2}}); + const job2 = await queueDb.db.collection('default').find().toArray(); + t.same(job2[0].attrs, {test: 'message'}); + t.same(job2[0].meta, {param: 2}); + } finally { + await queueDb.close(); + } +});