local redis_helper = require "redis_helper" local cjson = require "cjson" -- Safely load the http module local has_http, http = pcall(require, "resty.http") local function disconnect_stream(stream_key) if not has_http then ngx.log(ngx.WARN, "resty.http module not available, skipping stream disconnection") return end local httpc = http.new() local ome_url = os.getenv("OME_URL") or "http://ovenmediaengine:8081" local ome_token = os.getenv("OME_API_TOKEN") or "your-api-token" -- Get active streams from OME local res, err = httpc:request_uri(ome_url .. "/v1/vhosts/default/apps/app/streams", { method = "GET", headers = { ["Authorization"] = "Bearer " .. ome_token, ["Content-Type"] = "application/json" } }) if not res then ngx.log(ngx.ERR, "Failed to get streams: ", err) return end local ok, data = pcall(cjson.decode, res.body) if ok and data and data.response and data.response.streams then for _, stream in ipairs(data.response.streams) do if stream.name == stream_key then -- Disconnect the stream local del_res, del_err = httpc:request_uri( ome_url .. "/v1/vhosts/default/apps/app/streams/" .. stream.name, { method = "DELETE", headers = { ["Authorization"] = "Bearer " .. ome_token } } ) if del_res and del_res.status == 200 then ngx.log(ngx.INFO, "Disconnected stream: ", stream_key) else ngx.log(ngx.ERR, "Failed to disconnect stream: ", stream_key) end break end end end end local function monitor_streams() -- Check if Redis is available local ok, keys = pcall(redis_helper.get_streams_to_disconnect) if not ok then ngx.log(ngx.WARN, "Redis not available yet") return end for _, key in ipairs(keys or {}) do disconnect_stream(key) redis_helper.remove_stream_from_disconnect(key) end end -- Start monitoring in a timer with error handling local function start_monitoring() local ok, err = ngx.timer.every(1, monitor_streams) if not ok then ngx.log(ngx.ERR, "Failed to create timer: ", err) else ngx.log(ngx.INFO, "Stream monitor started") end end -- Delay start to ensure services are ready ngx.timer.at(5, start_monitoring)