#include "StatsService.h"
#include "../controllers/StreamController.h"
#include "../services/RedisHelper.h"
#include "../services/OmeClient.h"

// NOTE(review): the names of the two system headers included here were lost
// from the source text; <drogon/drogon.h> and <algorithm> are a best guess
// based on what this file uses -- confirm against version control.
#include <drogon/drogon.h>
#include <algorithm>

#include <chrono>
#include <exception>
#include <functional>
#include <iterator>
#include <optional>
#include <string>
#include <utility>
#include <vector>

using namespace drogon;

// Macro to simplify JSON integer assignments.
// NOTE(review): the cast target type was lost from the source text;
// Json::Int64 is assumed because every value is read back with asInt64().
#define JSON_INT(json, field, value) json[field] = static_cast<Json::Int64>(value)

/// Stops the polling timer (via shutdown()) when the service is destroyed.
StatsService::~StatsService() {
    shutdown();
}

/// Marks the service as running and registers a Drogon "beginning advice"
/// that installs a repeating timer on the application loop. Each tick calls
/// pollOmeStats() while running_ is true; exceptions thrown by a poll are
/// logged and do not escape the timer callback.
void StatsService::initialize() {
    LOG_INFO << "Initializing Stats Service...";
    running_ = true;

    drogon::app().registerBeginningAdvice([this]() {
        LOG_INFO << "Starting stats polling timer...";

        if (auto loop = drogon::app().getLoop()) {
            try {
                timerId_ = loop->runEvery(
                    pollInterval_.count(),
                    [this]() {
                        if (!running_) return;

                        try {
                            pollOmeStats();
                        } catch (const std::exception& e) {
                            LOG_ERROR << "Error in stats polling: " << e.what();
                        }
                    }
                );
                LOG_INFO << "Stats polling timer started with "
                         << pollInterval_.count() << "s interval";
            } catch (const std::exception& e) {
                LOG_ERROR << "Failed to create stats timer: " << e.what();
            }
        }
    });
}

/// Clears running_ and invalidates the polling timer if one was created.
/// Safe to call more than once: timerId_ is reset after the first call.
void StatsService::shutdown() {
    LOG_INFO << "Shutting down Stats Service...";
    running_ = false;

    if (timerId_.has_value()) {
        if (auto loop = drogon::app().getLoop()) {
            loop->invalidateTimer(timerId_.value());
        }
        timerId_.reset();
    }
}

/// Counts the `viewer_token:*` Redis keys whose stored value equals
/// @p streamKey.
///
/// @return the number of matching tokens, or 0 when no Redis connection is
///         available or any exception is thrown.
///
/// NOTE(review): this issues one KEYS call plus one GET per token key and
/// waits for each result, and it runs once per stream per poll. Consider a
/// per-stream set or SCAN if the token keyspace grows.
int64_t StatsService::getUniqueViewerCount(const std::string& streamKey) {
    try {
        auto redis = services::RedisHelper::instance().getConnection();
        if (!redis) return 0;

        std::vector<std::string> keys;
        redis->keys("viewer_token:*", std::back_inserter(keys));

        return std::count_if(keys.begin(), keys.end(),
            [&redis, &streamKey](const auto& tokenKey) {
                auto storedKey = redis->get(tokenKey);
                return storedKey.has_value() && storedKey.value() == streamKey;
            });
    } catch (const std::exception& e) {
        LOG_ERROR << "Error getting unique viewer count: " << e.what();
        return 0;
    }
}

/// Triggers a stats refresh for every stream reported by OME and for every
/// `stream_key:*` key found in Redis. A stream present in both sources is
/// refreshed once per source.
void StatsService::pollOmeStats() {
    // Get active streams from OME
    OmeClient::getInstance().getActiveStreams([this](bool success, const Json::Value& json) {
        // isObject() guard: const operator[] on a non-object, non-null
        // Json::Value raises Json::LogicError, and nothing here catches it.
        if (!success || !json.isObject() || !json["response"].isArray()) {
            return;
        }
        for (const auto& stream : json["response"]) {
            if (stream.isString()) {
                updateStreamStats(stream.asString());
            }
        }
    });

    // Poll known stream keys from Redis
    services::RedisHelper::instance().keysAsync("stream_key:*",
        [this](const std::vector<std::string>& keys) {
            for (const auto& key : keys) {
                // Everything after the first ':' is treated as the stream key.
                if (auto pos = key.find(':'); pos != std::string::npos) {
                    updateStreamStats(key.substr(pos + 1));
                }
            }
        }
    );
}

