84 lines
2.6 KiB
Lua
84 lines
2.6 KiB
Lua
|
|
|
||
|
|
local redis_helper = require "redis_helper"
|
||
|
|
local cjson = require "cjson"
|
||
|
|
|
||
|
|
-- Safely load the http module
|
||
|
|
local has_http, http = pcall(require, "resty.http")
|
||
|
|
|
||
|
|
local function disconnect_stream(stream_key)
|
||
|
|
if not has_http then
|
||
|
|
ngx.log(ngx.WARN, "resty.http module not available, skipping stream disconnection")
|
||
|
|
return
|
||
|
|
end
|
||
|
|
|
||
|
|
local httpc = http.new()
|
||
|
|
local ome_url = os.getenv("OME_URL") or "http://ovenmediaengine:8081"
|
||
|
|
local ome_token = os.getenv("OME_API_TOKEN") or "your-api-token"
|
||
|
|
|
||
|
|
-- Get active streams from OME
|
||
|
|
local res, err = httpc:request_uri(ome_url .. "/v1/vhosts/default/apps/app/streams", {
|
||
|
|
method = "GET",
|
||
|
|
headers = {
|
||
|
|
["Authorization"] = "Bearer " .. ome_token,
|
||
|
|
["Content-Type"] = "application/json"
|
||
|
|
}
|
||
|
|
})
|
||
|
|
|
||
|
|
if not res then
|
||
|
|
ngx.log(ngx.ERR, "Failed to get streams: ", err)
|
||
|
|
return
|
||
|
|
end
|
||
|
|
|
||
|
|
local ok, data = pcall(cjson.decode, res.body)
|
||
|
|
if ok and data and data.response and data.response.streams then
|
||
|
|
for _, stream in ipairs(data.response.streams) do
|
||
|
|
if stream.name == stream_key then
|
||
|
|
-- Disconnect the stream
|
||
|
|
local del_res, del_err = httpc:request_uri(
|
||
|
|
ome_url .. "/v1/vhosts/default/apps/app/streams/" .. stream.name,
|
||
|
|
{
|
||
|
|
method = "DELETE",
|
||
|
|
headers = {
|
||
|
|
["Authorization"] = "Bearer " .. ome_token
|
||
|
|
}
|
||
|
|
}
|
||
|
|
)
|
||
|
|
|
||
|
|
if del_res and del_res.status == 200 then
|
||
|
|
ngx.log(ngx.INFO, "Disconnected stream: ", stream_key)
|
||
|
|
else
|
||
|
|
ngx.log(ngx.ERR, "Failed to disconnect stream: ", stream_key)
|
||
|
|
end
|
||
|
|
|
||
|
|
break
|
||
|
|
end
|
||
|
|
end
|
||
|
|
end
|
||
|
|
end
|
||
|
|
|
||
|
|
local function monitor_streams()
|
||
|
|
-- Check if Redis is available
|
||
|
|
local ok, keys = pcall(redis_helper.get_streams_to_disconnect)
|
||
|
|
if not ok then
|
||
|
|
ngx.log(ngx.WARN, "Redis not available yet")
|
||
|
|
return
|
||
|
|
end
|
||
|
|
|
||
|
|
for _, key in ipairs(keys or {}) do
|
||
|
|
disconnect_stream(key)
|
||
|
|
redis_helper.remove_stream_from_disconnect(key)
|
||
|
|
end
|
||
|
|
end
|
||
|
|
|
||
|
|
-- Start monitoring in a timer with error handling
|
||
|
|
local function start_monitoring()
|
||
|
|
local ok, err = ngx.timer.every(1, monitor_streams)
|
||
|
|
if not ok then
|
||
|
|
ngx.log(ngx.ERR, "Failed to create timer: ", err)
|
||
|
|
else
|
||
|
|
ngx.log(ngx.INFO, "Stream monitor started")
|
||
|
|
end
|
||
|
|
end
|
||
|
|
|
||
|
|
-- Delay start to ensure services are ready
|
||
|
|
ngx.timer.at(5, start_monitoring)
|