#include "StatsService.h"
#include "../controllers/StreamController.h"
#include "../services/RedisHelper.h"
#include "../services/OmeClient.h"
#include "../services/RestreamService.h"

// NOTE(review): the original angle-bracket include targets were not legible in
// the reviewed copy of this file. The list below covers every name this
// translation unit uses directly — confirm against the repository.
#include <drogon/drogon.h>

#include <algorithm>
#include <chrono>
#include <cstdint>
#include <exception>
#include <functional>
#include <iterator>
#include <limits>
#include <memory>
#include <optional>
#include <set>
#include <string>
#include <utility>
#include <vector>

using namespace drogon;

// Macro to simplify JSON integer assignments
#define JSON_INT(json, field, value) (json)[field] = static_cast<Json::Int64>(value)

namespace {

/// Lifetime, in the unit expected by RedisHelper::storeKey, of a cached
/// "stream_stats:<key>" entry.
/// NOTE(review): presumably seconds — confirm against RedisHelper.
constexpr int kStatsCacheTtl = 10;

/// Maps the codec identifiers recognised here to their display names; any
/// other value is returned unchanged.
std::string normalizeCodecName(const std::string& codec) {
    if (codec == "H264" || codec == "h264") return "H.264";
    if (codec == "H265" || codec == "h265") return "H.265";
    if (codec == "VP8" || codec == "vp8") return "VP8";
    if (codec == "VP9" || codec == "vp9") return "VP9";
    return codec;
}

/// Returns the first element of @p tracks whose "type" is "video", or nullptr
/// when there is none. The pointer refers into @p tracks.
const Json::Value* findVideoTrack(const Json::Value& tracks) {
    for (const auto& track : tracks) {
        if (track["type"].asString() == "video") {
            return &track;
        }
    }
    return nullptr;
}

/// Returns "<width>x<height>" when the track carries both dimensions.
std::optional<std::string> trackResolution(const Json::Value& track) {
    if (track.isMember("width") && track.isMember("height")) {
        return std::to_string(track["width"].asInt()) + "x" +
               std::to_string(track["height"].asInt());
    }
    return std::nullopt;
}

/// Returns the track frame rate, accepting both the "framerate" and
/// "frameRate" spellings (the former wins when both are present).
std::optional<double> trackFrameRate(const Json::Value& track) {
    if (track.isMember("framerate")) {
        return track["framerate"].asDouble();
    }
    if (track.isMember("frameRate")) {
        return track["frameRate"].asDouble();
    }
    return std::nullopt;
}

/// Serialises @p value as single-line JSON (no indentation).
std::string toCompactJson(const Json::Value& value) {
    Json::StreamWriterBuilder builder;
    builder["indentation"] = "";
    return Json::writeString(builder, value);
}

/// Parses @p text into @p out; returns false when the text is not valid JSON.
bool parseJson(const std::string& text, Json::Value& out) {
    Json::CharReaderBuilder builder;
    const std::unique_ptr<Json::CharReader> reader(builder.newCharReader());
    std::string errors;
    return reader->parse(text.data(), text.data() + text.size(), &out, &errors);
}

}  // namespace

/// Stops polling before the service object goes away.
StatsService::~StatsService() {
    shutdown();
}

/// Marks the service as running; startPolling() refuses to run before this.
void StatsService::initialize() {
    LOG_INFO << "Initializing Stats Service...";
    running_ = true;
}

/// Performs one immediate poll and then schedules pollOmeStats() on the
/// application event loop every pollInterval_.
///
/// Calling this while a timer is already registered is a no-op, so a repeated
/// call cannot leave an orphaned timer behind.
void StatsService::startPolling() {
    if (!running_) {
        LOG_WARN << "Stats service not initialized, cannot start polling";
        return;
    }
    if (timerId_.has_value()) {
        LOG_WARN << "Stats polling timer already running";
        return;
    }

    LOG_INFO << "Starting stats polling timer...";

    auto loop = drogon::app().getLoop();
    if (!loop) {
        LOG_ERROR << "Event loop not available for stats polling";
        return;
    }

    // Do an immediate poll. A failure here must not prevent the periodic
    // timer from being installed.
    try {
        pollOmeStats();
    } catch (const std::exception& e) {
        LOG_ERROR << "Error in stats polling: " << e.what();
    }

    // Then set up the timer
    try {
        timerId_ = loop->runEvery(pollInterval_.count(), [this]() {
            if (!running_) return;
            try {
                pollOmeStats();
            } catch (const std::exception& e) {
                LOG_ERROR << "Error in stats polling: " << e.what();
            }
        });
        LOG_INFO << "Stats polling timer started with " << pollInterval_.count() << "s interval";
    } catch (const std::exception& e) {
        LOG_ERROR << "Failed to create stats timer: " << e.what();
    }
}