/// Fetches fresh stats for @p streamKey, fills in the unique viewer count,
/// caches the result in Redis, updates the realm row in the database and,
/// when the stream is live or has any traffic/viewers, broadcasts a
/// `stats_update` message over the stream WebSocket controller.
void StatsService::updateStreamStats(const std::string& streamKey) {
    fetchStatsFromOme(streamKey, [this, streamKey](bool success, const StreamStats& stats) {
        if (!success) {
            return;
        }

        StreamStats updatedStats = stats;
        updatedStats.uniqueViewers = getUniqueViewerCount(streamKey);

        storeStatsInRedis(streamKey, updatedStats);

        // Update realm in database
        updateRealmLiveStatus(streamKey, updatedStats);

        // Only broadcast if stream has meaningful data
        const bool hasData = updatedStats.isLive ||
                             updatedStats.totalBytesIn > 0 ||
                             updatedStats.uniqueViewers > 0;
        if (!hasData) {
            return;
        }

        Json::Value msg;
        msg["type"] = "stats_update";
        msg["stream_key"] = streamKey;

        auto& s = msg["stats"];
        // "connections" carries the unique viewer count; the raw OME
        // connection total is published separately as "raw_connections".
        JSON_INT(s, "connections", updatedStats.uniqueViewers);
        JSON_INT(s, "raw_connections", updatedStats.currentConnections);
        s["bitrate"] = updatedStats.bitrate;
        s["resolution"] = updatedStats.resolution;
        s["fps"] = updatedStats.fps;
        s["codec"] = updatedStats.codec;
        s["is_live"] = updatedStats.isLive;
        JSON_INT(s, "bytes_in", updatedStats.totalBytesIn);
        JSON_INT(s, "bytes_out", updatedStats.totalBytesOut);

        // Protocol breakdown
        auto& pc = s["protocol_connections"];
        JSON_INT(pc, "webrtc", updatedStats.protocolConnections.webrtc);
        JSON_INT(pc, "hls", updatedStats.protocolConnections.hls);
        JSON_INT(pc, "llhls", updatedStats.protocolConnections.llhls);
        JSON_INT(pc, "dash", updatedStats.protocolConnections.dash);

        StreamWebSocketController::broadcastStatsUpdate(msg);
    });
}

/// Writes the live flag and unique viewer count of @p stats to the `realms`
/// row whose `stream_key` matches @p streamKey. The query is submitted with
/// result and error callbacks; failures are logged.
void StatsService::updateRealmLiveStatus(const std::string& streamKey, const StreamStats& stats) {
    auto dbClient = app().getDbClient();
    if (!dbClient) {
        // Without this guard a missing client would be dereferenced below.
        LOG_ERROR << "Failed to update realm status for " << streamKey
                  << ": no database client available";
        return;
    }

    // Update realm's live status and viewer count.
    // Only the two logged fields are captured, not the whole StreamStats.
    *dbClient << "UPDATE realms SET is_live = $1, viewer_count = $2 WHERE stream_key = $3"
              << stats.isLive << stats.uniqueViewers << streamKey
              >> [streamKey, isLive = stats.isLive, viewers = stats.uniqueViewers](const orm::Result&) {
                  LOG_DEBUG << "Updated realm status for stream " << streamKey
                            << " - Live: " << isLive
                            << ", Viewers: " << viewers;
              }
              >> [streamKey](const orm::DrogonDbException& e) {
                  LOG_ERROR << "Failed to update realm status for " << streamKey
                            << ": " << e.base().what();
              };
}

