Em 2013, entrei no Mail.ru Group e estava resolvendo um problema para o qual precisava de uma fila. Existem muitas ferramentas diferentes para criar filas, mas decidi primeiro descobrir o que a empresa já tinha. Ouvi dizer que existia um produto chamado Tarantool. Estudei como ele funciona e pareceu-me que um broker de filas poderia ser perfeitamente integrado a ele.
Tarantool — — , . , C, Tarantool, … 250 , , .
Tarantool. , .
, Tarantool , , . , Tarantool, 2.2.
, Beanstalkd. , ( ), . - .
: , ; : , ( put
); , ( take
).
. put
ready. take
taken. taken (ack
) , ready (release
).
:
Tarantool — , , LuaJIT-. , init.lua, , box.cfg()
, .
. :
-- Minimal init.lua: turn on the strict module, configure the instance with
-- default settings and hand control over to the interactive console.
require('strict').on()
box.cfg({})
require('console').start()
-- Reached once console.start() returns: terminate the process.
os.exit()
, - . , . 10-15
strict. Lua , . , Tarantool DEBUG
, strict
.
tarantool
:
tarantool init.lua
- :
2020-07-09 20:00:11.344 [30043] main/102/init.lua C> Tarantool 2.2.3-1-g98ecc909a
2020-07-09 20:00:11.345 [30043] main/102/init.lua C> log level 5
2020-07-09 20:00:11.346 [30043] main/102/init.lua I> mapping 268435456 bytes for memtx tuple arena...
2020-07-09 20:00:11.347 [30043] main/102/init.lua I> mapping 134217728 bytes for vinyl tuple arena...
2020-07-09 20:00:11.370 [30043] main/102/init.lua I> instance uuid 38c59892-263e-42de-875c-8f67539191a3
2020-07-09 20:00:11.371 [30043] main/102/init.lua I> initializing an empty data directory
2020-07-09 20:00:11.408 [30043] main/102/init.lua I> assigned id 1 to replica 38c59892-263e-42de-875c-8f67539191a3
2020-07-09 20:00:11.408 [30043] main/102/init.lua I> cluster uuid 7723bdf4-24e8-4957-bd6c-6ab502a1911c
2020-07-09 20:00:11.425 [30043] snapshot/101/main I> saving snapshot `./00000000000000000000.snap.inprogress'
2020-07-09 20:00:11.437 [30043] snapshot/101/main I> done
2020-07-09 20:00:11.439 [30043] main/102/init.lua I> ready to accept requests
2020-07-09 20:00:11.439 [30043] main/104/checkpoint_daemon I> scheduled next checkpoint for Thu Jul 9 21:11:59 2020
tarantool>
queue.lua
. , init.lua
, .
queue
init.lua
:
-- init.lua: same bootstrap as before, plus loading the queue module into a
-- global so it can be reached from the console.
require('strict').on()
box.cfg({})
queue = require('queue')
require('console').start()
-- Reached once console.start() returns: terminate the process.
os.exit()
queue.lua
.
, - . (space) — . , - . , , , (if_not_exists
). Tarantool ( ). .
. id
, - . , . , id
. , , .
-- Schema bootstrap: create the 'queue' space (only if it is missing),
-- describe its fields and build the primary index over the first field.
box.schema.create_space('queue', { if_not_exists = true })
box.space.queue:format({
    { name = 'id',     type = 'number' },
    { name = 'status', type = 'string' },
    { name = 'data',   type = '*' },
})
box.space.queue:create_index('primary', {
    parts = { 1, 'number' },
    if_not_exists = true,
})
queue
, , . : (put
) (take
).
. . , : . : R=READY
T=TAKEN
.
-- Module skeleton: the exported table, the one-letter status codes stored in
-- the space, and placeholders for the two public operations.
local queue = {}

local STATUS = {
    READY = 'R',
    TAKEN = 'T',
}

-- Placeholder: enqueue a task.
function queue.put(...)
end

-- Placeholder: fetch a task for processing.
function queue.take(...)
end

return queue
put
? . id
READY
. , clock.realtime
. , ( , , ). , , , . , id
, . , , .
:
local clock = require 'clock'
--- Generate a task id that is not yet present in the queue space.
-- The id is the current wall-clock time from clock.realtime64(); the clock is
-- re-read until the primary-key lookup finds no tuple with that id.
-- Declared local (as in the final listing of this module) so the helper does
-- not leak into the global namespace.
-- @return a 64-bit id with no matching tuple in box.space.queue
local function gen_id()
    local new_id
    repeat
        new_id = clock.realtime64()
    until not box.space.queue:get(new_id)
    return new_id
end
--- Enqueue a task.
-- All arguments are packed into one array and stored as the task data.
-- @param ... arbitrary payload values
-- @return the inserted tuple { id, 'R', { ... } }
function queue.put(...)
    local payload = { ... }
    local task_id = gen_id()
    return box.space.queue:insert({ task_id, STATUS.READY, payload })
end
put
, Tarantool . , , (). . , Tarantool , MessagePack, .
tarantool> queue.put("hello")
---
- [1594325382148311477, 'R', ['hello']]
...
tarantool> queue.put("my","data",1,2,3)
---
- [1594325394527830491, 'R', ['my', 'data', 1, 2, 3]]
...
tarantool> queue.put({ complex = { struct = "data" }})
---
- [1594325413166109943, 'R', [{'complex': {'struct': 'data'}}]]
...
, , . , .
tarantool> box.space.queue:select()
---
- - [1594325382148311477, 'R', ['hello']]
- [1594325394527830491, 'R', ['my', 'data', 1, 2, 3]]
- [1594325413166109943, 'R', [{'complex': {'struct': 'data'}}]]
...
— take
. . , , READY
. , , , . . Tarantool, key-value , , : , , .
, , — . . — id
. .
-- Secondary index over (status, id): field 2 as string, field 1 as number.
box.space.queue:create_index('status', {
    parts = {
        2, 'string',
        1, 'number',
    },
    if_not_exists = true,
})
. , pairs
. . , : , . , READY
. . - , , . , , take
, . , .
--- Take the first ready task, if any.
-- Looks up the first tuple with status READY in the 'status' index and
-- switches its status (field 2) to TAKEN.
-- @return the updated tuple, or nothing when no ready task exists
function queue.take()
    local status_index = box.space.queue.index.status
    local task = status_index:pairs({ STATUS.READY }, { iterator = 'EQ' }):nth(1)
    if not task then
        return
    end
    return box.space.queue:update({ task.id }, { { '=', 2, STATUS.TAKEN } })
end
, Tarantool . , , , update
, . , . :
-- Two-way field map built from the space format:
-- F[<field number>] = <field name> and F[<field name>] = <field number>.
local F = {}
for fieldno, field in pairs(box.space.queue:format()) do
    local fname = field.name
    F[fieldno] = fname
    F[fname] = fieldno
end
:
-- Declare the tuple layout of the queue space.
box.space.queue:format({
    { name = 'id',     type = 'number' },
    { name = 'status', type = 'string' },
    { name = 'data',   type = '*' },
})

-- Two-way field map built from the format:
-- F[<field number>] = <field name> and F[<field name>] = <field number>.
local F = {}
for fieldno, field in pairs(box.space.queue:format()) do
    local fname = field.name
    F[fieldno] = fname
    F[fname] = fieldno
end

-- Indexes are declared through field names instead of literal positions.
box.space.queue:create_index('primary', {
    parts = { F.id, 'number' },
    if_not_exists = true,
})
box.space.queue:create_index('status', {
    parts = {
        F.status, 'string',
        F.id, 'number',
    },
    if_not_exists = true,
})
take
:
--- Take the first ready task, if any.
-- Iterates the 'status' index over READY tuples; the loop body returns on the
-- very first tuple, so at most one task is switched to TAKEN per call.
-- @return the updated tuple, or nothing when no ready task exists
function queue.take(...)
    local space = box.space.queue
    for _, task in space.index.status:pairs({ STATUS.READY }, { iterator = 'EQ' }) do
        local ops = { { '=', F.status, STATUS.TAKEN } }
        return space:update({ task.id }, ops)
    end
    return
end
, . take
. , box.space.queue:truncate()
:
tarantool> queue.put("my","data",1,2,3)
---
- [1594325927025602515, 'R', ['my', 'data', 1, 2, 3]]
...
tarantool> queue.take()
---
- [1594325927025602515, 'T', ['my', 'data', 1, 2, 3]]
...
tarantool> queue.take()
---
...
take
, . take
, , ready- ( R
) . , select
:
tarantool> box.space.queue:select()
---
- - [1594325927025602515, 'T', ['my', 'data', 1, 2, 3]]
...
, , , , - . - . : ack
release
. id
. , . . , ready
.
--- Acknowledge (delete) a taken task.
-- @param id task id
-- @return the deleted tuple
-- Raises "Task not exists" when no tuple has this id and "Task not taken"
-- when the task is not in the TAKEN status.
function queue.ack(id)
    local task = assert(box.space.queue:get{id}, "Task not exists")
    if task.status ~= STATUS.TAKEN then
        error("Task not taken")
    end
    return box.space.queue:delete{task.id}
end
--- Return a taken task to the READY status.
-- @param id task id
-- @return the updated tuple
-- Raises "Task not exists" when no tuple has this id and "Task not taken"
-- when the task is not in the TAKEN status.
function queue.release(id)
    local task = assert(box.space.queue:get{id}, "Task not exists")
    if task.status ~= STATUS.TAKEN then
        error("Task not taken")
    end
    return box.space.queue:update({ task.id }, { { '=', F.status, STATUS.READY } })
end
, . , . R
. take
. , . take
. . , -.
tarantool> queue.put("task 1")
---
- [1594326185712343931, 'R', ['task 1']]
...
tarantool> queue.put("task 2")
---
- [1594326187061434882, 'R', ['task 2']]
...
tarantool> task = queue.take() return task
---
- [1594326185712343931, 'T', ['task 1']]
...
tarantool> queue.release(task.id)
---
- [1594326185712343931, 'R', ['task 1']]
...
tarantool> task = queue.take() return task
---
- [1594326185712343931, 'T', ['task 1']]
...
tarantool> queue.ack(task.id)
---
- [1594326185712343931, 'T', ['task 1']]
...
tarantool> task = queue.take() return task
---
- [1594326187061434882, 'T', ['task 2']]
...
tarantool> queue.ack(task.id)
---
- [1594326187061434882, 'T', ['task 2']]
...
tarantool> task = queue.take() return task
---
- null
...
. , . , , . take
, , . , , , , CPU.
-- Naive consumer loop: calls queue.take() over and over with no pause
-- between attempts, so it keeps spinning even when take() returns nothing.
while true do
    local task = queue.take()
    if task then
        -- process the task here
    end
end
, «» ( channel
). . , FIFO- . , , . Lua-, - , , .
: N , , . , , - . , . put
. , put
. , put
, put
, . . Go, :
take
. — : . , . , , .
, . «» , , .
local fiber = require 'fiber'
-- Channel on which take() waits for a notification.
queue._wait = fiber.channel()

--- Take the first ready task, waiting up to `timeout` seconds for one.
-- @param timeout seconds to wait; defaults to 0 (do not wait)
-- @return the task tuple switched to TAKEN, or nothing on timeout
function queue.take(timeout)
    timeout = timeout or 0
    local deadline = fiber.time() + timeout
    local task
    repeat
        task = box.space.queue.index.status
            :pairs({ STATUS.READY }, { iterator = 'EQ' }):nth(1)
        if not task then
            local left = deadline - fiber.time()
            if left <= 0 then return end
            -- Wait for a message on the channel, but no longer than the time left.
            queue._wait:get(left)
        end
    until task
    return box.space.queue:update({ task.id }, { { '=', F.status, STATUS.TAKEN } })
end
: take
, , . , . , , .
, init.lua
fiber
:
fiber = require 'fiber'
, . 0,1 . , 0,1 . . take
3. take
. , 3 . , .
tarantool> do
box.space.queue:truncate()
fiber.create(function()
fiber.sleep(0.1)
queue.put("task 3")
end)
local start = fiber.time()
return queue.take(3), { wait = fiber.time() - start }
end
---
- [1594326905489650533, 'T', ['task 3']]
- wait: 3.0017817020416
...
, take
. put
. , true
.
, put
, . , . . . , , , . . , , , .
--- Enqueue a task and notify one waiting consumer, if there is one.
-- The channel message is sent (with zero timeout) before the insert.
-- @param ... arbitrary payload values
-- @return the inserted tuple
function queue.put(...)
    local task_id = gen_id()
    local waiters = queue._wait
    if waiters:has_readers() then
        waiters:put(true, 0)
    end
    return box.space.queue:insert({ task_id, STATUS.READY, { ... } })
end
take
. 0,1 . take
. , . , .
tarantool> do
box.space.queue:truncate()
fiber.create(function()
fiber.sleep(0.1)
queue.put("task 4")
end)
local start = fiber.time()
return queue.take(3), { wait = fiber.time() - start }
end
---
- [1594327004302379957, 'T', ['task 4']]
- wait: 0.10164666175842
...
, . . init.lua
box.cfg
listen
— , . . , , . .
-- init.lua: network-enabled variant. Listens on 127.0.0.1:3301 and grants
-- the 'super' role to the 'guest' user.
require('strict').on()
fiber = require('fiber')
box.cfg({
    listen = '127.0.0.1:3301',
})
-- NOTE(review): this gives every unauthenticated client full access.
box.schema.user.grant('guest', 'super', nil, nil, { if_not_exists = true })
queue = require('queue')
require('console').start()
-- Reached once console.start() returns: terminate the process.
os.exit()
-producer
. Tarantool , Tarantool.
#!/usr/bin/env tarantool
-- Command-line producer: forwards all command-line arguments to queue.put
-- on the server at 127.0.0.1:3301 and prints the reply as YAML.
if #arg < 1 then
    error("Need arguments", 0)
end

local netbox = require('net.box')
local yaml = require('yaml')

local conn = netbox.connect('127.0.0.1:3301')
local call_args = { unpack(arg) }
local res = conn:call('queue.put', call_args)
print(yaml.encode(res))
conn:close()
$ tarantool producer.lua "hi"
--- [1594327270675788959, 'R', ['hi']]
...
(consumer
) , take
. , . . , .
#!/usr/bin/env tarantool
-- Demo consumer: repeatedly takes a task (waiting up to 1 second), prints it
-- and immediately releases it back to the queue.
local netbox = require('net.box')
local yaml = require('yaml')

local conn = netbox.connect('127.0.0.1:3301')

while true do
    local task = conn:call('queue.take', { 1 })
    if not task then
        print "No more tasks"
    else
        print("Got task: ", yaml.encode(task))
        -- NOTE(review): task.id presumably needs the server to return named
        -- fields; confirm against the return shape of queue.take.
        conn:call('queue.release', { task.id })
    end
end
- .
$ tarantool consumer.lua
Got task:
--- [1594327270675788959, 'T', ['hi']]
...
ER_EXACT_MATCH: Invalid key part count in an exact match (expected 1, got 0)
. , , : . , , , , .
$ tarantool consumer.lua
No more tasks
No more tasks
select
, .
tarantool> box.space.queue:select()
---
- - [1594327004302379957, 'T', ['task 3']]
- [1594327270675788959, 'T', ['hi']]
...
, .
Tarantool . , .
local log = require 'log'
-- Log every client connect with the session id and the peer address.
box.session.on_connect(function()
    local sid, peer = box.session.id(), box.session.peer()
    log.info( "connected %s from %s", sid, peer )
end)

-- Log every client disconnect with the session id and the peer address.
box.session.on_disconnect(function()
    local sid, peer = box.session.id(), box.session.peer()
    log.info( "disconnected %s from %s", sid, peer )
end)
2020-07-09 20:52:09.107 [32604] main/115/main I> connected 2 from 127.0.0.1:36652
2020-07-09 20:52:10.260 [32604] main/116/main I> disconnected 2 from nil
2020-07-09 20:52:10.823 [32604] main/116/main I> connected 3 from 127.0.0.1:36654
2020-07-09 20:52:11.541 [32604] main/115/main I> disconnected 3 from nil
session id
, , IP . , . session.peer()
getpeername(2)
. , (getpeername
). C . Tarantool box.session.storage
— , , , . , , , . .
-- On connect, store the peer address in the session storage and log it.
box.session.on_connect(function()
    local storage = box.session.storage
    storage.peer = box.session.peer()
    log.info( "connected %s from %s", box.session.id(), storage.peer )
end)

-- On disconnect, log the peer address saved at connect time.
box.session.on_disconnect(function()
    local saved_peer = box.session.storage.peer
    log.info( "disconnected %s from %s", box.session.id(), saved_peer )
end)
. - . « ». , , . , , take
.
queue.taken = {}  -- task id -> id of the session that took the task
queue.bysid = {}  -- session id -> set of task ids taken by that session

--- Take the first ready task, waiting up to `timeout` seconds for one, and
-- record which session took it.
-- @param timeout seconds to wait; defaults to 0 (do not wait)
-- @return the task tuple switched to TAKEN, or nothing on timeout
function queue.take(timeout)
    timeout = timeout or 0
    local deadline = fiber.time() + timeout
    local task
    repeat
        task = box.space.queue.index.status
            :pairs({ STATUS.READY }, { iterator = 'EQ' }):nth(1)
        if not task then
            local left = deadline - fiber.time()
            if left <= 0 then return end
            queue._wait:get(left)
        end
    until task

    local sid = box.session.id()
    log.info("Register %s by %s", task.id, sid)
    queue.taken[task.id] = sid
    local owned = queue.bysid[sid]
    if not owned then
        owned = {}
        queue.bysid[sid] = owned
    end
    owned[task.id] = true
    return box.space.queue:update({ task.id }, { { '=', F.status, STATUS.TAKEN } })
end
, . , ack
release
. . , , . , : « , ».
--- Fetch a task and verify that the calling session owns it.
-- Errors are raised with level 2, so they are attributed to the caller of
-- the function that invoked get_task.
-- @param id task id
-- @return the task tuple
local function get_task( id )
    if not id then error("Task id required", 2) end

    local task = box.space.queue:get{id}
    if not task then
        error(string.format( "Task {%s} was not found", id ), 2)
    end

    local owner = queue.taken[id]
    if not owner then
        error(string.format( "Task %s not taken by anybody", id ), 2)
    end

    local sid = box.session.id()
    if owner ~= sid then
        error(string.format( "Task %s taken by %d. Not you (%d)", id, owner, sid ), 2)
    end
    return task
end
ack
release
. get_task
, , . .
--- Acknowledge a task owned by the calling session: forget the ownership
-- records and delete the tuple.
-- @param id task id
-- @return the deleted tuple
function queue.ack(id)
    local task = get_task(id)
    local task_id = task.id
    queue.taken[task_id] = nil
    queue.bysid[box.session.id()][task_id] = nil
    return box.space.queue:delete{ task_id }
end
--- Release a task owned by the calling session back to READY.
-- A waiting consumer (if any) is notified first, then the ownership records
-- are dropped and the status is updated.
-- @param id task id
-- @return the updated tuple
function queue.release(id)
    local task = get_task(id)
    local waiters = queue._wait
    if waiters:has_readers() then
        waiters:put(true, 0)
    end
    local task_id = task.id
    queue.taken[task_id] = nil
    queue.bysid[box.session.id()][task_id] = nil
    return box.space.queue:update({ task_id }, { { '=', F.status, STATUS.READY } })
end
R
SQL Lua-c:
-- Reset every task with status 'T' back to 'R' using SQL.
box.execute[[ update "queue" set "status" = 'R' where "status" = 'T' ]]
-- The same reset through the Lua API: walk the status index over 'T' tuples
-- and assign 'R' to field 2 of each one.
box.space.queue.index.status:pairs({'T'}):each(function(t) box.space.queue:update({t.id},{{'=',2,'R'}}) end)
consumer
, task ID required
.
$ tarantool consumer.lua
Got task:
--- [1594327004302379957, 'T', ['task 3']]
...
ER_PROC_LUA: queue.lua:113: Task id required
. Tarantool, . , . . , . , . :tomap{ names_only = true }
:
-- Abbreviated listing: only the return statements are shown. Each public
-- function now converts the resulting tuple with :tomap{ names_only = true }.
function queue.put(...)
    -- ... (rest of the body elided in this snippet)
    return box.space.queue
    :insert{ id, STATUS.READY, { ... } }
    :tomap{ names_only = true }
end
function queue.take(timeout)
    -- ... (rest of the body elided in this snippet)
    return box.space.queue
    :update( {found.id}, {{'=', F.status, STATUS.TAKEN }})
    :tomap{ names_only = true }
end
function queue.ack(id)
    -- ... (rest of the body elided in this snippet)
    return box.space.queue:delete{t.id}:tomap{ names_only = true }
end
function queue.release(id)
    -- ... (rest of the body elided in this snippet)
    return box.space.queue
    :update({t.id},{{'=', F.status, STATUS.READY }})
    :tomap{ names_only = true }
end
return queue
, .
$ tarantool consumer.lua
Got task:
--- {'status': 'T', 'data': ['hi'], 'id': 1594327270675788959}
...
ER_PROC_LUA: queue.lua:117: Task 1594327270675788959ULL not taken by anybody
, . , ID . - ULL.
LuaJIT: FFI (Foreign Function Interface). . , 1
.
tarantool> t = {}
tarantool> t[1] = 1
tarantool> t["1"] = 2
tarantool> t[1LL] = 3
tarantool> t[1ULL] = 4
tarantool> t[1ULL] = 5
tarantool> t
---
- 1: 1
1: 5
1: 4
'1': 2
1: 3
...
, 2
( + ). 3
(, , LL). , : 1
, 2
, 3
, 4
, 5
. , , .
tarantool> return t[1], t['1'], t[1LL], t[1ULL]
---
- 1
- 2
- null
- null
...
, . Lua- (number
string
), LL (long long
) ULL (unsigned long long
) — . cdata
. C. Lua- cdata
, . , , . ULL , .
. , . - . MessagePack. Tarantool , Tarantool. , .
local msgpack = require 'msgpack'
--- Encode a key with MessagePack so it can be used as a Lua table key.
-- @param key value to encode
-- @return the MessagePack-encoded string
local function keypack( key )
    local packed = msgpack.encode( key )
    return packed
end

--- Decode a key produced by keypack.
-- @param data MessagePack-encoded string
-- @return whatever msgpack.decode returns for `data`
local function keyunpack( data )
    return msgpack.decode( data )
end
take
. get_task
, , , int64
. keypack
, MessagePack. , , get_task
, ack
release
.
--- Take the first ready task, waiting up to `timeout` seconds for one, and
-- record which session took it. Ownership is tracked under the
-- MessagePack-encoded task id.
-- @param timeout seconds to wait; defaults to 0 (do not wait)
-- @return the task as a map with status TAKEN, or nothing on timeout
function queue.take(timeout)
    timeout = timeout or 0
    local deadline = fiber.time() + timeout
    local task
    repeat
        task = box.space.queue.index.status
            :pairs({ STATUS.READY }, { iterator = 'EQ' }):nth(1)
        if not task then
            local left = deadline - fiber.time()
            if left <= 0 then return end
            queue._wait:get(left)
        end
    until task

    local sid = box.session.id()
    log.info("Register %s by %s", task.id, sid)
    local key = keypack( task.id )
    queue.taken[key] = sid
    local owned = queue.bysid[sid]
    if not owned then
        owned = {}
        queue.bysid[sid] = owned
    end
    owned[key] = true
    local taken = box.space.queue:update({ task.id }, { { '=', F.status, STATUS.TAKEN } })
    return taken:tomap{ names_only = true }
end
--- Fetch a task and verify that the calling session owns it.
-- The id is converted with tonumber64 and encoded with keypack to look up the
-- ownership tables. All errors are raised with level 2, so they are
-- attributed to the caller of the function that invoked get_task.
-- @param id task id (number, 64-bit integer, or a numeric string)
-- @return the task tuple and the packed key
local function get_task( id )
    if not id then error("Task id required", 2) end
    local id64 = tonumber64(id)
    if not id64 then
        -- Without this check a non-numeric id would reach get{nil} and fail
        -- with an unrelated "Invalid key part count" error.
        error(string.format( "Task id must be a number, got %s", tostring(id) ), 2)
    end
    local key = keypack(id64)
    local t = box.space.queue:get{id64}
    if not t then
        error(string.format( "Task {%s} was not found", id64 ), 2)
    end
    local owner = queue.taken[key]
    if not owner then
        error(string.format( "Task %s not taken by anybody", id64 ), 2)
    end
    local sid = box.session.id()
    if owner ~= sid then
        error(string.format( "Task %s taken by %d. Not you (%d)",
            id64, owner, sid ), 2)
    end
    return t, key
end
--- Acknowledge a task owned by the calling session: forget the ownership
-- records and delete the tuple.
-- @param id task id
-- @return the deleted task as a map
function queue.ack(id)
    local task, key = get_task(id)
    local sid = box.session.id()
    queue.taken[key] = nil
    queue.bysid[sid][key] = nil
    local removed = box.space.queue:delete{ task.id }
    return removed:tomap{ names_only = true }
end
--- Release a task owned by the calling session back to READY.
-- The ownership records are dropped, a waiting consumer (if any) is
-- notified, then the status is updated.
-- @param id task id
-- @return the updated task as a map
function queue.release(id)
    local task, key = get_task(id)
    local sid = box.session.id()
    queue.taken[key] = nil
    queue.bysid[sid][key] = nil
    local waiters = queue._wait
    if waiters:has_readers() then
        waiters:put(true, 0)
    end
    local released = box.space.queue:update({ task.id }, { { '=', F.status, STATUS.READY } })
    return released:tomap{ names_only = true }
end
, , , - . — ready
. , take
. session.storage
, .
-- Disconnect trigger: returns every task still held by the closing session
-- to the READY status.
box.session.on_disconnect(function()
    log.info( "disconnected %s from %s", box.session.id(), box.session.storage.peer )
    -- Flag the session as gone; queue.take checks this flag before
    -- registering a task for the session.
    box.session.storage.destroyed = true
    local sid = box.session.id()
    local bysid = queue.bysid[ sid ]
    if bysid then
        -- Repeat until the set is empty.
        -- NOTE(review): presumably guards against entries added while
        -- update() yields; confirm.
        while next(bysid) do
            for key, id in pairs(bysid) do
                log.info("Autorelease %s by disconnect", id);
                -- Drop the ownership records first, then touch the space.
                queue.taken[key] = nil
                bysid[key] = nil
                local t = box.space.queue:get(id)
                if t then
                    -- Notify one waiting consumer, if there is one, before the update.
                    if queue._wait:has_readers() then queue._wait:put(true,0) end
                    box.space.queue:update({t.id},{{'=', F.status, STATUS.READY }})
                end
            end
        end
        queue.bysid[ sid ] = nil
    end
end)
--- Take the first ready task, waiting up to `timeout` seconds for one, and
-- record which session took it.
-- Returns nothing if the session was flagged as destroyed while waiting.
-- @param timeout seconds to wait; defaults to 0 (do not wait)
-- @return the task as a map with status TAKEN, or nothing
function queue.take(timeout)
    timeout = timeout or 0
    local deadline = fiber.time() + timeout
    local task
    repeat
        task = box.space.queue.index.status
            :pairs({ STATUS.READY }, { iterator = 'EQ' }):nth(1)
        if not task then
            local left = deadline - fiber.time()
            if left <= 0 then return end
            queue._wait:get(left)
        end
    until task

    -- The client may have disconnected while this fiber was waiting.
    if box.session.storage.destroyed then return end

    local sid = box.session.id()
    log.info("Register %s by %s", task.id, sid)
    local key = keypack( task.id )
    queue.taken[key] = sid
    local owned = queue.bysid[sid]
    if not owned then
        owned = {}
        queue.bysid[sid] = owned
    end
    -- Keep the real id as the value: the disconnect trigger reads it back.
    owned[key] = task.id
    local taken = box.space.queue:update({ task.id }, { { '=', F.status, STATUS.TAKEN } })
    return taken:tomap{ names_only = true }
end
:
tarantoolctl connect 127.0.0.1:3301 <<< 'queue.take()'
, , , , — ( ), taken
. : .
-- Start-up recovery: switch every task left in TAKEN back to READY,
-- one tuple at a time, logging each id.
do
    local function first_taken()
        return box.space.queue.index.status:pairs({STATUS.TAKEN}):nth(1)
    end
    local stale = first_taken()
    while stale do
        box.space.queue:update({ stale.id }, {{'=', F.status, STATUS.READY }})
        log.info("Autoreleased %s at start", stale.id)
        stale = first_taken()
    end
end
, .
. . , . put
:W=WAITING
.
-- New tuple layout: a numeric 'runat' field is added between status and data.
box.space.queue:format({
    { name = 'id',     type = 'number' },
    { name = 'status', type = 'string' },
    { name = 'runat',  type = 'number' },
    { name = 'data',   type = '*' },
})

-- Index over (runat, id).
box.space.queue:create_index('runat', {
    parts = {
        F.runat, 'number',
        F.id, 'number',
    },
    if_not_exists = true,
})

-- Additional status code.
STATUS.WAITING = 'W'
, ( ):
-- Drop the queue space and write a snapshot.
-- drop is a method of the space object, so it must be called with ':' to
-- pass the space as self; the dot form calls it without a space argument.
box.space.queue:drop()
box.snapshot()
.
put
release
delay
. delay
, WAITING
, . . . , . , .
--- Enqueue a task, optionally delayed.
-- Without a delay the task is stored as READY and one waiting consumer (if
-- any) is notified before the insert. With opts.delay the task is stored as
-- WAITING with runat set to clock.realtime() + delay.
-- @param data task payload
-- @param opts optional table; opts.delay is a delay in seconds
-- @return the inserted task as a map
-- Raises "delay must be a number" when opts.delay cannot be converted.
function queue.put(data, opts)
    local runat = 0
    local status = STATUS.READY
    local delay = opts and opts.delay
    if delay then
        delay = tonumber(delay)
        if not delay then
            -- Fail with a clear message instead of an arithmetic-on-nil error.
            error("delay must be a number", 2)
        end
        runat = clock.realtime() + delay
        status = STATUS.WAITING
    end
    local id = gen_id()
    if status == STATUS.READY and queue._wait:has_readers() then
        queue._wait:put(true,0)
    end
    return box.space.queue
        :insert{ id, status, runat, data }
        :tomap{ names_only=true }
end
--- Release a task owned by the calling session, optionally with a delay.
-- Without a delay the task becomes READY and one waiting consumer (if any)
-- is notified. With opts.delay it becomes WAITING with runat set to
-- clock.realtime() + delay.
-- @param id task id
-- @param opts optional table; opts.delay is a delay in seconds
-- @return the updated task as a map
-- Raises "delay must be a number" when opts.delay cannot be converted.
function queue.release(id, opts)
    local t, key = get_task(id)
    local runat = 0
    local status = STATUS.READY
    local delay = opts and opts.delay
    if delay then
        delay = tonumber(delay)
        if not delay then
            error("delay must be a number", 2)
        end
        runat = clock.realtime() + delay
        status = STATUS.WAITING
    end
    -- The ownership records are cleared only after the options are
    -- validated: failing after clearing them would leave the task in TAKEN
    -- with no session recorded as its owner.
    queue.taken[ key ] = nil
    queue.bysid[ box.session.id() ][ key ] = nil
    if status == STATUS.READY and queue._wait:has_readers() then
        queue._wait:put(true,0)
    end
    return box.space.queue
        :update({t.id},{{ '=', F.status, status },{ '=', F.runat, runat }})
        :tomap{ names_only = true }
end
- , . , , .
. take
, . , , . , , queue.runat
.
-- Background fiber that moves delayed tasks from WAITING to READY once their
-- runat time has passed.
queue._runat = fiber.create(function()
    fiber.name('queue.runat')
    while true do
        -- Seconds until the nearest future task; stays nil if none is found.
        local remaining
        local now = clock.realtime()
        -- Walk tuples whose runat is greater than 0.
        -- NOTE(review): the break below assumes ascending runat order; confirm.
        for _,t in box.space.queue.index.runat
        :pairs( { 0 }, { iterator = 'GT' })
        do
            if t.runat > now then
                -- First task that is not due yet: remember how long to sleep.
                remaining = t.runat - now
                break
            else
                if t.status == STATUS.WAITING then
                    log.info("Runat: W->R %s",t.id)
                    -- Notify one waiting consumer, if there is one, before the update.
                    if queue._wait:has_readers() then queue._wait:put(true,0) end
                    box.space.queue:update({ t.id }, {
                        {'=', F.status, STATUS.READY },
                        {'=', F.runat, 0 },
                    })
                else
                    -- Due task in an unexpected status: only clear its runat.
                    log.error("Runat: bad status %s for %s", t.status, t.id)
                    box.space.queue:update({ t.id },{{ '=', F.runat, 0 }})
                end
            end
        end
        -- Sleep until the next due time, but never longer than one second.
        if not remaining or remaining > 1 then remaining = 1 end
        fiber.sleep(remaining)
    end
end)
, , : . .
: , , .
--- Queue statistics computed on demand from the space and its status index.
-- @return table with fields total, ready, waiting and taken
function queue.stats()
    local space = box.space.queue
    local by_status = space.index.status
    return {
        total   = space:len(),
        ready   = by_status:count({ STATUS.READY }),
        waiting = by_status:count({ STATUS.WAITING }),
        taken   = by_status:count({ STATUS.TAKEN }),
    }
end
tarantool> queue.stats()
---
- ready: 10
taken: 2
waiting: 5
total: 17
...
tarantool> local clock = require 'clock' local s = clock.time() local r = queue.stats() return r, clock.time() - s
---
- ready: 10
taken: 2
waiting: 5
total: 17
- 0.00057339668273926
...
. , . — . , - , , . stats
. , . index:count
— full-scan . .
-- Per-status counters kept in memory, stored as 64-bit integers.
queue._stats = {}
for _, code in pairs(STATUS) do
    queue._stats[code] = 0LL
end

-- Initial count: one pass over the whole space.
for _, tuple in box.space.queue:pairs() do
    local code = tuple[F.status]
    queue._stats[code] = (queue._stats[code] or 0LL)+1
end

--- Queue statistics served from the in-memory counters.
-- @return table with fields total, ready, waiting and taken
function queue.stats()
    local counters = queue._stats
    return {
        total   = box.space.queue:len(),
        ready   = counters[STATUS.READY],
        waiting = counters[STATUS.WAITING],
        taken   = counters[STATUS.TAKEN],
    }
end
, . . , . , , , . Tarantool . . space:update
space:delete
, . , . .
-- Keep the counters current: decrement the counter of the old tuple's
-- status and increment the counter of the new tuple's status.
box.space.queue:on_replace(function(old, new)
    local counters = queue._stats
    if old then
        local code = old[F.status]
        counters[code] = counters[code] - 1
    end
    if new then
        local code = new[F.status]
        counters[code] = counters[code] + 1
    end
end)
, , : space:truncate()
. — _truncate
.
-- Reset all counters when the queue space is truncated. Truncation is
-- detected through changes to the _truncate system space.
box.space._truncate:on_replace(function(old,new)
    local space = box.space.queue
    -- `new` is nil when a row is deleted from _truncate; indexing it
    -- unguarded would raise inside the trigger.
    if new and space and new.id == space.id then
        for k in pairs(queue._stats) do
            queue._stats[k] = 0LL
        end
    end
end)
. , , . Tarantool . , C.
graphite UDP:
local socket = require 'socket'
local errno = require 'errno'
local graphite_host = '127.0.0.1'
local graphite_port = 2003
-- Resolve the graphite host once at load time and use the first address.
local ai = socket.getaddrinfo(graphite_host, graphite_port, 1, { type = 'SOCK_DGRAM' })
local addr,port
-- `ai or {}`: a failed lookup falls through to the error below instead of
-- failing inside pairs().
for _,info in pairs(ai or {}) do
    addr,port = info.host,info.port
    break
end
if not addr then error("Failed to resolve host") end

-- Monitoring fiber: once per second sends every value returned by
-- queue.stats() to graphite over UDP, one datagram per metric.
queue._monitor = fiber.create(function()
    fiber.name('queue.monitor')
    -- Yield once before the first queue.stats() call.
    fiber.yield()
    local remote = socket('AF_INET', 'SOCK_DGRAM', 'udp')
    if not remote then
        log.error("Failed to create socket: %s", errno.strerror())
        return
    end
    while true do
        for k,v in pairs(queue.stats()) do
            local msg = string.format("queue.stats.%s %s %s\n", k, tonumber(v), math.floor(fiber.time()))
            local res = remote:sendto(addr, port, msg)
            if not res then
                log.error("Failed to send: %s", errno.strerror(errno()))
            end
        end
        fiber.sleep(1)
    end
end)
TCP:
local socket = require 'socket'
local errno = require 'errno'
local graphite_host = '127.0.0.1'
local graphite_port = 2003
-- Monitoring fiber: once per second sends every value returned by
-- queue.stats() to graphite over TCP, reconnecting after a failure.
queue._monitor = fiber.create(function()
    fiber.name('queue.monitor')
    -- Yield once before the first queue.stats() call.
    fiber.yield()
    while true do
        local remote = socket.tcp_connect(graphite_host, graphite_port)
        if not remote then
            log.error("Failed to connect to graphite %s",errno.strerror())
            fiber.sleep(1)
        else
            while true do
                local data = {}
                for k,v in pairs(queue.stats()) do
                    table.insert(data,string.format("queue.stats.%s %s %s\n",k,tonumber(v),math.floor(fiber.time())))
                end
                data = table.concat(data,'')
                if not remote:send(data) then
                    log.error("%s",errno.strerror())
                    break
                end
                fiber.sleep(1)
            end
            -- Close the failed connection explicitly before reconnecting, so
            -- its descriptor is not left open.
            remote:close()
        end
    end
end)
, Tarantool, . , , . , .
Lua - require
, package.loaded
. require
, . Lua , package.loaded[...]
require
. , , . - :
-- init.lua with hand-made code reload: on the first run remember which
-- modules were already loaded; on later runs drop every other module from
-- package.loaded so that require() loads it again.
require('strict').on()
fiber = require('fiber')
box.cfg({
    listen = '127.0.0.1:3301',
})
box.schema.user.grant('guest', 'super', nil, nil, { if_not_exists = true })

local not_first_run = rawget(_G, '_NOT_FIRST_RUN')
_NOT_FIRST_RUN = true
if not not_first_run then
    -- First run: snapshot the names of the modules loaded so far.
    preloaded = {}
    for name in pairs(package.loaded) do
        preloaded[name] = true
    end
else
    -- Reload: forget everything that was not in the snapshot.
    for name in pairs(package.loaded) do
        if not preloaded[name] then
            package.loaded[name] = nil
        end
    end
end

queue = require('queue')
require('console').start()
os.exit()
, package.reload, . , , , : package.reload()
.
-- init.lua that loads the package.reload module before the queue module.
require('strict').on()
fiber = require('fiber')
box.cfg({
    listen = '127.0.0.1:3301',
})
box.schema.user.grant('guest', 'super', nil, nil, { if_not_exists = true })
require('package.reload')
queue = require('queue')
require('console').start()
os.exit()
, -. , . , — . , .
-- Module state. If a previous instance of the module is found in the global
-- 'queue', its runtime state is carried over; otherwise the state is created
-- from scratch.
local queue = {}
local old = rawget(_G, 'queue')
if old then
    -- Reload: reuse the state objects of the previous module instance.
    local carried = { 'taken', 'bysid', '_triggers', '_stats', '_wait', '_runch', '_runat' }
    for _, field in ipairs(carried) do
        queue[field] = old[field]
    end
else
    queue.taken = {}
    queue.bysid = {}
    queue._triggers = {}
    queue._stats = {}
    queue._wait = fiber.channel()
    queue._runch = fiber.cond()

    -- Switch every task left in TAKEN back to READY.
    do
        local function first_taken()
            return box.space.queue.index.status:pairs({STATUS.TAKEN}):nth(1)
        end
        local stale = first_taken()
        while stale do
            box.space.queue:update({ stale.id }, {{'=', F.status, STATUS.READY }})
            log.info("Autoreleased %s at start", stale.id)
            stale = first_taken()
        end
    end

    -- Initial per-status counters: one pass over the whole space.
    for _, code in pairs(STATUS) do
        queue._stats[code] = 0LL
    end
    for _, tuple in box.space.queue:pairs() do
        local code = tuple[F.status]
        queue._stats[code] = (queue._stats[code] or 0LL)+1
    end
    log.info("Perform initial stat counts %s", box.tuple.new{ queue._stats })
end
. , . , . . . .
-- Counter trigger. The previously stored trigger is passed as the second
-- argument to on_replace and the returned value is stored for the next reload.
queue._triggers.on_replace = box.space.queue:on_replace(function(old, new)
    local counters = queue._stats
    if old then
        local code = old[F.status]
        counters[code] = counters[code] - 1
    end
    if new then
        local code = new[F.status]
        counters[code] = counters[code] + 1
    end
end, queue._triggers.on_replace)
-- Reset all counters when the queue space is truncated. The previously
-- stored trigger is passed as the second argument to on_replace.
queue._triggers.on_truncate = box.space._truncate:on_replace(function(old,new)
    local space = box.space.queue
    -- `new` is nil when a row is deleted from _truncate; indexing it
    -- unguarded would raise inside the trigger.
    if new and space and new.id == space.id then
        for k in pairs(queue._stats) do
            queue._stats[k] = 0LL
        end
    end
end, queue._triggers.on_truncate)
-- On connect, store the peer address in the session storage and log it.
-- The previously stored trigger is passed as the second argument.
queue._triggers.on_connect = box.session.on_connect(function()
    local storage = box.session.storage
    storage.peer = box.session.peer()
    log.info( "connected %s from %s", box.session.id(), storage.peer )
end, queue._triggers.on_connect)
-- Disconnect trigger: returns every task still held by the closing session
-- to the READY status. The previously stored trigger is passed as the second
-- argument to on_disconnect.
queue._triggers.on_disconnect = box.session.on_disconnect(function()
    log.info( "disconnected %s from %s", box.session.id(), box.session.storage.peer )
    -- Flag the session as gone; queue.take checks this flag before
    -- registering a task for the session.
    box.session.storage.destroyed = true
    local sid = box.session.id()
    local bysid = queue.bysid[ sid ]
    if bysid then
        -- Repeat until the set is empty.
        -- NOTE(review): presumably guards against entries added while
        -- update() yields; confirm.
        while next(bysid) do
            for key, id in pairs(bysid) do
                log.info("Autorelease %s by disconnect", id);
                -- Drop the ownership records first, then touch the space.
                queue.taken[key] = nil
                bysid[key] = nil
                local t = box.space.queue:get(id)
                if t then
                    -- Notify one waiting consumer, if there is one, before the update.
                    if queue._wait:has_readers() then queue._wait:put(true,0) end
                    box.space.queue:update({t.id},{{'=', F.status, STATUS.READY }})
                end
            end
        end
        queue.bysid[ sid ] = nil
    end
end, queue._triggers.on_disconnect)
— . , . while ... true
, . , , , fiber.cond
: condition variable, .
. , fiber.kill
, : kill
. : , . . : .
-- Reload-aware background fiber that moves delayed tasks from WAITING to
-- READY. It receives the module table, the reload generation it belongs to
-- and the fiber of the previous generation.
queue._runat = fiber.create(function(queue, gen, old_fiber)
    fiber.name('queue.runat.'..gen)
    -- Wait until the previous generation's fiber is dead (or until this
    -- generation itself becomes outdated).
    while package.reload.count == gen and old_fiber and old_fiber:status() ~= 'dead' do
        log.info("Waiting for old to die")
        queue._runch:wait(0.1)
    end
    log.info("Started...")
    -- Work only while this fiber belongs to the current reload generation.
    while package.reload.count == gen do
        -- Seconds until the nearest future task; stays nil if none is found.
        local remaining
        local now = clock.realtime()
        -- Walk tuples whose runat is greater than 0.
        -- NOTE(review): the break below assumes ascending runat order; confirm.
        for _,t in box.space.queue.index.runat
        :pairs( {0}, { iterator = 'GT' })
        do
            if t.runat > now then
                -- First task that is not due yet: remember how long to wait.
                remaining = t.runat - now
                break
            else
                if t.status == STATUS.WAITING then
                    log.info("Runat: W->R %s",t.id)
                    -- Notify one waiting consumer, if there is one, before the update.
                    if queue._wait:has_readers() then queue._wait:put(true,0) end
                    box.space.queue:update({ t.id }, {
                        { '=', F.status, STATUS.READY },
                        { '=', F.runat, 0 },
                    })
                else
                    -- Due task in an unexpected status: only clear its runat.
                    log.error("Runat: bad status %s for %s", t.status, t.id)
                    box.space.queue:update({ t.id },{{ '=', F.runat, 0 }})
                end
            end
        end
        -- Wait on the condition variable for at most one second, so a
        -- broadcast can wake the fiber early.
        if not remaining or remaining > 1 then remaining = 1 end
        queue._runch:wait(remaining)
    end
    -- Wake whoever is waiting on the condition variable before exiting.
    queue._runch:broadcast()
    log.info("Finished")
end, queue, package.reload.count, queue._runat)
-- Wake any fiber currently waiting on the condition variable.
queue._runch:broadcast()
: , . :
-- Start the console (and exit afterwards) only when the current fiber has no
-- 'console' entry in its fiber-local storage.
-- NOTE(review): presumably this entry is set when the file is re-run from an
-- already running console; confirm.
if not fiber.self().storage.console then
    require'console'.start()
    os.exit()
end
, , Graphite TCP, . 20 . . 300 .
queue.lua
:
local clock = require 'clock'
local errno = require 'errno'
local fiber = require 'fiber'
local log = require 'log'
local msgpack = require 'msgpack'
local socket = require 'socket'
-- Schema bootstrap: create the space if it is missing and declare its layout.
box.schema.create_space('queue', { if_not_exists = true })
box.space.queue:format({
    { name = 'id',     type = 'number' },
    { name = 'status', type = 'string' },
    { name = 'runat',  type = 'number' },
    { name = 'data',   type = '*' },
})

-- Two-way field map built from the format:
-- F[<field number>] = <field name> and F[<field name>] = <field number>.
local F = {}
for fieldno, field in pairs(box.space.queue:format()) do
    local fname = field.name
    F[fieldno] = fname
    F[fname] = fieldno
end

-- Indexes: primary by id, then (status, id) and (runat, id).
box.space.queue:create_index('primary', {
    parts = { F.id, 'number' },
    if_not_exists = true,
})
box.space.queue:create_index('status', {
    parts = {
        F.status, 'string',
        F.id, 'number',
    },
    if_not_exists = true,
})
box.space.queue:create_index('runat', {
    parts = {
        F.runat, 'number',
        F.id, 'number',
    },
    if_not_exists = true,
})
-- One-letter status codes stored in the 'status' field.
local STATUS = {
    READY   = 'R',
    TAKEN   = 'T',
    WAITING = 'W',
}

-- Module state. If a previous instance of the module is found in the global
-- 'queue', its runtime state is carried over; otherwise the state is created
-- from scratch.
local queue = {}
local old = rawget(_G, 'queue')
if old then
    -- Reload: reuse the state objects of the previous module instance.
    local carried = { 'taken', 'bysid', '_triggers', '_stats', '_wait', '_runch', '_runat' }
    for _, field in ipairs(carried) do
        queue[field] = old[field]
    end
else
    queue.taken = {}
    queue.bysid = {}
    queue._triggers = {}
    queue._stats = {}
    queue._wait = fiber.channel()
    queue._runch = fiber.cond()

    -- Switch every task left in TAKEN back to READY.
    do
        local function first_taken()
            return box.space.queue.index.status:pairs({STATUS.TAKEN}):nth(1)
        end
        local stale = first_taken()
        while stale do
            box.space.queue:update({ stale.id }, {{'=', F.status, STATUS.READY }})
            log.info("Autoreleased %s at start", stale.id)
            stale = first_taken()
        end
    end

    -- Initial per-status counters: one pass over the whole space.
    for _, code in pairs(STATUS) do
        queue._stats[code] = 0LL
    end
    for _, tuple in box.space.queue:pairs() do
        local code = tuple[F.status]
        queue._stats[code] = (queue._stats[code] or 0LL)+1
    end
    log.info("Perform initial stat counts %s", box.tuple.new{ queue._stats })
end
--- Generate a task id that is not yet present in the queue space.
-- The id is the current wall-clock time from clock.realtime64(); the clock is
-- re-read for as long as a tuple with that id already exists.
-- @return a 64-bit id with no matching tuple in box.space.queue
local function gen_id()
    local candidate = clock.realtime64()
    while box.space.queue:get(candidate) do
        candidate = clock.realtime64()
    end
    return candidate
end
--- Encode a key with MessagePack so it can be used as a Lua table key.
-- @param key value to encode
-- @return the MessagePack-encoded string
local function keypack( key )
    local packed = msgpack.encode( key )
    return packed
end

--- Decode a key produced by keypack.
-- @param data MessagePack-encoded string
-- @return whatever msgpack.decode returns for `data`
local function keyunpack( data )
    return msgpack.decode( data )
end
-- Counter trigger. The previously stored trigger is passed as the second
-- argument to on_replace and the returned value is stored for the next reload.
queue._triggers.on_replace = box.space.queue:on_replace(function(old, new)
    local counters = queue._stats
    if old then
        local code = old[F.status]
        counters[code] = counters[code] - 1
    end
    if new then
        local code = new[F.status]
        counters[code] = counters[code] + 1
    end
end, queue._triggers.on_replace)
-- Reset all counters when the queue space is truncated. The previously
-- stored trigger is passed as the second argument to on_replace.
queue._triggers.on_truncate = box.space._truncate:on_replace(function(old,new)
    local space = box.space.queue
    -- `new` is nil when a row is deleted from _truncate; indexing it
    -- unguarded would raise inside the trigger.
    if new and space and new.id == space.id then
        for k in pairs(queue._stats) do
            queue._stats[k] = 0LL
        end
    end
end, queue._triggers.on_truncate)
-- On connect, store the peer address in the session storage. The previously
-- stored trigger is passed as the second argument.
queue._triggers.on_connect = box.session.on_connect(function()
    local peer = box.session.peer()
    box.session.storage.peer = peer
end, queue._triggers.on_connect)
-- Disconnect trigger: returns every task still held by the closing session
-- to the READY status. The previously stored trigger is passed as the second
-- argument to on_disconnect.
queue._triggers.on_disconnect = box.session.on_disconnect(function()
    -- Flag the session as gone; queue.take checks this flag before
    -- registering a task for the session.
    box.session.storage.destroyed = true
    local sid = box.session.id()
    local bysid = queue.bysid[ sid ]
    if bysid then
        -- Only sessions that have an entry in queue.bysid are logged.
        log.info( "disconnected %s from %s", box.session.id(), box.session.storage.peer )
        -- Repeat until the set is empty.
        -- NOTE(review): presumably guards against entries added while
        -- update() yields; confirm.
        while next(bysid) do
            for key, id in pairs(bysid) do
                log.info("Autorelease %s by disconnect", id);
                -- Drop the ownership records first, then touch the space.
                queue.taken[key] = nil
                bysid[key] = nil
                local t = box.space.queue:get(id)
                if t then
                    -- Notify one waiting consumer, if there is one, before the update.
                    if queue._wait:has_readers() then queue._wait:put(true,0) end
                    box.space.queue:update({t.id},{{'=', F.status, STATUS.READY }})
                end
            end
        end
        queue.bysid[ sid ] = nil
    end
end, queue._triggers.on_disconnect)
-- Reload-aware background fiber that moves delayed tasks from WAITING to
-- READY. It receives the module table, the reload generation it belongs to
-- and the fiber of the previous generation.
queue._runat = fiber.create(function(queue, gen, old_fiber)
    fiber.name('queue.runat.'..gen)
    -- Wait until the previous generation's fiber is dead (or until this
    -- generation itself becomes outdated).
    while package.reload.count == gen and old_fiber and old_fiber:status() ~= 'dead' do
        log.info("Waiting for old to die")
        queue._runch:wait(0.1)
    end
    log.info("Started...")
    -- Work only while this fiber belongs to the current reload generation.
    while package.reload.count == gen do
        -- Seconds until the nearest future task; stays nil if none is found.
        local remaining
        local now = clock.realtime()
        -- Walk tuples whose runat is greater than 0.
        -- NOTE(review): the break below assumes ascending runat order; confirm.
        for _,t in box.space.queue.index.runat
        :pairs( {0}, { iterator = 'GT' })
        do
            if t.runat > now then
                -- First task that is not due yet: remember how long to wait.
                remaining = t.runat - now
                break
            else
                if t.status == STATUS.WAITING then
                    log.info("Runat: W->R %s",t.id)
                    -- Notify one waiting consumer, if there is one, before the update.
                    if queue._wait:has_readers() then queue._wait:put(true,0) end
                    box.space.queue:update({ t.id }, {
                        { '=', F.status, STATUS.READY },
                        { '=', F.runat, 0 },
                    })
                else
                    -- Due task in an unexpected status: only clear its runat.
                    log.error("Runat: bad status %s for %s", t.status, t.id)
                    box.space.queue:update({ t.id },{{ '=', F.runat, 0 }})
                end
            end
        end
        -- Wait on the condition variable for at most one second, so a
        -- broadcast can wake the fiber early.
        if not remaining or remaining > 1 then remaining = 1 end
        queue._runch:wait(remaining)
    end
    -- Wake whoever is waiting on the condition variable before exiting.
    queue._runch:broadcast()
    log.info("Finished")
end, queue, package.reload.count, queue._runat)
-- Wake any fiber currently waiting on the condition variable.
queue._runch:broadcast()
local graphite_host = '127.0.0.1'
local graphite_port = 2003
-- Monitoring fiber: once per second sends every value returned by
-- queue.stats() to graphite over TCP, reconnecting after a failure. The
-- fiber stops as soon as package.reload.count differs from the generation
-- it was started with.
queue._monitor = fiber.create(function(gen)
    fiber.name('queue.mon.'..gen)
    -- Yield once so the rest of the module (including queue.stats, defined
    -- further down) is loaded before the first call.
    fiber.yield()
    while package.reload.count == gen do
        local remote = socket.tcp_connect(graphite_host, graphite_port)
        if not remote then
            log.error("Failed to connect to graphite %s",errno.strerror())
            fiber.sleep(1)
        else
            while package.reload.count == gen do
                local data = {}
                for k,v in pairs(queue.stats()) do
                    table.insert(data,string.format("queue.stats.%s %s %s\n",k,tonumber(v),math.floor(fiber.time())))
                end
                data = table.concat(data,'')
                if not remote:send(data) then
                    log.error("%s",errno.strerror())
                    break
                end
                fiber.sleep(1)
            end
            -- Close the connection explicitly, both after a send failure and
            -- when this generation is replaced, so its descriptor is not
            -- left open.
            remote:close()
        end
    end
end, package.reload.count)
--- Enqueue a task, optionally delayed.
-- Without a delay the task is stored as READY and one waiting consumer (if
-- any) is notified before the insert. With opts.delay the task is stored as
-- WAITING with runat set to clock.realtime() + delay.
-- @param data task payload
-- @param opts optional table; opts.delay is a delay in seconds
-- @return the inserted task as a map
-- Raises "delay must be a number" when opts.delay cannot be converted.
function queue.put(data, opts)
    local runat = 0
    local status = STATUS.READY
    local delay = opts and opts.delay
    if delay then
        delay = tonumber(delay)
        if not delay then
            -- Fail with a clear message instead of an arithmetic-on-nil error.
            error("delay must be a number", 2)
        end
        runat = clock.realtime() + delay
        status = STATUS.WAITING
    end
    local id = gen_id()
    if status == STATUS.READY and queue._wait:has_readers() then
        queue._wait:put(true,0)
    end
    return box.space.queue
        :insert{ id, status, runat, data }
        :tomap{ names_only=true }
end
--- Take the first ready task, waiting up to `timeout` seconds for one, and
-- record which session took it.
-- Returns nothing if the session was flagged as destroyed while waiting.
-- @param timeout seconds to wait; defaults to 0 (do not wait)
-- @return the task as a map with status TAKEN, or nothing
function queue.take(timeout)
    timeout = timeout or 0
    local deadline = fiber.time() + timeout
    local task
    repeat
        task = box.space.queue.index.status
            :pairs({ STATUS.READY }, { iterator = 'EQ' }):nth(1)
        if not task then
            local left = deadline - fiber.time()
            if left <= 0 then return end
            queue._wait:get(left)
        end
    until task

    -- The client may have disconnected while this fiber was waiting.
    if box.session.storage.destroyed then return end

    local sid = box.session.id()
    log.info("Register %s by %s", task.id, sid)
    local key = keypack( task.id )
    queue.taken[key] = sid
    local owned = queue.bysid[sid]
    if not owned then
        owned = {}
        queue.bysid[sid] = owned
    end
    -- Keep the real id as the value: the disconnect trigger reads it back.
    owned[key] = task.id
    local taken = box.space.queue:update({ task.id }, { { '=', F.status, STATUS.TAKEN } })
    return taken:tomap{ names_only = true }
end
--- Fetch a task and verify that the calling session owns it.
-- The id is converted with tonumber64 and encoded with keypack to look up the
-- ownership tables. All errors are raised with level 2, so they are
-- attributed to the caller of the function that invoked get_task.
-- @param id task id (number, 64-bit integer, or a numeric string)
-- @return the task tuple and the packed key
local function get_task( id )
    if not id then error("Task id required", 2) end
    local id64 = tonumber64(id)
    if not id64 then
        -- Without this check a non-numeric id would reach get{nil} and fail
        -- with an unrelated "Invalid key part count" error.
        error(string.format( "Task id must be a number, got %s", tostring(id) ), 2)
    end
    local key = keypack(id64)
    local t = box.space.queue:get{id64}
    if not t then
        error(string.format( "Task {%s} was not found", id64 ), 2)
    end
    local owner = queue.taken[key]
    if not owner then
        error(string.format( "Task %s not taken by anybody", id64 ), 2)
    end
    local sid = box.session.id()
    if owner ~= sid then
        error(string.format( "Task %s taken by %d. Not you (%d)",
            id64, owner, sid ), 2)
    end
    return t, key
end
--- Acknowledge a task owned by the calling session: forget the ownership
-- records and delete the tuple.
-- @param id task id
-- @return the deleted task as a map
function queue.ack(id)
    local task, key = get_task(id)
    local sid = box.session.id()
    queue.taken[key] = nil
    queue.bysid[sid][key] = nil
    local removed = box.space.queue:delete{ task.id }
    return removed:tomap{ names_only = true }
end
--- Release a task owned by the calling session, optionally with a delay.
-- Without a delay the task becomes READY and one waiting consumer (if any)
-- is notified. With opts.delay it becomes WAITING with runat set to
-- clock.realtime() + delay.
-- @param id task id
-- @param opts optional table; opts.delay is a delay in seconds
-- @return the updated task as a map
-- Raises "delay must be a number" when opts.delay cannot be converted.
function queue.release(id, opts)
    local t, key = get_task(id)
    local runat = 0
    local status = STATUS.READY
    local delay = opts and opts.delay
    if delay then
        delay = tonumber(delay)
        if not delay then
            error("delay must be a number", 2)
        end
        runat = clock.realtime() + delay
        status = STATUS.WAITING
    end
    -- The ownership records are cleared only after the options are
    -- validated: failing after clearing them would leave the task in TAKEN
    -- with no session recorded as its owner.
    queue.taken[ key ] = nil
    queue.bysid[ box.session.id() ][ key ] = nil
    if status == STATUS.READY and queue._wait:has_readers() then
        queue._wait:put(true,0)
    end
    return box.space.queue
        :update({t.id},{{'=', F.status, status },{ '=', F.runat, runat }})
        :tomap{ names_only = true }
end
--- Queue statistics served from the in-memory counters.
-- @return table with fields total, ready, waiting and taken
function queue.stats()
    local counters = queue._stats
    return {
        total   = box.space.queue:len(),
        ready   = counters[STATUS.READY],
        waiting = counters[STATUS.WAITING],
        taken   = counters[STATUS.TAKEN],
    }
end
return queue
init.lua
:
-- Final init.lua: strict mode, reload support, network listener, the queue
-- module and, unless the current fiber already has a console entry in its
-- storage, an interactive console.
require('strict').on()
fiber = require('fiber')
require('package.reload')
box.cfg({
    listen = '127.0.0.1:3301',
})
-- NOTE(review): this gives every unauthenticated client full access.
box.schema.user.grant('guest', 'super', nil, nil, { if_not_exists = true })
queue = require('queue')
local in_console = fiber.self().storage.console
if not in_console then
    require('console').start()
    os.exit()
end