/// Clears the running flag and cancels the polling timer if one is registered.
void StatsService::shutdown() {
    LOG_INFO << "Shutting down Stats Service...";
    running_ = false;

    if (timerId_.has_value()) {
        if (auto loop = drogon::app().getLoop()) {
            loop->invalidateTimer(timerId_.value());
        }
        timerId_.reset();
    }
}

/// Counts the "viewer_token:*" Redis keys whose stored value equals
/// @p streamKey.
///
/// @return the number of matching tokens, or 0 when no Redis connection is
///         available or any Redis call throws.
int64_t StatsService::getUniqueViewerCount(const std::string& streamKey) {
    try {
        auto redis = services::RedisHelper::instance().getConnection();
        if (!redis) return 0;

        // NOTE(review): KEYS walks the whole keyspace and is followed by one
        // GET per token, all executed synchronously by the caller. Consider a
        // cursor-based SCAN or a per-stream set if the connection type
        // supports it.
        std::vector<std::string> keys;
        redis->keys("viewer_token:*", std::back_inserter(keys));

        return std::count_if(keys.begin(), keys.end(),
            [&redis, &streamKey](const auto& tokenKey) {
                auto storedKey = redis->get(tokenKey);
                return storedKey.has_value() && storedKey.value() == streamKey;
            });
    } catch (const std::exception& e) {
        LOG_ERROR << "Error getting unique viewer count: " << e.what();
        return 0;
    }
}

/// Asks OvenMediaEngine for the active streams, marks each one live in the
/// `realms` table, refreshes its detailed stats, and marks every realm that is
/// flagged live in the database but absent from the OME list as offline.
void StatsService::pollOmeStats() {
    LOG_INFO << "Polling OvenMediaEngine for active streams...";

    // Get active streams from OME
    OmeClient::getInstance().getActiveStreams([this](bool success, const Json::Value& json) {
        // isMember() is only valid on objects (or null); check the shape first
        // so that an unexpected payload cannot raise from this callback.
        if (!success || !json.isObject() || !json.isMember("response")) {
            LOG_ERROR << "Failed to get active streams from OME or empty response";
            return;
        }

        const Json::Value& response = json["response"];
        LOG_INFO << "OME Active Streams Response: " << response.toStyledString();

        std::set<std::string> activeStreamKeys;

        // Handle both array and object responses from OME
        if (response.isArray()) {
            for (const auto& stream : response) {
                if (stream.isString()) {
                    activeStreamKeys.insert(stream.asString());
                }
            }
        } else if (response.isObject() && response.isMember("streams") &&
                   response["streams"].isArray()) {
            for (const auto& stream : response["streams"]) {
                if (stream.isString()) {
                    activeStreamKeys.insert(stream.asString());
                } else if (stream.isObject() && stream.isMember("name") &&
                           stream["name"].isConvertibleTo(Json::stringValue)) {
                    activeStreamKeys.insert(stream["name"].asString());
                }
            }
        }

        LOG_INFO << "Found " << activeStreamKeys.size() << " active streams from OME";

        auto dbClient = app().getDbClient();

        // Update each active stream
        for (const auto& streamKey : activeStreamKeys) {
            LOG_INFO << "Processing active stream: " << streamKey;

            // IMMEDIATELY update database to mark as live and get realm ID.
            // viewer_count is deliberately left alone here: it is written by
            // updateRealmLiveStatus(), and resetting it on every poll would
            // race with that asynchronous update.
            *dbClient << "UPDATE realms SET is_live = true, "
                         "updated_at = CURRENT_TIMESTAMP WHERE stream_key = $1 RETURNING id"
                      << streamKey
                      >> [streamKey](const orm::Result& r) {
                          LOG_INFO << "Successfully marked realm as live: " << streamKey;

                          // Attempt reconnection for any disconnected restream destinations
                          if (!r.empty()) {
                              int64_t realmId = r[0]["id"].as<int64_t>();
                              RestreamService::getInstance().attemptReconnections(streamKey, realmId);
                          }
                      }
                      >> [](const orm::DrogonDbException& e) {
                          LOG_ERROR << "Failed to update realm live status: " << e.base().what();
                      };

            // Then update detailed stats
            updateStreamStats(streamKey);
        }

        // Mark all non-active streams as offline
        *dbClient << "SELECT stream_key FROM realms WHERE is_live = true"
                  >> [activeStreamKeys = std::move(activeStreamKeys)](const orm::Result& r) {
                      auto db = app().getDbClient();
                      for (const auto& row : r) {
                          std::string key = row["stream_key"].as<std::string>();
                          if (activeStreamKeys.find(key) != activeStreamKeys.end()) {
                              continue;
                          }
                          LOG_INFO << "Marking realm as offline: " << key;
                          *db << "UPDATE realms SET is_live = false, viewer_count = 0, "
                                 "updated_at = CURRENT_TIMESTAMP WHERE stream_key = $1"
                              << key
                              >> [key](const orm::Result&) {
                                  LOG_INFO << "Marked realm as offline: " << key;
                              }
                              >> [](const orm::DrogonDbException& e) {
                                  LOG_ERROR << "Failed to mark realm offline: " << e.base().what();
                              };
                      }
                  }
                  >> [](const orm::DrogonDbException& e) {
                      LOG_ERROR << "Failed to query live realms: " << e.base().what();
                  };
    });
}

