This commit is contained in:
doomtube 2025-08-09 13:51:36 -04:00
parent 875a53f499
commit d812c6aeab
8 changed files with 2688 additions and 113 deletions

View file

@ -4,6 +4,7 @@
#include "../services/OmeClient.h"
#include <drogon/HttpClient.h>
#include <drogon/utils/Utilities.h>
#include <set>
using namespace drogon;
@ -17,29 +18,40 @@ StatsService::~StatsService() {
void StatsService::initialize() {
LOG_INFO << "Initializing Stats Service...";
running_ = true;
}
void StatsService::startPolling() {
if (!running_) {
LOG_WARN << "Stats service not initialized, cannot start polling";
return;
}
drogon::app().registerBeginningAdvice([this]() {
LOG_INFO << "Starting stats polling timer...";
if (auto loop = drogon::app().getLoop()) {
try {
timerId_ = loop->runEvery(
pollInterval_.count(),
[this]() {
if (!running_) return;
try {
pollOmeStats();
} catch (const std::exception& e) {
LOG_ERROR << "Error in stats polling: " << e.what();
}
LOG_INFO << "Starting stats polling timer...";
if (auto loop = drogon::app().getLoop()) {
try {
// Do an immediate poll
pollOmeStats();
// Then set up the timer
timerId_ = loop->runEvery(
pollInterval_.count(),
[this]() {
if (!running_) return;
try {
pollOmeStats();
} catch (const std::exception& e) {
LOG_ERROR << "Error in stats polling: " << e.what();
}
);
LOG_INFO << "Stats polling timer started with " << pollInterval_.count() << "s interval";
} catch (const std::exception& e) {
LOG_ERROR << "Failed to create stats timer: " << e.what();
}
}
);
LOG_INFO << "Stats polling timer started with " << pollInterval_.count() << "s interval";
} catch (const std::exception& e) {
LOG_ERROR << "Failed to create stats timer: " << e.what();
}
});
} else {
LOG_ERROR << "Event loop not available for stats polling";
}
}
void StatsService::shutdown() {
@ -73,27 +85,82 @@ int64_t StatsService::getUniqueViewerCount(const std::string& streamKey) {
}
void StatsService::pollOmeStats() {
LOG_INFO << "Polling OvenMediaEngine for active streams...";
// Get active streams from OME
OmeClient::getInstance().getActiveStreams([this](bool success, const Json::Value& json) {
if (success && json["response"].isArray()) {
for (const auto& stream : json["response"]) {
if (stream.isString()) {
updateStreamStats(stream.asString());
if (success && json.isMember("response")) {
LOG_INFO << "OME Active Streams Response: " << json["response"].toStyledString();
std::set<std::string> activeStreamKeys;
// Handle both array and object responses from OME
if (json["response"].isArray()) {
for (const auto& stream : json["response"]) {
if (stream.isString()) {
activeStreamKeys.insert(stream.asString());
}
}
} else if (json["response"].isMember("streams") && json["response"]["streams"].isArray()) {
for (const auto& stream : json["response"]["streams"]) {
if (stream.isString()) {
activeStreamKeys.insert(stream.asString());
} else if (stream.isMember("name")) {
activeStreamKeys.insert(stream["name"].asString());
}
}
}
LOG_INFO << "Found " << activeStreamKeys.size() << " active streams from OME";
// Update each active stream
for (const auto& streamKey : activeStreamKeys) {
LOG_INFO << "Processing active stream: " << streamKey;
// IMMEDIATELY update database to mark as live
auto dbClient = app().getDbClient();
*dbClient << "UPDATE realms SET is_live = true, viewer_count = 0, "
"updated_at = CURRENT_TIMESTAMP WHERE stream_key = $1"
<< streamKey
>> [streamKey](const orm::Result&) {
LOG_INFO << "Successfully marked realm as live: " << streamKey;
}
>> [streamKey](const orm::DrogonDbException& e) {
LOG_ERROR << "Failed to update realm live status: " << e.base().what();
};
// Then update detailed stats
updateStreamStats(streamKey);
}
// Mark all non-active streams as offline
auto dbClient = app().getDbClient();
*dbClient << "SELECT stream_key FROM realms WHERE is_live = true"
>> [activeStreamKeys](const orm::Result& r) {
auto db = app().getDbClient();
for (const auto& row : r) {
std::string key = row["stream_key"].as<std::string>();
if (activeStreamKeys.find(key) == activeStreamKeys.end()) {
LOG_INFO << "Marking realm as offline: " << key;
*db << "UPDATE realms SET is_live = false, viewer_count = 0, "
"updated_at = CURRENT_TIMESTAMP WHERE stream_key = $1"
<< key
>> [key](const orm::Result&) {
LOG_INFO << "Marked realm as offline: " << key;
}
>> [](const orm::DrogonDbException& e) {
LOG_ERROR << "Failed to mark realm offline: " << e.base().what();
};
}
}
}
>> [](const orm::DrogonDbException& e) {
LOG_ERROR << "Failed to query live realms: " << e.base().what();
};
} else {
LOG_ERROR << "Failed to get active streams from OME or empty response";
}
});
// Poll known stream keys from Redis
services::RedisHelper::instance().keysAsync("stream_key:*",
[this](const std::vector<std::string>& keys) {
for (const auto& key : keys) {
if (auto pos = key.find(':'); pos != std::string::npos) {
updateStreamStats(key.substr(pos + 1));
}
}
}
);
}
void StatsService::updateStreamStats(const std::string& streamKey) {
@ -107,7 +174,7 @@ void StatsService::updateStreamStats(const std::string& streamKey) {
// Update realm in database
updateRealmLiveStatus(streamKey, updatedStats);
// Only broadcast if stream has meaningful data
// Only broadcast if stream has meaningful data or is live
if (updatedStats.isLive || updatedStats.totalBytesIn > 0 || updatedStats.uniqueViewers > 0) {
Json::Value msg;
msg["type"] = "stats_update";
@ -140,11 +207,14 @@ void StatsService::updateStreamStats(const std::string& streamKey) {
void StatsService::updateRealmLiveStatus(const std::string& streamKey, const StreamStats& stats) {
auto dbClient = app().getDbClient();
// Cast to int32 to match PostgreSQL integer type
int32_t viewerCount = static_cast<int32_t>(stats.uniqueViewers);
// Update realm's live status and viewer count
*dbClient << "UPDATE realms SET is_live = $1, viewer_count = $2 WHERE stream_key = $3"
<< stats.isLive << stats.uniqueViewers << streamKey
*dbClient << "UPDATE realms SET is_live = $1, viewer_count = $2, updated_at = CURRENT_TIMESTAMP WHERE stream_key = $3"
<< stats.isLive << viewerCount << streamKey
>> [streamKey, stats](const orm::Result&) {
LOG_DEBUG << "Updated realm status for stream " << streamKey
LOG_INFO << "Updated realm status for stream " << streamKey
<< " - Live: " << stats.isLive
<< ", Viewers: " << stats.uniqueViewers;
}
@ -158,13 +228,16 @@ void StatsService::fetchStatsFromOme(const std::string& streamKey,
std::function<void(bool, const StreamStats&)> callback) {
LOG_DEBUG << "Fetching stats for stream: " << streamKey;
// First, try to get the stream stats
OmeClient::getInstance().getStreamStats(streamKey, [this, callback, streamKey](bool success, const Json::Value& json) {
StreamStats stats;
bool streamExists = false;
if (success && json.isMember("response") && !json["response"].isNull()) {
try {
const auto& data = json["response"];
streamExists = true;
// Parse connections
if (data.isMember("connections")) {
const auto& conns = data["connections"];
@ -186,54 +259,173 @@ void StatsService::fetchStatsFromOme(const std::string& streamKey,
stats.totalConnections = totalConns;
}
// Bitrate
stats.bitrate = data.isMember("lastThroughputIn") ?
data["lastThroughputIn"].asDouble() :
(data.isMember("avgThroughputIn") ? data["avgThroughputIn"].asDouble() : 0);
// Byte counters
if (data.isMember("totalBytesIn")) stats.totalBytesIn = data["totalBytesIn"].asInt64();
if (data.isMember("totalBytesOut")) stats.totalBytesOut = data["totalBytesOut"].asInt64();
stats.isLive = (stats.bitrate > 0 || stats.currentConnections > 0);
LOG_DEBUG << "OME stats response: " << json.toStyledString();
// Check multiple indicators for live status
bool hasInput = false;
// Check for input field
if (data.isMember("input") && !data["input"].isNull()) {
hasInput = true;
const auto& input = data["input"];
// Get bitrate from input tracks
if (input.isMember("tracks") && input["tracks"].isArray()) {
for (const auto& track : input["tracks"]) {
if (track["type"].asString() == "video" && track.isMember("bitrate")) {
stats.bitrate = track["bitrate"].asDouble();
}
}
}
}
// Alternative: Check lastThroughputIn
if (!hasInput && data.isMember("lastThroughputIn")) {
double throughput = data["lastThroughputIn"].asDouble();
if (throughput > 0) {
hasInput = true;
stats.bitrate = throughput;
}
}
// Alternative: Check avgThroughputIn
if (!hasInput && data.isMember("avgThroughputIn")) {
double avgThroughput = data["avgThroughputIn"].asDouble();
if (avgThroughput > 0) {
hasInput = true;
stats.bitrate = avgThroughput;
}
}
// Check bytes counters
if (data.isMember("totalBytesIn")) {
stats.totalBytesIn = data["totalBytesIn"].asInt64();
if (stats.totalBytesIn > 0) {
hasInput = true;
}
}
if (data.isMember("totalBytesOut")) {
stats.totalBytesOut = data["totalBytesOut"].asInt64();
}
// Stream is live if it has input or active bitrate
stats.isLive = hasInput || stats.bitrate > 0;
LOG_DEBUG << "Stream " << streamKey
<< " - hasInput: " << hasInput
<< ", bitrate: " << stats.bitrate
<< ", totalBytesIn: " << stats.totalBytesIn
<< ", isLive: " << stats.isLive;
} catch (const std::exception& e) {
LOG_ERROR << "Failed to parse stats: " << e.what();
stats.isLive = false;
}
} else {
// Stream doesn't exist in OME
stats.isLive = false;
LOG_DEBUG << "Stream " << streamKey << " not found in OME";
}
stats.lastUpdated = std::chrono::system_clock::now();
// Now fetch stream info for resolution/codec/fps
OmeClient::getInstance().getStreamInfo(streamKey, [callback, stats](bool infoSuccess, const Json::Value& infoJson) mutable {
// Parse stream metadata if available
if (infoSuccess && infoJson.isMember("response") && infoJson["response"].isMember("tracks")) {
try {
for (const auto& track : infoJson["response"]["tracks"]) {
if (track["type"].asString() == "video") {
if (track.isMember("codec")) {
stats.codec = track["codec"].asString();
// If stream exists, try to get detailed stream info
if (streamExists) {
OmeClient::getInstance().getStreamInfo(streamKey, [callback, stats](bool infoSuccess, const Json::Value& infoJson) mutable {
// Parse stream metadata if available
if (infoSuccess && infoJson.isMember("response")) {
try {
const auto& response = infoJson["response"];
LOG_DEBUG << "Stream info response: " << response.toStyledString();
// Check if stream has input (another way to verify it's live)
if (response.isMember("input") && !response["input"].isNull()) {
stats.isLive = true;
// Try to get codec from input tracks first
if (response["input"].isMember("tracks") && response["input"]["tracks"].isArray()) {
for (const auto& track : response["input"]["tracks"]) {
if (track["type"].asString() == "video") {
if (track.isMember("codec")) {
std::string codec = track["codec"].asString();
// Clean up codec string
if (codec == "H264" || codec == "h264") {
stats.codec = "H.264";
} else if (codec == "H265" || codec == "h265") {
stats.codec = "H.265";
} else if (codec == "VP8" || codec == "vp8") {
stats.codec = "VP8";
} else if (codec == "VP9" || codec == "vp9") {
stats.codec = "VP9";
} else {
stats.codec = codec;
}
}
if (track.isMember("width") && track.isMember("height")) {
stats.resolution = std::to_string(track["width"].asInt()) + "x" +
std::to_string(track["height"].asInt());
}
if (track.isMember("framerate")) {
stats.fps = track["framerate"].asDouble();
} else if (track.isMember("frameRate")) {
stats.fps = track["frameRate"].asDouble();
}
break;
}
}
}
if (track.isMember("width") && track.isMember("height")) {
stats.resolution = std::to_string(track["width"].asInt()) + "x" +
std::to_string(track["height"].asInt());
}
if (track.isMember("framerate")) {
stats.fps = track["framerate"].asDouble();
}
break;
}
// If no codec found in input, try output tracks
if (stats.codec.empty() || stats.codec == "N/A") {
if (response.isMember("tracks") && response["tracks"].isArray()) {
for (const auto& track : response["tracks"]) {
if (track["type"].asString() == "video") {
if (track.isMember("codec")) {
std::string codec = track["codec"].asString();
if (codec == "H264" || codec == "h264") {
stats.codec = "H.264";
} else if (codec == "H265" || codec == "h265") {
stats.codec = "H.265";
} else if (codec == "VP8" || codec == "vp8") {
stats.codec = "VP8";
} else if (codec == "VP9" || codec == "vp9") {
stats.codec = "VP9";
} else {
stats.codec = codec;
}
}
if (stats.resolution == "N/A" && track.isMember("width") && track.isMember("height")) {
stats.resolution = std::to_string(track["width"].asInt()) + "x" +
std::to_string(track["height"].asInt());
}
if (stats.fps == 0 && track.isMember("framerate")) {
stats.fps = track["framerate"].asDouble();
} else if (stats.fps == 0 && track.isMember("frameRate")) {
stats.fps = track["frameRate"].asDouble();
}
break;
}
}
}
}
// Set defaults if still empty
if (stats.codec.empty()) {
stats.codec = "Unknown";
}
} catch (const std::exception& e) {
LOG_ERROR << "Failed to parse stream info: " << e.what();
}
} catch (const std::exception& e) {
LOG_ERROR << "Failed to parse stream info: " << e.what();
}
}
callback(true, stats);
});
} else {
// Stream doesn't exist, return offline stats
callback(true, stats);
});
}
});
}
@ -287,7 +479,7 @@ void StatsService::getStreamStats(const std::string& streamKey,
fetchStatsFromOme(streamKey, [this, callback, streamKey](bool success, const StreamStats& stats) {
if (success) {
StreamStats updatedStats = stats;
// FIX: Set uniqueViewers on cache miss!
// Set uniqueViewers on cache miss
updatedStats.uniqueViewers = getUniqueViewerCount(streamKey);
callback(true, updatedStats);
} else {