/// Builds a StreamStats for @p streamKey from two OME requests: the stream
/// statistics (connections, throughput, byte counters) followed by the
/// stream info (first video track's codec, resolution, frame rate).
///
/// @p callback is always invoked with `true` once the stream-info request
/// completes, even when either OME request failed; in that case the
/// corresponding fields keep their default values and isLive is false when
/// the statistics request failed.
///
/// NOTE(review): the callback's template argument was lost from the source
/// text; the signature below matches every lambda passed in this file --
/// confirm against StatsService.h.
void StatsService::fetchStatsFromOme(const std::string& streamKey,
                                     std::function<void(bool, const StreamStats&)> callback) {
    LOG_DEBUG << "Fetching stats for stream: " << streamKey;

    OmeClient::getInstance().getStreamStats(streamKey,
        [callback, streamKey](bool success, const Json::Value& json) {
            StreamStats stats;

            // isObject() guard: isMember() on a non-object, non-null value
            // raises Json::LogicError and this check is outside the try.
            if (success && json.isObject() && json.isMember("response") &&
                !json["response"].isNull()) {
                try {
                    const auto& data = json["response"];

                    // Parse connections
                    if (data.isMember("connections")) {
                        const auto& conns = data["connections"];
                        int64_t totalConns = 0;

                        for (const auto& protocolName : conns.getMemberNames()) {
                            int64_t count = conns[protocolName].asInt64();
                            auto& pc = stats.protocolConnections;

                            if (protocolName == "webrtc") pc.webrtc = count;
                            else if (protocolName == "hls") pc.hls = count;
                            else if (protocolName == "llhls") pc.llhls = count;
                            else if (protocolName == "dash") pc.dash = count;

                            // Unrecognized protocols still count towards the total.
                            totalConns += count;
                        }

                        stats.currentConnections = totalConns;
                        stats.totalConnections = totalConns;
                    }

                    // Bitrate: prefer the last throughput sample, fall back
                    // to the average, otherwise 0.
                    if (data.isMember("lastThroughputIn")) {
                        stats.bitrate = data["lastThroughputIn"].asDouble();
                    } else if (data.isMember("avgThroughputIn")) {
                        stats.bitrate = data["avgThroughputIn"].asDouble();
                    } else {
                        stats.bitrate = 0;
                    }

                    // Byte counters
                    if (data.isMember("totalBytesIn")) stats.totalBytesIn = data["totalBytesIn"].asInt64();
                    if (data.isMember("totalBytesOut")) stats.totalBytesOut = data["totalBytesOut"].asInt64();

                    stats.isLive = (stats.bitrate > 0 || stats.currentConnections > 0);

                    LOG_DEBUG << "OME stats response: " << json.toStyledString();
                } catch (const std::exception& e) {
                    LOG_ERROR << "Failed to parse stats: " << e.what();
                    stats.isLive = false;
                }
            } else {
                stats.isLive = false;
            }

            stats.lastUpdated = std::chrono::system_clock::now();

            // Now fetch stream info for resolution/codec/fps
            OmeClient::getInstance().getStreamInfo(streamKey,
                [callback, stats](bool infoSuccess, const Json::Value& infoJson) mutable {
                    // Parse stream metadata if available. The isObject()
                    // checks keep a malformed reply from throwing before the
                    // callback has been invoked.
                    const bool hasTracks = infoSuccess &&
                                           infoJson.isObject() &&
                                           infoJson.isMember("response") &&
                                           infoJson["response"].isObject() &&
                                           infoJson["response"].isMember("tracks");
                    if (hasTracks) {
                        try {
                            for (const auto& track : infoJson["response"]["tracks"]) {
                                if (track["type"].asString() != "video") {
                                    continue;
                                }
                                if (track.isMember("codec")) {
                                    stats.codec = track["codec"].asString();
                                }
                                if (track.isMember("width") && track.isMember("height")) {
                                    stats.resolution = std::to_string(track["width"].asInt()) + "x" +
                                                       std::to_string(track["height"].asInt());
                                }
                                if (track.isMember("framerate")) {
                                    stats.fps = track["framerate"].asDouble();
                                }
                                // Only the first video track is used.
                                break;
                            }
                        } catch (const std::exception& e) {
                            LOG_ERROR << "Failed to parse stream info: " << e.what();
                        }
                    }

                    callback(true, stats);
                });
        });
}