/// Fetches fresh stats for @p streamKey, fills in the unique viewer count,
/// caches the result in Redis, updates the realm row, and broadcasts a
/// "stats_update" WebSocket message when there is something worth showing.
void StatsService::updateStreamStats(const std::string& streamKey) {
    fetchStatsFromOme(streamKey, [this, streamKey](bool success, const StreamStats& stats) {
        if (!success) {
            return;
        }

        StreamStats updatedStats = stats;

        // Only count viewer tokens when stream is actually live
        // Offline streams should show 0 viewers (tokens may linger for 5 min after disconnect)
        updatedStats.uniqueViewers = stats.isLive ? getUniqueViewerCount(streamKey) : 0;

        storeStatsInRedis(streamKey, updatedStats);

        // Update realm in database
        updateRealmLiveStatus(streamKey, updatedStats);

        // Only broadcast if stream has meaningful data or is live
        const bool worthBroadcasting = updatedStats.isLive ||
                                       updatedStats.totalBytesIn > 0 ||
                                       updatedStats.uniqueViewers > 0;
        if (!worthBroadcasting) {
            return;
        }

        Json::Value msg;
        msg["type"] = "stats_update";
        msg["stream_key"] = streamKey;

        auto& s = msg["stats"];
        JSON_INT(s, "connections", updatedStats.uniqueViewers);
        JSON_INT(s, "raw_connections", updatedStats.currentConnections);
        s["bitrate"] = updatedStats.bitrate;
        s["resolution"] = updatedStats.resolution;
        s["fps"] = updatedStats.fps;
        s["codec"] = updatedStats.codec;
        s["is_live"] = updatedStats.isLive;
        JSON_INT(s, "bytes_in", updatedStats.totalBytesIn);
        JSON_INT(s, "bytes_out", updatedStats.totalBytesOut);

        // Protocol breakdown
        auto& pc = s["protocol_connections"];
        JSON_INT(pc, "webrtc", updatedStats.protocolConnections.webrtc);
        JSON_INT(pc, "hls", updatedStats.protocolConnections.hls);
        JSON_INT(pc, "llhls", updatedStats.protocolConnections.llhls);
        JSON_INT(pc, "dash", updatedStats.protocolConnections.dash);

        StreamWebSocketController::broadcastStatsUpdate(msg);
    });
}

/// Writes the live flag and viewer count from @p stats to the realm row that
/// owns @p streamKey.
void StatsService::updateRealmLiveStatus(const std::string& streamKey, const StreamStats& stats) {
    auto dbClient = app().getDbClient();

    // The value is bound as int32 to match the PostgreSQL integer column;
    // clamp first so an out-of-range count cannot wrap during the narrowing.
    const int32_t viewerCount = static_cast<int32_t>(std::clamp<int64_t>(
        stats.uniqueViewers, 0, std::numeric_limits<int32_t>::max()));

    // Update realm's live status and viewer count
    *dbClient << "UPDATE realms SET is_live = $1, viewer_count = $2, updated_at = CURRENT_TIMESTAMP WHERE stream_key = $3"
              << stats.isLive << viewerCount << streamKey
              >> [streamKey, isLive = stats.isLive, viewers = stats.uniqueViewers](const orm::Result&) {
                  LOG_INFO << "Updated realm status for stream " << streamKey
                           << " - Live: " << isLive
                           << ", Viewers: " << viewers;
              }
              >> [streamKey](const orm::DrogonDbException& e) {
                  LOG_ERROR << "Failed to update realm status for " << streamKey
                            << ": " << e.base().what();
              };
}

