#include "StreamController.h" #include "../services/DatabaseService.h" #include "../services/StatsService.h" #include "../services/RedisHelper.h" #include "../services/OmeClient.h" #include "../services/AuthService.h" #include "../services/RestreamService.h" #include "../common/HttpHelpers.h" #include "../common/AuthHelpers.h" #include #include #include #include #include #include using namespace drogon::orm; // Helper functions at the top namespace { HttpResponsePtr jsonOk(const Json::Value& data) { return jsonResp(data); } // Quick JSON builder for common patterns Json::Value json(std::initializer_list> items) { Json::Value j; for (const auto& [key, value] : items) { j[key] = value; } return j; } } // Static member definitions std::mutex StreamWebSocketController::connectionsMutex_; std::unordered_map> StreamWebSocketController::tokenConnections_; std::unordered_set StreamWebSocketController::connections_; void StreamController::health(const HttpRequestPtr &, std::function &&callback) { callback(jsonOk(json({ {"status", "ok"}, {"timestamp", Json::Int64(std::chrono::duration_cast( std::chrono::system_clock::now().time_since_epoch() ).count())} }))); } void StreamController::validateStreamKey(const HttpRequestPtr &, std::function &&callback, const std::string &key) { // Now validate against realms table auto dbClient = app().getDbClient(); *dbClient << "SELECT 1 FROM realms WHERE stream_key = $1 AND is_active = true" << key >> [callback, key](const Result &r) { bool valid = !r.empty(); if (valid) { // Store in Redis RedisHelper::storeKey("stream_key:" + key, "1", 86400); } callback(jsonOk(json({{"valid", valid}}))); } >> [callback](const DrogonDbException &e) { LOG_ERROR << "Database error: " << e.base().what(); callback(jsonOk(json({{"valid", false}}))); }; } void StreamController::disconnectStream(const HttpRequestPtr &req, std::function &&callback, const std::string &streamKey) { UserInfo user = getUserFromRequest(req); if (user.id == 0) { callback(jsonError("Unauthorized", k401Unauthorized)); return; } // Check if user owns this stream or is admin auto dbClient = app().getDbClient(); *dbClient << "SELECT user_id FROM realms WHERE stream_key = $1 AND is_active = true" << streamKey >> [user, callback, streamKey](const Result& r) { if (r.empty() || (r[0]["user_id"].as() != user.id && !user.isAdmin)) { callback(jsonError("Forbidden", k403Forbidden)); return; } OmeClient::getInstance().disconnectStream(streamKey, [callback](bool success) { if (success) { callback(jsonOk(json({ {"success", true}, {"message", "Stream disconnected"} }))); } else { callback(jsonError("Failed to disconnect stream")); } }); } >> DB_ERROR(callback, "disconnect stream"); } void StreamController::getStreamStats(const HttpRequestPtr &, std::function &&callback, const std::string &streamKey) { StatsService::getInstance().getStreamStats(streamKey, [callback](bool success, const StreamStats& stats) { if (success) { Json::Value json; json["success"] = true; auto& s = json["stats"]; s["connections"] = static_cast(stats.uniqueViewers); s["total_connections"] = static_cast(stats.totalConnections); s["bytes_in"] = static_cast(stats.totalBytesIn); s["bytes_out"] = static_cast(stats.totalBytesOut); s["bitrate"] = stats.bitrate; s["codec"] = stats.codec; s["resolution"] = stats.resolution; s["fps"] = stats.fps; s["is_live"] = stats.isLive; if (stats.totalBytesIn > 0) { s["data_rate_in"] = stats.bitrate / 1000.0; } if (stats.totalBytesOut > 0) { s["data_rate_out"] = stats.totalBytesOut / 1024.0 / 1024.0; } // Protocol breakdown auto& pc = s["protocol_connections"]; pc["webrtc"] = static_cast(stats.protocolConnections.webrtc); pc["hls"] = static_cast(stats.protocolConnections.hls); pc["llhls"] = static_cast(stats.protocolConnections.llhls); pc["dash"] = static_cast(stats.protocolConnections.dash); callback(jsonResp(json)); } else { callback(jsonError("Failed to retrieve stream stats")); } }); } void StreamController::getActiveStreams(const HttpRequestPtr &, std::function &&callback) { OmeClient::getInstance().getActiveStreams([callback](bool success, const Json::Value& omeResponse) { if (success) { LOG_INFO << "Active streams: " << omeResponse["response"].toStyledString(); callback(jsonOk(json({ {"success", true}, {"streams", omeResponse["response"]} }))); } else { callback(jsonError("Failed to get active streams from OME")); } }); } void StreamController::issueViewerToken(const HttpRequestPtr &, std::function &&callback, const std::string &streamKey) { // Validate against realms auto dbClient = app().getDbClient(); *dbClient << "SELECT 1 FROM realms WHERE stream_key = $1 AND is_active = true" << streamKey >> [callback, streamKey](const Result& r) { if (r.empty()) { callback(jsonResp({}, k404NotFound)); return; } auto bytes = drogon::utils::genRandomString(32); std::string token = drogon::utils::base64Encode( (const unsigned char*)bytes.data(), bytes.length() ); RedisHelper::storeKeyAsync("viewer_token:" + token, streamKey, 30, [callback, token](bool stored) { if (!stored) { callback(jsonResp({}, k500InternalServerError)); return; } auto resp = HttpResponse::newHttpResponse(); Cookie cookie("viewer_token", token); cookie.setPath("/"); cookie.setHttpOnly(true); cookie.setSecure(false); cookie.setMaxAge(300); resp->addCookie(cookie); Json::Value body; body["success"] = true; body["viewer_token"] = token; body["expires_in"] = 30; resp->setContentTypeCode(CT_APPLICATION_JSON); resp->setBody(Json::FastWriter().write(body)); callback(resp); } ); } >> [callback](const DrogonDbException& e) { LOG_ERROR << "Database error: " << e.base().what(); callback(jsonResp({}, k500InternalServerError)); }; } void StreamController::heartbeat(const HttpRequestPtr &req, std::function &&callback, const std::string &streamKey) { auto token = req->getCookie("viewer_token"); if (token.empty()) { callback(jsonResp({}, k403Forbidden)); return; } RedisHelper::getKeyAsync("viewer_token:" + token, [callback, streamKey, token](const std::string& storedStreamKey) { if (storedStreamKey != streamKey) { callback(jsonResp({}, k403Forbidden)); return; } // Refresh token TTL to 5 minutes on heartbeat services::RedisHelper::instance().expireAsync("viewer_token:" + token, 300, [callback](bool success) { if (!success) { callback(jsonResp({}, k500InternalServerError)); return; } callback(jsonOk(json({ {"success", true}, {"renewed", true} }))); } ); } ); } // WebSocket implementation void StreamWebSocketController::handleNewMessage(const WebSocketConnectionPtr&, std::string &&message, const WebSocketMessageType &type) { if (type == WebSocketMessageType::Text) { Json::Value msg; Json::Reader reader; if (reader.parse(message, msg) && msg["type"].asString() == "subscribe") { LOG_INFO << "Client subscribed to stream updates"; } } } void StreamWebSocketController::handleNewConnection(const HttpRequestPtr &req, const WebSocketConnectionPtr& wsConnPtr) { LOG_INFO << "New WebSocket connection established"; // Allow anonymous connections for receiving public broadcasts (stream_live/stream_offline) // These are used by the home page to get instant updates std::lock_guard lock(connectionsMutex_); connections_.insert(wsConnPtr); auto token = req->getCookie("viewer_token"); if (!token.empty()) { // If viewer token is provided, validate and track it RedisHelper::getKeyAsync("viewer_token:" + token, [wsConnPtr, token](const std::string& streamKey) { if (!streamKey.empty()) { std::lock_guard lock(connectionsMutex_); tokenConnections_[token].insert(wsConnPtr); LOG_INFO << "WebSocket authenticated for stream: " << streamKey; } else { LOG_DEBUG << "WebSocket with invalid/expired viewer token - treating as anonymous"; } } ); } else { LOG_DEBUG << "Anonymous WebSocket connection (no viewer token)"; } } void StreamWebSocketController::handleConnectionClosed(const WebSocketConnectionPtr& wsConnPtr) { LOG_INFO << "WebSocket connection closed"; std::lock_guard lock(connectionsMutex_); std::string tokenToDelete; for (auto& [token, conns] : tokenConnections_) { if (conns.erase(wsConnPtr)) { if (conns.empty()) { tokenToDelete = token; } break; } } connections_.erase(wsConnPtr); if (!tokenToDelete.empty()) { tokenConnections_.erase(tokenToDelete); RedisHelper::deleteKeyAsync("viewer_token:" + tokenToDelete, [tokenToDelete](bool success) { if (success) { LOG_INFO << "Deleted viewer token on disconnect: " << tokenToDelete; } else { LOG_WARN << "Failed to delete viewer token: " << tokenToDelete; } } ); } } void StreamWebSocketController::broadcastKeyUpdate(const std::string& userId, const std::string& newKey) { Json::Value msg; msg["type"] = "key_regenerated"; msg["user_id"] = userId; msg["stream_key"] = newKey; auto msgStr = Json::FastWriter().write(msg); std::lock_guard lock(connectionsMutex_); for (const auto& conn : connections_) { if (conn->connected()) { conn->send(msgStr); } } } void StreamWebSocketController::broadcastStatsUpdate(const Json::Value& stats) { std::string jsonStr = Json::FastWriter().write(stats); std::lock_guard lock(connectionsMutex_); for (const auto& conn : connections_) { if (conn->connected()) { conn->send(jsonStr); } } } // OvenMediaEngine Webhook Handlers void StreamController::handleOmeWebhook(const HttpRequestPtr &req, std::function &&callback) { auto jsonPtr = req->getJsonObject(); if (!jsonPtr) { LOG_WARN << "OME webhook received with invalid JSON"; callback(jsonError("Invalid JSON", k400BadRequest)); return; } const auto& payload = *jsonPtr; std::string eventType = payload.get("eventType", "").asString(); LOG_INFO << "OME Webhook received: " << eventType; LOG_DEBUG << "OME Webhook payload: " << payload.toStyledString(); // Extract stream information std::string streamName; if (payload.isMember("stream") && payload["stream"].isMember("name")) { streamName = payload["stream"]["name"].asString(); } else if (payload.isMember("streamName")) { streamName = payload["streamName"].asString(); } if (streamName.empty()) { LOG_WARN << "OME webhook missing stream name"; callback(jsonOk(json({{"success", true}, {"message", "Acknowledged"}}))); return; } auto dbClient = app().getDbClient(); if (eventType == "streamCreated" || eventType == "stream.created" || eventType == "publish") { // Stream started - mark realm as live immediately LOG_INFO << "Stream started via webhook: " << streamName; *dbClient << "UPDATE realms SET is_live = true, viewer_count = 0, " "updated_at = CURRENT_TIMESTAMP WHERE stream_key = $1 RETURNING id" << streamName >> [streamName](const Result& r) { LOG_INFO << "Realm marked as live via webhook: " << streamName; // Broadcast to WebSocket clients Json::Value msg; msg["type"] = "stream_live"; msg["stream_key"] = streamName; msg["is_live"] = true; StreamWebSocketController::broadcastStatsUpdate(msg); // Trigger immediate stats fetch StatsService::getInstance().updateStreamStats(streamName); // Pre-warm thumbnail cache so it's ready when users see the stream // This makes an async request to generate the thumbnail in the background auto client = HttpClient::newHttpClient("http://localhost:8088"); auto req = HttpRequest::newHttpRequest(); req->setPath("/thumb/" + streamName + ".webp"); req->setMethod(drogon::Get); client->sendRequest(req, [streamName](ReqResult result, const HttpResponsePtr& response) { if (result == ReqResult::Ok && response && response->statusCode() == k200OK) { LOG_INFO << "Thumbnail pre-warmed for stream: " << streamName; } else { LOG_DEBUG << "Thumbnail pre-warm pending for: " << streamName << " (stream may still be initializing)"; } }, 10.0); // 10 second timeout for thumbnail generation // Start restream destinations if realm has any if (!r.empty()) { int64_t realmId = r[0]["id"].as(); RestreamService::getInstance().startAllDestinations(streamName, realmId); } } >> [streamName](const DrogonDbException& e) { LOG_ERROR << "Failed to mark realm live via webhook: " << e.base().what(); }; } else if (eventType == "streamDeleted" || eventType == "stream.deleted" || eventType == "unpublish") { // Stream ended - mark realm as offline immediately LOG_INFO << "Stream ended via webhook: " << streamName; *dbClient << "UPDATE realms SET is_live = false, viewer_count = 0, " "updated_at = CURRENT_TIMESTAMP WHERE stream_key = $1 RETURNING id" << streamName >> [streamName](const Result& r) { LOG_INFO << "Realm marked as offline via webhook: " << streamName; // Broadcast to WebSocket clients Json::Value msg; msg["type"] = "stream_offline"; msg["stream_key"] = streamName; msg["is_live"] = false; StreamWebSocketController::broadcastStatsUpdate(msg); // Stop all restream destinations if (!r.empty()) { int64_t realmId = r[0]["id"].as(); RestreamService::getInstance().stopAllDestinations(streamName, realmId); } } >> [streamName](const DrogonDbException& e) { LOG_ERROR << "Failed to mark realm offline via webhook: " << e.base().what(); }; } else if (eventType == "sessionCreated" || eventType == "viewer.connected") { // Viewer connected LOG_INFO << "Viewer connected to stream: " << streamName; StatsService::getInstance().updateStreamStats(streamName); } else if (eventType == "sessionDeleted" || eventType == "viewer.disconnected") { // Viewer disconnected LOG_INFO << "Viewer disconnected from stream: " << streamName; StatsService::getInstance().updateStreamStats(streamName); } // Always respond with success to acknowledge the webhook callback(jsonOk(json({{"success", true}, {"message", "Webhook processed"}}))); } void StreamController::handleOmeAdmission(const HttpRequestPtr &req, std::function &&callback) { // Admission webhook - validates if a stream is allowed to publish/play // OME sends: { "client": {...}, "request": { "direction", "protocol", "status", "url", ... } } auto jsonPtr = req->getJsonObject(); if (!jsonPtr) { LOG_WARN << "OME admission webhook received with invalid JSON"; callback(jsonError("Invalid JSON", k400BadRequest)); return; } const auto& payload = *jsonPtr; LOG_INFO << "OME Admission webhook: " << payload.toStyledString(); // Check if this is a "closing" status - just acknowledge it if (payload.isMember("request") && payload["request"].isMember("status")) { std::string status = payload["request"]["status"].asString(); if (status == "closing") { LOG_INFO << "OME admission closing notification"; Json::Value response; callback(jsonOk(response)); // Empty response for closing return; } } // Extract stream key from URL: rtmp://host:port/app/STREAM_KEY or similar std::string streamKey; if (payload.isMember("request") && payload["request"].isMember("url")) { std::string url = payload["request"]["url"].asString(); // URL format: scheme://host[:port]/app/stream_key[/file][?query] // Find the stream key after /app/ size_t appPos = url.find("/app/"); if (appPos != std::string::npos) { std::string afterApp = url.substr(appPos + 5); // Skip "/app/" // Remove any trailing path or query string size_t endPos = afterApp.find_first_of("/?"); if (endPos != std::string::npos) { streamKey = afterApp.substr(0, endPos); } else { streamKey = afterApp; } } LOG_INFO << "Extracted stream key from URL: " << streamKey << " (URL: " << url << ")"; } if (streamKey.empty()) { LOG_WARN << "OME admission webhook: could not extract stream key, allowing by default"; Json::Value response; response["allowed"] = true; callback(jsonOk(response)); return; } // Check direction - only validate "incoming" (publish) requests std::string direction; if (payload.isMember("request") && payload["request"].isMember("direction")) { direction = payload["request"]["direction"].asString(); } if (direction == "outgoing") { // Playback request - allow all for now (could add viewer auth later) LOG_INFO << "Allowing outgoing (playback) request for: " << streamKey; Json::Value response; response["allowed"] = true; callback(jsonOk(response)); return; } // Validate stream key against database for incoming (publish) requests auto dbClient = app().getDbClient(); *dbClient << "SELECT id FROM realms WHERE stream_key = $1 AND is_active = true" << streamKey >> [callback, streamKey](const Result& r) { Json::Value response; if (!r.empty()) { LOG_INFO << "Stream key validated for admission: " << streamKey; response["allowed"] = true; // Mark stream as live immediately when publishing is approved int64_t realmId = r[0]["id"].as(); auto db = app().getDbClient(); *db << "UPDATE realms SET is_live = true, viewer_count = 0, " "updated_at = CURRENT_TIMESTAMP WHERE id = $1" << realmId >> [streamKey, realmId](const Result&) { LOG_INFO << "Realm marked live on admission: " << streamKey; // Broadcast to WebSocket clients Json::Value msg; msg["type"] = "stream_live"; msg["stream_key"] = streamKey; msg["is_live"] = true; StreamWebSocketController::broadcastStatsUpdate(msg); // Trigger stats fetch StatsService::getInstance().updateStreamStats(streamKey); // Start restream destinations RestreamService::getInstance().startAllDestinations(streamKey, realmId); } >> [streamKey](const DrogonDbException& e) { LOG_ERROR << "Failed to mark realm live on admission: " << e.base().what(); }; } else { LOG_WARN << "Invalid stream key rejected: " << streamKey; response["allowed"] = false; response["reason"] = "Invalid or inactive stream key"; } callback(jsonOk(response)); } >> [callback, streamKey](const DrogonDbException& e) { LOG_ERROR << "Database error during admission check: " << e.base().what(); // Allow on DB error to prevent blocking legitimate streams Json::Value response; response["allowed"] = true; callback(jsonOk(response)); }; }