/// Serializes @p stats to JSON and stores it under `stream_stats:<streamKey>`
/// via RedisHelper::storeKey with a third argument of 10.
/// `last_connection_drop` is only written when the drop happened less than
/// 60 time units ago (see the review note below).
///
/// NOTE(review): the duration_cast target types were lost from the source
/// text. `last_updated` is read back as std::chrono::seconds in
/// getStreamStats(), so seconds are used for all three casts -- confirm.
/// NOTE(review): the 10 passed to storeKey is presumably a TTL in seconds;
/// verify against RedisHelper.
void StatsService::storeStatsInRedis(const std::string& streamKey, const StreamStats& stats) {
    Json::Value json;
    JSON_INT(json, "connections", stats.currentConnections);
    JSON_INT(json, "unique_viewers", stats.uniqueViewers);
    JSON_INT(json, "total_connections", stats.totalConnections);
    JSON_INT(json, "bytes_in", stats.totalBytesIn);
    JSON_INT(json, "bytes_out", stats.totalBytesOut);
    json["bitrate"] = stats.bitrate;
    json["codec"] = stats.codec;
    json["resolution"] = stats.resolution;
    json["fps"] = stats.fps;
    json["is_live"] = stats.isLive;
    JSON_INT(json, "last_updated",
        std::chrono::duration_cast<std::chrono::seconds>(
            stats.lastUpdated.time_since_epoch()
        ).count()
    );

    // Protocol connections
    Json::Value pc;
    JSON_INT(pc, "webrtc", stats.protocolConnections.webrtc);
    JSON_INT(pc, "hls", stats.protocolConnections.hls);
    JSON_INT(pc, "llhls", stats.protocolConnections.llhls);
    JSON_INT(pc, "dash", stats.protocolConnections.dash);
    json["protocol_connections"] = pc;

    // Store connection drop timestamp if recent
    auto timeSinceDrop = std::chrono::duration_cast<std::chrono::seconds>(
        std::chrono::system_clock::now() - stats.lastConnectionDrop).count();
    if (timeSinceDrop < 60) {
        JSON_INT(json, "last_connection_drop",
            std::chrono::duration_cast<std::chrono::seconds>(
                stats.lastConnectionDrop.time_since_epoch()
            ).count()
        );
    }

    RedisHelper::storeKey("stream_stats:" + streamKey, Json::FastWriter().write(json), 10);
}

/// Delivers stats for @p streamKey to @p callback.
///
/// The cached JSON under `stream_stats:<streamKey>` is used when it exists
/// and parses; otherwise (cache miss, unparsable text, or a field of the
/// wrong type) fresh stats are fetched from OME and the unique viewer count
/// is filled in before the callback runs.
///
/// The callback is invoked outside the parsing try-block, so an exception
/// thrown by the callback itself is not mistaken for a cache-parse failure
/// and cannot cause the callback to be invoked a second time.
///
/// NOTE(review): the callback's template argument was lost from the source
/// text; confirm the signature against StatsService.h.
void StatsService::getStreamStats(const std::string& streamKey,
                                  std::function<void(bool, const StreamStats&)> callback) {
    // Shared fallback: fetch from OME and populate uniqueViewers on success.
    // Invoked synchronously below, so the reference captures stay valid; the
    // inner lambda takes its own copies because it runs later.
    const auto fetchFresh = [this, &streamKey, &callback]() {
        fetchStatsFromOme(streamKey,
            [this, callback, streamKey](bool success, const StreamStats& stats) {
                if (!success) {
                    callback(false, stats);
                    return;
                }
                StreamStats updatedStats = stats;
                updatedStats.uniqueViewers = getUniqueViewerCount(streamKey);
                callback(true, updatedStats);
            });
    };

    std::string jsonStr = RedisHelper::getKey("stream_stats:" + streamKey);

    if (jsonStr.empty()) {
        // Fetch fresh stats from OME and populate uniqueViewers
        LOG_DEBUG << "No cached stats, fetching from OME for " << streamKey;
        fetchFresh();
        return;
    }

    std::optional<StreamStats> cached;
    try {
        Json::Value json;
        Json::Reader reader;

        if (reader.parse(jsonStr, json)) {
            StreamStats stats;
            stats.currentConnections = json["connections"].asInt64();
            stats.uniqueViewers = json["unique_viewers"].asInt64();
            stats.totalConnections = json["total_connections"].asInt64();
            stats.totalBytesIn = json["bytes_in"].asInt64();
            stats.totalBytesOut = json["bytes_out"].asInt64();
            stats.bitrate = json["bitrate"].asDouble();
            stats.codec = json["codec"].asString();
            stats.resolution = json["resolution"].asString();
            stats.fps = json["fps"].asDouble();
            stats.isLive = json["is_live"].asBool();

            // Parse protocol connections
            if (json.isMember("protocol_connections")) {
                const auto& pc = json["protocol_connections"];
                stats.protocolConnections.webrtc = pc["webrtc"].asInt64();
                stats.protocolConnections.hls = pc["hls"].asInt64();
                stats.protocolConnections.llhls = pc["llhls"].asInt64();
                stats.protocolConnections.dash = pc["dash"].asInt64();
            }

            stats.lastUpdated = std::chrono::system_clock::time_point(
                std::chrono::seconds(json["last_updated"].asInt64())
            );

            cached = std::move(stats);
        }
    } catch (const std::exception& e) {
        LOG_ERROR << "Failed to parse cached stats: " << e.what();
    }

    if (!cached) {
        // Fallback to fresh fetch if cached data is corrupted
        fetchFresh();
        return;
    }

    callback(true, *cached);
    LOG_DEBUG << "Retrieved cached stats for " << streamKey;
}