/// Builds a StreamStats for @p streamKey from OME's stream-stats endpoint and,
/// when the stream exists, enriches it with codec / resolution / fps from the
/// stream-info endpoint.
///
/// @param callback invoked exactly once with (true, stats). A stream that OME
///        does not know about is reported as success with isLive == false.
void StatsService::fetchStatsFromOme(const std::string& streamKey,
                                     std::function<void(bool, const StreamStats&)> callback) {
    LOG_DEBUG << "Fetching stats for stream: " << streamKey;

    // First, try to get the stream stats
    OmeClient::getInstance().getStreamStats(streamKey,
        [callback = std::move(callback), streamKey](bool success, const Json::Value& json) {
            StreamStats stats;
            bool streamExists = false;

            // isObject() guards isMember(), which is only valid on objects.
            if (success && json.isObject() && json.isMember("response") &&
                !json["response"].isNull()) {
                try {
                    const auto& data = json["response"];
                    streamExists = true;

                    // Parse connections
                    if (data.isMember("connections")) {
                        const auto& conns = data["connections"];
                        int64_t totalConns = 0;

                        for (const auto& protocolName : conns.getMemberNames()) {
                            const int64_t count = conns[protocolName].asInt64();
                            auto& pc = stats.protocolConnections;

                            if (protocolName == "webrtc") pc.webrtc = count;
                            else if (protocolName == "hls") pc.hls = count;
                            else if (protocolName == "llhls") pc.llhls = count;
                            else if (protocolName == "dash") pc.dash = count;

                            totalConns += count;
                        }

                        stats.currentConnections = totalConns;
                        stats.totalConnections = totalConns;
                    }

                    // Check multiple indicators for live status
                    bool hasInput = false;

                    // Check for input field
                    if (data.isMember("input") && !data["input"].isNull()) {
                        hasInput = true;
                        const auto& input = data["input"];

                        // Get bitrate from input tracks (OME returns bytes/sec, convert to bits/sec)
                        if (input.isMember("tracks") && input["tracks"].isArray()) {
                            for (const auto& track : input["tracks"]) {
                                if (track["type"].asString() == "video" && track.isMember("bitrate")) {
                                    stats.bitrate = track["bitrate"].asDouble() * 8;
                                }
                            }
                        }
                    }

                    // Alternative: Check lastThroughputIn (OME returns bytes/sec, convert to bits/sec)
                    if (!hasInput && data.isMember("lastThroughputIn")) {
                        const double throughput = data["lastThroughputIn"].asDouble();
                        if (throughput > 0) {
                            hasInput = true;
                            stats.bitrate = throughput * 8;
                        }
                    }

                    // Alternative: Check avgThroughputIn (OME returns bytes/sec, convert to bits/sec)
                    if (!hasInput && data.isMember("avgThroughputIn")) {
                        const double avgThroughput = data["avgThroughputIn"].asDouble();
                        if (avgThroughput > 0) {
                            hasInput = true;
                            stats.bitrate = avgThroughput * 8;
                        }
                    }

                    // Check bytes counters
                    if (data.isMember("totalBytesIn")) {
                        stats.totalBytesIn = data["totalBytesIn"].asInt64();
                        if (stats.totalBytesIn > 0) {
                            hasInput = true;
                        }
                    }

                    if (data.isMember("totalBytesOut")) {
                        stats.totalBytesOut = data["totalBytesOut"].asInt64();
                    }

                    // Stream is live if it has input or active bitrate
                    stats.isLive = hasInput || stats.bitrate > 0;

                    LOG_DEBUG << "Stream " << streamKey << " - hasInput: " << hasInput
                              << ", bitrate: " << stats.bitrate
                              << ", totalBytesIn: " << stats.totalBytesIn
                              << ", isLive: " << stats.isLive;
                } catch (const std::exception& e) {
                    LOG_ERROR << "Failed to parse stats: " << e.what();
                    stats.isLive = false;
                }
            } else {
                // Stream doesn't exist in OME
                stats.isLive = false;
                LOG_DEBUG << "Stream " << streamKey << " not found in OME";
            }

            stats.lastUpdated = std::chrono::system_clock::now();

            if (!streamExists) {
                // Stream doesn't exist, return offline stats
                callback(true, stats);
                return;
            }

            // Stream exists: try to get detailed stream info
            OmeClient::getInstance().getStreamInfo(streamKey,
                [callback, stats](bool infoSuccess, const Json::Value& infoJson) mutable {
                    // Parse stream metadata if available
                    if (infoSuccess && infoJson.isObject() && infoJson.isMember("response")) {
                        try {
                            const auto& response = infoJson["response"];

                            LOG_DEBUG << "Stream info response: " << response.toStyledString();

                            // Check if stream has input (another way to verify it's live)
                            if (response.isMember("input") && !response["input"].isNull()) {
                                stats.isLive = true;

                                // Try to get codec from input tracks first
                                const auto& input = response["input"];
                                if (input.isMember("tracks") && input["tracks"].isArray()) {
                                    if (const Json::Value* track = findVideoTrack(input["tracks"])) {
                                        if (track->isMember("codec")) {
                                            stats.codec = normalizeCodecName((*track)["codec"].asString());
                                        }
                                        if (auto resolution = trackResolution(*track)) {
                                            stats.resolution = *resolution;
                                        }
                                        if (auto fps = trackFrameRate(*track)) {
                                            stats.fps = *fps;
                                        }
                                    }
                                }
                            }

                            // If no codec found in input, try output tracks. Here
                            // resolution and fps only fill in values still unset.
                            if (stats.codec.empty() || stats.codec == "N/A") {
                                if (response.isMember("tracks") && response["tracks"].isArray()) {
                                    if (const Json::Value* track = findVideoTrack(response["tracks"])) {
                                        if (track->isMember("codec")) {
                                            stats.codec = normalizeCodecName((*track)["codec"].asString());
                                        }
                                        if (stats.resolution == "N/A") {
                                            if (auto resolution = trackResolution(*track)) {
                                                stats.resolution = *resolution;
                                            }
                                        }
                                        if (stats.fps == 0) {
                                            if (auto fps = trackFrameRate(*track)) {
                                                stats.fps = *fps;
                                            }
                                        }
                                    }
                                }
                            }

                            // Set defaults if still empty
                            if (stats.codec.empty()) {
                                stats.codec = "Unknown";
                            }
                        } catch (const std::exception& e) {
                            LOG_ERROR << "Failed to parse stream info: " << e.what();
                        }
                    }

                    callback(true, stats);
                });
        });
}

