From 4b6e5970fd82fdf7f6525c3c16759297c822d49c Mon Sep 17 00:00:00 2001 From: Mitch Capper Date: Sat, 8 Apr 2017 12:36:23 -0700 Subject: [PATCH] Read receipt support & logging of self messages Major additions to database driver (currently mysql only) to track metadata about messages. This currently allows us to support facebook read receipts on messages (will appear as eye emoji on the latest message they have seen) This also allows us to turn on self-events, this allows for logging into the slack channel messages sent by the facebook user on other devices. In the future this would allow for playing catch-up when disconnected from slack or facebook on a thread as we can figure out what messages we have or have not already taken care of. Manually rebased on top of new master as way too painful otherwise. --- bin/run_mysql.js | 205 ++++++++++++++++++++++++++++++++--------------- lib/facebot.js | 190 ++++++++++++++++++++++++++++++++++--------- 2 files changed, 289 insertions(+), 106 deletions(-) diff --git a/bin/run_mysql.js b/bin/run_mysql.js index 727b9bf..0d72b0f 100644 --- a/bin/run_mysql.js +++ b/bin/run_mysql.js @@ -13,52 +13,136 @@ var envVars = [ 'FACEBOOK_PASSWORD', 'AUTHORISED_USERNAME', 'DATABASE_URL', + 'METADATA_TRACKING', ]; envVars.forEach(function(name) { if (process.env[name] == null) throw new Error('Environment Variable ' + name + ' not set'); }); +var CREATE_SETTINGS_SQL = "CREATE TABLE settings ( id INT, settings_json TEXT, PRIMARY KEY(id) );"; +var CREATE_MESSAGES_SQL = "CREATE TABLE messages( fb_threadid VARCHAR(80), fb_message_id VARCHAR(80), slack_message_timestamp VARCHAR(80), fb_message_timestamp BIGINT, is_cur_read BOOL, PRIMARY KEY(fb_message_id), INDEX(fb_threadid) );"; +var pool; +function create_pool(callback){ + var settings = JSON.parse(process.env.DATABASE_URL); + settings.supportBigNumbers=true; + pool= mysql.createPool(settings); + pool.getConnection(function(err, client) { + if(err){ + return callback(new Error("Couldn't connect to mysql db: " + err.message)); + } + + createTableIfNeeded(client,"settings",CREATE_SETTINGS_SQL, function (err){ + if(err){ + client.release(); + return callback(new Error("Couldn't create the settings table: " + err.message)); + } + createTableIfNeeded(client,"messages",CREATE_MESSAGES_SQL, function (err){ + if(err){ + client.release(); + return callback(new Error("Couldn't create the messages table: " + err.message)); + } + }); + return callback(null); + }); + }); +} +//msg_obj should have the props with the column names +function insert_message(msg_obj,callback) { + pool.getConnection(function(err, client) { + if(err) + return callback(new Error("Couldn't connect to mysql db: " + err.message)); + client.query("INSERT IGNORE INTO messages(fb_threadid, fb_message_id, slack_message_timestamp, fb_message_timestamp, is_cur_read) VALUES(?,?,?,?,0)",[msg_obj.fb_threadid,msg_obj.fb_message_id,msg_obj.slack_message_timestamp,msg_obj.fb_message_timestamp], function(err, result){ + client.release(); + if(err) + return callback(new Error("Couldn't insert into the messages table: " + err.message)); + callback(null); + }); + }); +} +function message_exists(fb_message_id,callback) { + pool.getConnection(function(err, client) { + if(err) + return callback(new Error("Couldn't connect to mysql db: " + err.message)); -// Load the settings and JSON from mysql -function load_data(callback) { - var client = mysql.createConnection(JSON.parse(process.env.DATABASE_URL)); + client.query("SELECT fb_message_id FROM messages WHERE fb_message_id = ?",[fb_message_id], function(err, result){ + client.release(); + if(err) + return callback(new Error("Couldn't select from the messages table: " + err.message)); + callback(null,result.length != 0); + }); + }); +} +function get_cur_read_msg_on_thread(fb_threadid,callback) { + pool.getConnection(function(err, client) { + if(err) + return callback(new Error("Couldn't connect to mysql db: " + err.message)); - client.connect(function(err) { - if (err) { - return callback( - new Error("Couldn't connect to mysql db: " + err.message) - ); - } + client.query("SELECT slack_message_timestamp FROM messages WHERE is_cur_read=1 && fb_threadid = ?",[fb_threadid], function(err, result){ + client.release(); + if(err) + return callback(new Error("Couldn't select from the messages table: " + err.message)); + callback(null,result.length != 0 ? result[0].slack_message_timestamp : null); + }); + }); +} - client.query( - 'SELECT settings_json FROM settings WHERE id = 1', - function(err, result) { - if (err || result.length == 0) { - return callback(new Error('No settings in mysql table')); - } +//This will get the message that is on the thread that is less or equal to the fb timestamp the user read through and set that message as is_cur_read value and clear out the old one +function get_and_set_read_msg_on_thread(fb_threadid,fb_as_new_as_timestamp,callback){ + pool.getConnection(function(err, client) { + if(err) + return callback(new Error("Couldn't connect to mysql db: " + err.message)); + client.query("UPDATE messages SET is_cur_read=0 WHERE is_cur_read=1 && fb_threadid=?",[fb_threadid], function(err, result){ + if(err){ + client.release(); + return callback(new Error("Couldn't select from the messages table: " + err.message)); + } - try { - client.end(); - return callback(null, JSON.parse(result[0].settings_json)); - } catch (err) { - return callback( - 'Found results in mysql table, but failed to parse: ' + - err - ); + client.query("SELECT fb_message_id,slack_message_timestamp FROM messages WHERE fb_message_timestamp <= ? && fb_threadid = ? ORDER BY fb_message_timestamp desc LIMIT 1",[fb_as_new_as_timestamp,fb_threadid], function(err, result){ + if(err){ + client.release(); + return callback(new Error("Couldn't select from the messages table: " + err.message)); } + client.query("UPDATE messages SET is_cur_read=1 WHERE fb_message_id=?",[result.length != 0 ? result[0].fb_message_id : ""], function(err, res2){ + client.release(); + if(err) + return callback(new Error("Couldn't select from the messages table: " + err.message)); + callback(null,result.length != 0 ? result[0].slack_message_timestamp : null); + }); + }); + }); + }); + +} + +// Load the settings and JSON from mysql +function load_data(callback){ + + pool.getConnection(function(err, client) { + if(err){ + return callback(new Error("Couldn't connect to mysql db: " + err.message)); + } + + client.query("SELECT settings_json FROM settings WHERE id = 1", function(err, result){ + if(err || result.length == 0){ + client.release(); + return callback(new Error("No settings in mysql table")); } - ); + + try { + client.release(); + return callback(null, JSON.parse(result[0].settings_json)); + } catch (err){ + return callback("Found results in mysql table, but failed to parse: " + err); + } + }); }); } -function createTableIfNeeded(client, callback) { - client.query('SELECT * FROM settings LIMIT 1', function(err, result) { - if (err) { - return client.query( - 'CREATE TABLE settings (id INT, settings_json TEXT, PRIMARY KEY(id) )', - callback - ); +function createTableIfNeeded(client, table, create_sql, callback){ + client.query("SELECT * FROM " + table + " LIMIT 1", function(err, result){ + if(err) { + return client.query(create_sql, callback); } else { // table exists return callback(null); @@ -66,36 +150,20 @@ function createTableIfNeeded(client, callback) { }); } -function save_data(data, callback) { - var client = mysql.createConnection(JSON.parse(process.env.DATABASE_URL)); - - client.connect(function(err) { - if (err) { - return callback( - new Error("Couldn't connect to mysql db: " + err.message) - ); +function save_data(data, callback){ + pool.getConnection(function(err, client) { + if(err){ + return callback(new Error("Couldn't connect to mysql db: " + err.message)); } - createTableIfNeeded(client, function(err) { - if (err) { - return callback( - new Error( - "Couldn't create the settings table: " + err.message - ) - ); - } - var insertQuery = 'INSERT INTO settings(id, settings_json) VALUES (1, ?) ON DUPLICATE KEY UPDATE settings_json=VALUES(settings_json)'; - insertQuery = mysql.format(insertQuery, [JSON.stringify(data)]); - client.query(insertQuery, function(err, result) { - if (err) - return callback( - new Error( - "Couldn't insert/update settings table: " + - err.message - ) - ); - callback(); + + var insertQuery = "INSERT INTO settings(id, settings_json) VALUES (1, ?) ON DUPLICATE KEY UPDATE settings_json=VALUES(settings_json)"; + insertQuery = mysql.format(insertQuery,[JSON.stringify(data)]); + client.query(insertQuery, function(err, result){ + client.release(); + if(err) + return callback(new Error("Couldn't insert/update settings table: " + err.message)); + callback(); }); - }); }); } @@ -106,10 +174,15 @@ var settings = { debug_messages: process.env.DEBUG_MESSAGES || false, facebook: { email: process.env.FACEBOOK_EMAIL, - pass: process.env.FACEBOOK_PASSWORD, - }, -}; - -var facebot = new Facebot(settings, load_data, save_data); - -facebot.run(); + pass: process.env.FACEBOOK_PASSWORD + } +} +create_pool(function(err) { + if (err) + throw new Error("Error initializing pool of: " + err); + var meta_funcs = {}; + if (process.env.METADATA_TRACKING) + meta_funcs = {get_cur_read_msg_on_thread:get_cur_read_msg_on_thread,get_and_set_read_msg_on_thread:get_and_set_read_msg_on_thread,message_exists:message_exists,insert_message:insert_message}; + var facebot = new Facebot(settings, load_data, save_data, meta_funcs); + facebot.run(); +}); diff --git a/lib/facebot.js b/lib/facebot.js index 66bdeeb..c5a2780 100644 --- a/lib/facebot.js +++ b/lib/facebot.js @@ -8,11 +8,80 @@ var facebook = require('facebook-chat-api'); var fbUtil = require('./util'); var emoji_lib = require('js-emoji'); var emoji = new emoji_lib.EmojiConvertor(); +var extend = require('extend'); + +// https://github.com/mishk0/slack-bot-api/pull/32 +/** + * Posts a reaction (emoji) to a message by timestamp + * @param {string} id - channel ID + * @param {string} emoji - emoji string (without the : symbols) + * @param {string} ts - timestamp of the message you want to react to + * @param {object} params + * @returns {vow.Promise} + */ +slackbots.prototype.postReactionToChannel = function(id, emoji, ts, params) { + params = extend({ + channel: id, + name: emoji, + timestamp: ts + }, params || {}); + + return this._api('reactions.add', params); +}; + +/** + * Removes a reaction (emoji) by timestamp + * @param {string} id - channel ID + * @param {string} emoji - emoji string (without the : symbols) + * @param {string} ts - timestamp of the message you want to react to + * @param {object} params + * @returns {vow.Promise} + */ +slackbots.prototype.removeReactionFromChannel = function(id, emoji, ts, params) { + params = extend({ + channel: id, + name: emoji, + timestamp: ts + }, params || {}); + + return this._api('reactions.remove', params); +}; + +/** + * Returns a list of all reactions for a message (specified by timestamp) + * @param {string} id - channel ID + * @param {string} ts - timestamp of the message + * @param {object} params + * @returns {vow.Promise} + */ +slackbots.prototype.getReactions = function(id, ts, params) { + params = extend({ + channel: id, + timestamp: ts + }, params || {}); + + return this._api('reactions.get', params); +}; + +/** + * Returns a list of all items reacted to by a user + * @param {string} id - user ID + * @param {object} params + * @returns {vow.Promise} + */ +slackbots.prototype.listReactions = function(id, ts, params) { + params = extend({ + user: id + }, params || {}); + + return this._api('reactions.list', params); +}; // Load_data: function(callback(err, data)) // Save_data: function(data, callback(err)) // data: { appState: object, channelLinks: [] } -var Facebot = function Constructor(settings, load_data, save_data) { +// message_db: optional, if specified used for tracking message state +var Facebot = function Constructor(settings, load_data, save_data, message_db){ this.settings = settings; this.settings.name = this.settings.name || 'facebot'; this.user = null; @@ -20,7 +89,8 @@ var Facebot = function Constructor(settings, load_data, save_data) { this.load_data = load_data; this.save_data = save_data; - + this.message_db = message_db; + // array of { slack_channel: string id, fb_thread: string id } this.channelLinks = []; this.fb_users = {}; @@ -135,20 +205,34 @@ Facebot.prototype.saveData = function() { // Creates the FB api using either saved tokens or username // and password passed in as credentials -Facebot.prototype.createFBApi = function(credentials) { - return Q.nfcall(facebook, credentials).then(api => { - this.sendDebugMessage('Logged into facebook'); - - this.facebookApi = api; - api.setOptions({ - logLevel: 'error', - listenEvents: true, - }); - api.listen((err, fbmessage) => { - if (!err) this.handleFBNotification(fbmessage); - }); - }); -}; +Facebot.prototype.createFBApi = function(credentials){ + return Q.nfcall(facebook, credentials) + .then(api => { + this.sendDebugMessage("Logged into facebook") + + this.facebookApi = api; + this.ourFacebookId = api.getCurrentUserID(); + var opts = { + logLevel: "error", + listenEvents: true, + }; + if (this.message_db){ + opts.selfListen=true; + this.ourSenderInfo = {icon_url:`http://graph.facebook.com/${this.ourFacebookId}/picture?type=square`,username:"us"}; + fbUtil.findFBUser(this.facebookApi,this.ourFacebookId,true) + .then( friend => {this.ourSenderInfo.username=friend.name; } ) + .catch(function (error) {console.log("FIND FRIEND ERROR OF: " + error);}); + } + + api.setOptions(opts); + api.listen((err, fbmessage) => { + if(!err){ + //wait to make sure it had a chance to get added to db first... + setTimeout( ()=> this.handleFBNotification(fbmessage), 250 ); + } + }); + }); +} // loop that will continue to send the is typing indicator to a channel // until we hear back they are not typing, or 10 minutes have past @@ -169,25 +253,47 @@ Facebot.prototype.handleTypeNotification = function(fbmessage, link) { link.typing_start_time = Date.now() / 1000; this.typingLoop(link); } -}; -Facebot.prototype.postFBMessageToSlack = function(fbmessage, link) { - if (fbmessage.body !== undefined) { - var message_text = emoji.replace_emoticons_with_colons(fbmessage.body); - this.postMessage(link.slack_channel, message_text, { - username: link.fb_name, - icon_url: link.icon, +} +Facebot.prototype.postFBMessageToSlack = function (fbmessage, link) { + if (this.message_db) + promise = Q.nfcall(this.message_db.message_exists,fbmessage.messageID); + else + promise = Q.fcall(function () {return false;}); + promise.then( already_exists => { + if (already_exists) + return; + if (fbmessage.body !== undefined){ + var message_text = emoji.replace_emoticons_with_colons(fbmessage.body); + var sender_info = { username: link.fb_name, icon_url: link.icon }; + if (fbmessage.senderID == us.ourFacebookId) + sender_info = us.ourSenderInfo; + this.postMessage(link.slack_channel, + message_text, + sender_info) + .then( data => { + if (this.message_db) + this.message_db.insert_message({fb_threadid:fbmessage.threadID,fb_message_id:fbmessage.messageID,slack_message_timestamp:data.message.ts,fb_message_timestamp:fbmessage.timestamp},function(err){if (err) throw new Error("err adding of: " + err);}); }); - } - this.handleFBAttachments(fbmessage, link); -}; + } + this.handleFBAttachments(fbmessage, link); + }); +} +Facebot.prototype.handleReadReceipt = function(fbmessage, link){ + Q.nfcall(this.message_db.get_cur_read_msg_on_thread,fbmessage.threadID) + .then ( timestamp => {if (timestamp) this.removeReactionFromChannel(link.slack_channel,"eye",timestamp);return null;} ) + .then ( () => {return Q.nfcall(this.message_db.get_and_set_read_msg_on_thread,fbmessage.threadID,fbmessage.time);} ) + .then( timestamp => {return timestamp ? this.postReactionToChannel(link.slack_channel,"eye",timestamp) : null;} ) + .catch(function (error) {console.log("READ RECEIPT ERROR OF: " + error.error);}) + .done(); +} + // Handles any facebook messages/events received, formats them // and sends them through to the linked slack channels Facebot.prototype.handleFBNotification = function(fbmessage) { var threadID = undefined; - if (fbmessage.type == 'message') threadID = fbmessage.threadID; - if ( - fbmessage.type == 'typ' //not sure why typing messages appear on a different thread but they do so we need to use from... - ) + if (fbmessage.type == "message" || fbmessage.type == "read_receipt") + threadID = fbmessage.threadID; + if (fbmessage.type == "typ")//not sure why typing messages appear on a different thread but they do so we need to use from... threadID = fbmessage.from; if (!threadID) return; @@ -198,8 +304,12 @@ Facebot.prototype.handleFBNotification = function(fbmessage) { case 'typ': this.handleTypeNotification(fbmessage, link); break; - case 'message': - this.postFBMessageToSlack(fbmessage, link); + case "read_receipt": + if (this.message_db) + this.handleReadReceipt(fbmessage,link); + break; + case "message": + this.postFBMessageToSlack(fbmessage,link); break; } }); @@ -342,14 +452,14 @@ Facebot.prototype.postSlackMessagesToFB = function(message) { msg, link.fb_thread, (err, msgInfo) => { - if (err) - this.postMessage( - link.slack_channel, - 'Error sending last message: ' + err.error, - { as_user: true } - ); - } - ); + if(err) + this.postMessage(link.slack_channel, + "Error sending last message: " + err.error, + { as_user: true }); + else if (this.message_db) + this.message_db.insert_message({fb_threadid:link.fb_thread,fb_message_id:msgInfo.messageID,slack_message_timestamp:message.ts,fb_message_timestamp:msgInfo.timestamp},function(err){if (err) throw new Error("err adding of: " + err);}); + + }); }); } };