Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions rima.lua
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,13 @@
-- Index 0: TREE { task_id }
-- Index 1: TREE { key }
--
-- Space 4: Fast tasks monitoring
-- Tuple: { key (NUM), added_tasks_count (NUM), deleted_tasks_count (NUM) }
-- Index 0: TREE { key }
--

local EXPIRATION_TIME = 30 * 60 -- seconds
local FAST_TASKS_STAT_EXP_TIME = 60 * 60 * 24 * 365 -- seconds (1 year)

--
-- Put task to the queue.
Expand Down Expand Up @@ -72,11 +77,120 @@ function rima_put_sync(key, data, prio)
return rima_put_impl(key, data, prio, box.time())
end

--
-- Get a key for fast tasks monitoring
--
local function rima_get_monitoring_key(t)
if t == nil or t == 0 then
t = box.time()
end
return t - (t % 60)
end

--
-- Increment fast tasks count
--
local function rima_inc_fast_tasks_count()
local key = rima_get_monitoring_key()
local update_result = box.update(4, key, '+p', 1, 1)

if update_result == nil then
-- no stat for current key
box.insert(4, key, 1, 0)
end

-- increment total number of fast tasks in queue
update_result = box.update(4, 0, '+p', 1, 1)
if update_result == nil then
box.insert(4, 0, 1)
end
end

--
-- Decrement fast tasks count
--
local function rima_dec_fast_tasks_count()
local key = rima_get_monitoring_key()
local update_result = box.update(4, key, '+p', 2, 1)

if update_result == nil then
-- no stat for current key
box.insert(4, key, 0, 1)
end

-- decrement total number of fast tasks
box.update(4, 0, '-p', 1, 1) -- ignore case when no tasks was in queue
end

--
-- Remove old records from tarantool
--
local function rima_clear_expired_stat()
local cur_key = rima_get_monitoring_key(box.time())
local expired_key = cur_key - FAST_TASKS_STAT_EXP_TIME
local iter = box.space[4].index[0]:iterator(box.index.LE, expired_key)

for tuple in iter do
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not all tuples will be removed!
Iterators will be corrupted.

box.delete(4, tuple[0])
end
end

--
-- Get statistic from monitoring for given time interval
--
function rima_get_monitoring_statistics(start_time, end_time)
if start_time ~= nil then
start_time = box.unpack('i', start_time)
end
if end_time ~= nil then
end_time = box.unpack('i', end_time)
end

rima_clear_expired_stat()

if start_time == nil or start_time == 0 then
-- request all statistics, but 0 is global tasks counter
start_time = 1
else
start_time = rima_get_monitoring_key(start_time)
end

end_time = rima_get_monitoring_key(end_time)
if end_time < start_time then
end_time, start_time = start_time, end_time
end

local tuple = box.select(4, 0, 0)
local currently_in_queue = 0
if tuple ~= nil then
currently_in_queue = box.unpack('i', tuple[1])
end

local total_added, total_deleted = 0, 0
local iter = box.space[4].index[0]:iterator(box.index.GE, start_time)
for tuple in iter do
if box.unpack('i', tuple[0]) > end_time then
break
end

total_added = total_added + box.unpack('i', tuple[1])
total_deleted = total_deleted + box.unpack('i', tuple[2])
end

-- string will be returned
return box.cjson.encode({
fast_tasks_in_queue = currently_in_queue,
fast_tasks_added = total_added,
fast_tasks_deleted = total_deleted,
})
end

--
-- Put fetch single mail task to the queue.
--
function rima_put_fetchmail(key, data)
box.auto_increment(3, key, data, box.time())
rima_inc_fast_tasks_count()
end

local function get_prio_key(prio, source)
Expand Down Expand Up @@ -132,6 +246,7 @@ function rima_get_fetchmail()
local tuples = { box.select_limit(3, 1, 0, 1000, key) }
for _, tuple in pairs(tuples) do
tuple = box.delete(3, box.unpack('l', tuple[0]))
rima_dec_fast_tasks_count()
if tuple ~= nil then
table.insert(result, { box.unpack('i', tuple[3]), tuple[2] })
n = 1
Expand Down