/// Serialises @p stats to JSON and stores it under "stream_stats:<streamKey>"
/// with a short expiry.
void StatsService::storeStatsInRedis(const std::string& streamKey, const StreamStats& stats) {
    Json::Value json;
    JSON_INT(json, "connections", stats.currentConnections);
    JSON_INT(json, "unique_viewers", stats.uniqueViewers);
    JSON_INT(json, "total_connections", stats.totalConnections);
    JSON_INT(json, "bytes_in", stats.totalBytesIn);
    JSON_INT(json, "bytes_out", stats.totalBytesOut);
    json["bitrate"] = stats.bitrate;
    json["codec"] = stats.codec;
    json["resolution"] = stats.resolution;
    json["fps"] = stats.fps;
    json["is_live"] = stats.isLive;

    // Stored as whole seconds since the epoch; getStreamStats() reads it back
    // with std::chrono::seconds.
    const auto lastUpdatedSecs = std::chrono::duration_cast<std::chrono::seconds>(
        stats.lastUpdated.time_since_epoch()).count();
    JSON_INT(json, "last_updated", lastUpdatedSecs);

    // Protocol connections
    Json::Value pc;
    JSON_INT(pc, "webrtc", stats.protocolConnections.webrtc);
    JSON_INT(pc, "hls", stats.protocolConnections.hls);
    JSON_INT(pc, "llhls", stats.protocolConnections.llhls);
    JSON_INT(pc, "dash", stats.protocolConnections.dash);
    json["protocol_connections"] = pc;

    // Store connection drop timestamp if recent
    const auto timeSinceDrop = std::chrono::duration_cast<std::chrono::seconds>(
        std::chrono::system_clock::now() - stats.lastConnectionDrop).count();
    if (timeSinceDrop < 60) {
        const auto lastDropSecs = std::chrono::duration_cast<std::chrono::seconds>(
            stats.lastConnectionDrop.time_since_epoch()).count();
        JSON_INT(json, "last_connection_drop", lastDropSecs);
    }

    RedisHelper::storeKey("stream_stats:" + streamKey, toCompactJson(json), kStatsCacheTtl);
}

/// Delivers stats for @p streamKey to @p callback.
///
/// Cached stats from Redis are used when present and parseable; their is_live
/// flag is then raised to true if the database says the realm is live. When
/// the cache is missing or unreadable, fresh stats are fetched from OME and
/// the unique viewer count is filled in for live streams.
void StatsService::getStreamStats(const std::string& streamKey,
                                  std::function<void(bool, const StreamStats&)> callback) {
    // Single fallback path: fetch from OME and populate uniqueViewers.
    // Invoked synchronously below, so the reference captures are safe; the
    // asynchronous continuation takes its own copies.
    const auto fetchFresh = [this, &streamKey, &callback]() {
        fetchStatsFromOme(streamKey,
            [this, callback, streamKey](bool success, const StreamStats& stats) {
                if (!success) {
                    callback(false, stats);
                    return;
                }
                StreamStats updatedStats = stats;
                // Only count viewer tokens when stream is actually live
                updatedStats.uniqueViewers = stats.isLive ? getUniqueViewerCount(streamKey) : 0;
                callback(true, updatedStats);
            });
    };

    const std::string jsonStr = RedisHelper::getKey("stream_stats:" + streamKey);

    if (jsonStr.empty()) {
        LOG_DEBUG << "No cached stats, fetching from OME for " << streamKey;
        fetchFresh();
        return;
    }

    try {
        Json::Value json;
        if (!parseJson(jsonStr, json)) {
            // Fallback to fresh fetch if cached data is corrupted
            fetchFresh();
            return;
        }

        StreamStats stats;
        stats.currentConnections = json["connections"].asInt64();
        stats.uniqueViewers = json["unique_viewers"].asInt64();
        stats.totalConnections = json["total_connections"].asInt64();
        stats.totalBytesIn = json["bytes_in"].asInt64();
        stats.totalBytesOut = json["bytes_out"].asInt64();
        stats.bitrate = json["bitrate"].asDouble();
        stats.codec = json["codec"].asString();
        stats.resolution = json["resolution"].asString();
        stats.fps = json["fps"].asDouble();
        stats.isLive = json["is_live"].asBool();

        // Parse protocol connections
        if (json.isMember("protocol_connections")) {
            const auto& pc = json["protocol_connections"];
            stats.protocolConnections.webrtc = pc["webrtc"].asInt64();
            stats.protocolConnections.hls = pc["hls"].asInt64();
            stats.protocolConnections.llhls = pc["llhls"].asInt64();
            stats.protocolConnections.dash = pc["dash"].asInt64();
        }

        stats.lastUpdated = std::chrono::system_clock::time_point(
            std::chrono::seconds(json["last_updated"].asInt64()));

        // Verify is_live from database (source of truth from webhooks)
        // This prevents stale cache from overriding the webhook-updated DB state
        auto dbClient = app().getDbClient();
        *dbClient << "SELECT is_live FROM realms WHERE stream_key = $1"
                  << streamKey
                  >> [callback, stats](const orm::Result& r) mutable {
                      if (!r.empty()) {
                          const bool dbIsLive = r[0]["is_live"].as<bool>();
                          // If database says live but cache says offline, trust database
                          // (webhooks update DB immediately, cache may be stale)
                          if (dbIsLive && !stats.isLive) {
                              LOG_DEBUG << "Overriding stale cache: DB says live, cache says offline";
                              stats.isLive = true;
                          }
                      }
                      callback(true, stats);
                  }
                  >> [callback, stats](const orm::DrogonDbException& e) {
                      LOG_ERROR << "Failed to verify is_live from DB: " << e.base().what();
                      // Fall back to cached value on DB error
                      callback(true, stats);
                  };

        LOG_DEBUG << "Retrieved cached stats for " << streamKey;
        // Callback handled async
    } catch (const std::exception& e) {
        LOG_ERROR << "Failed to parse cached stats: " << e.what();
        // Fallback to fresh fetch
        fetchFresh();
    }
}