#include "RestreamService.h" #include #include #include using namespace drogon; using namespace drogon::orm; // execCurl removed - using Drogon HttpClient instead for security std::string RestreamService::getBaseUrl() { const char* envUrl = std::getenv("OME_API_URL"); if (envUrl) { return std::string(envUrl); } return "http://ovenmediaengine:8081"; } std::string RestreamService::getApiToken() { const char* envToken = std::getenv("OME_API_TOKEN"); if (!envToken || strlen(envToken) == 0) { throw std::runtime_error("OME_API_TOKEN environment variable is not set"); } return std::string(envToken); } HttpClientPtr RestreamService::getClient() { return HttpClient::newHttpClient(getBaseUrl()); } HttpRequestPtr RestreamService::createRequest(HttpMethod method, const std::string& path) { auto request = HttpRequest::newHttpRequest(); request->setMethod(method); request->setPath(path); const auto token = getApiToken(); const auto b64 = drogon::utils::base64Encode(token); request->addHeader("Authorization", std::string("Basic ") + b64); return request; } HttpRequestPtr RestreamService::createJsonRequest(HttpMethod method, const std::string& path, const Json::Value& body) { auto request = HttpRequest::newHttpJsonRequest(body); request->setMethod(method); request->setPath(path); const auto token = getApiToken(); const auto b64 = drogon::utils::base64Encode(token); request->addHeader("Authorization", std::string("Basic ") + b64); return request; } std::string RestreamService::generatePushId(const std::string& streamKey, int64_t destinationId) { return "restream_" + streamKey + "_" + std::to_string(destinationId); } void RestreamService::updateDestinationStatus(int64_t destinationId, bool isConnected, const std::string& error) { auto dbClient = app().getDbClient(); if (isConnected) { *dbClient << "UPDATE restream_destinations SET is_connected = true, last_error = NULL, " "last_connected_at = CURRENT_TIMESTAMP WHERE id = $1" << destinationId >> [destinationId](const Result&) { LOG_INFO << "Restream destination " << destinationId << " connected"; } >> [destinationId](const DrogonDbException& e) { LOG_ERROR << "Failed to update restream destination " << destinationId << ": " << e.base().what(); }; } else { *dbClient << "UPDATE restream_destinations SET is_connected = false, last_error = $1 WHERE id = $2" << error << destinationId >> [destinationId](const Result&) { LOG_INFO << "Restream destination " << destinationId << " disconnected"; } >> [destinationId](const DrogonDbException& e) { LOG_ERROR << "Failed to update restream destination " << destinationId << ": " << e.base().what(); }; } } void RestreamService::startPush(const std::string& sourceStreamKey, const RestreamDestination& dest, std::function callback) { // Build the full destination URL with stream key std::string fullUrl = dest.rtmpUrl; if (!fullUrl.empty() && fullUrl.back() != '/') { fullUrl += '/'; } fullUrl += dest.streamKey; std::string pushId = generatePushId(sourceStreamKey, dest.id); auto destId = dest.id; LOG_INFO << "Starting RTMP push for stream " << sourceStreamKey << " to " << dest.name << " (" << dest.rtmpUrl << ")"; // Build JSON body Json::Value body; body["id"] = pushId; body["stream"]["name"] = sourceStreamKey; body["protocol"] = "rtmp"; body["url"] = fullUrl; // Use Drogon HttpClient instead of curl for security auto request = createJsonRequest(drogon::Post, "/v1/vhosts/default/apps/app:startPush", body); LOG_INFO << "Sending HTTP request for push start"; getClient()->sendRequest(request, [this, callback, pushId, sourceStreamKey, destId](ReqResult result, const HttpResponsePtr& response) { if (result != ReqResult::Ok || !response) { std::string error = "Failed to connect to OME API"; updateDestinationStatus(destId, false, error); callback(false, error); LOG_ERROR << "Failed to start RTMP push: " << error; return; } auto json = response->getJsonObject(); if (json) { int statusCode = (*json).get("statusCode", 0).asInt(); std::string message = (*json).get("message", "").asString(); // 200 = success, 400 with "Duplicate ID" = already running (treat as success) bool isSuccess = (statusCode == 200); bool isDuplicate = (statusCode == 400 && message.find("Duplicate") != std::string::npos); if (isSuccess || isDuplicate) { // Track the active push { std::lock_guard lock(pushMutex_); activePushes_[sourceStreamKey][destId] = pushId; } updateDestinationStatus(destId, true, ""); callback(true, ""); if (isDuplicate) { LOG_INFO << "RTMP push already active (duplicate ID): " << pushId; } else { LOG_INFO << "RTMP push started successfully: " << pushId; } return; } else { std::string error = (*json).get("message", "Unknown error").asString(); updateDestinationStatus(destId, false, error); callback(false, error); LOG_ERROR << "Failed to start RTMP push: " << error; return; } } std::string error = "Invalid response from OME API"; updateDestinationStatus(destId, false, error); callback(false, error); LOG_ERROR << "Failed to start RTMP push: " << error; }); } void RestreamService::stopPush(const std::string& sourceStreamKey, int64_t destinationId, std::function callback) { std::string pushId; { std::lock_guard lock(pushMutex_); auto streamIt = activePushes_.find(sourceStreamKey); if (streamIt != activePushes_.end()) { auto destIt = streamIt->second.find(destinationId); if (destIt != streamIt->second.end()) { pushId = destIt->second; } } } // If not tracked in memory, generate the push ID anyway and try to stop it // This handles cases where server restarted but push is still active on OME if (pushId.empty()) { pushId = generatePushId(sourceStreamKey, destinationId); } LOG_INFO << "Stopping RTMP push: " << pushId; // Build JSON body Json::Value body; body["id"] = pushId; // Use Drogon HttpClient instead of curl for security auto request = createJsonRequest(drogon::Post, "/v1/vhosts/default/apps/app:stopPush", body); LOG_INFO << "Sending HTTP request for push stop"; getClient()->sendRequest(request, [this, callback, pushId, sourceStreamKey, destinationId](ReqResult result, const HttpResponsePtr& response) { // Remove from tracking regardless of result { std::lock_guard lock(pushMutex_); auto streamIt = activePushes_.find(sourceStreamKey); if (streamIt != activePushes_.end()) { streamIt->second.erase(destinationId); if (streamIt->second.empty()) { activePushes_.erase(streamIt); } } } updateDestinationStatus(destinationId, false, ""); if (result == ReqResult::Ok && response) { auto json = response->getJsonObject(); if (json) { int statusCode = (*json).get("statusCode", 0).asInt(); if (statusCode == 200 || statusCode == 404) { callback(true); LOG_INFO << "RTMP push stopped: " << pushId; return; } } } // Even if API call failed, we've removed from tracking callback(true); LOG_WARN << "RTMP push stop may have failed, but removed from tracking: " << pushId; }); } void RestreamService::stopAllPushes(const std::string& sourceStreamKey, std::function callback) { std::vector destinationIds; { std::lock_guard lock(pushMutex_); auto streamIt = activePushes_.find(sourceStreamKey); if (streamIt != activePushes_.end()) { for (const auto& [destId, pushId] : streamIt->second) { destinationIds.push_back(destId); } } } if (destinationIds.empty()) { callback(true); return; } // Stop each push auto remaining = std::make_shared>(destinationIds.size()); auto allSuccess = std::make_shared>(true); for (int64_t destId : destinationIds) { stopPush(sourceStreamKey, destId, [remaining, allSuccess, callback](bool success) { if (!success) { allSuccess->store(false); } if (--(*remaining) == 0) { callback(allSuccess->load()); } }); } } void RestreamService::getPushStatus(const std::string& sourceStreamKey, int64_t destinationId, std::function callback) { std::string pushId; { std::lock_guard lock(pushMutex_); auto streamIt = activePushes_.find(sourceStreamKey); if (streamIt != activePushes_.end()) { auto destIt = streamIt->second.find(destinationId); if (destIt != streamIt->second.end()) { pushId = destIt->second; } } } if (pushId.empty()) { callback(true, false, "Not connected"); return; } // OME API: GET /v1/vhosts/{vhost}/apps/{app}/push std::string path = "/v1/vhosts/default/apps/app/push"; auto request = createRequest(Get, path); getClient()->sendRequest(request, [callback, pushId](ReqResult result, const HttpResponsePtr& response) { if (result == ReqResult::Ok && response && response->getStatusCode() == k200OK) { try { auto json = *response->getJsonObject(); // Look for our push in the response if (json.isMember("response") && json["response"].isArray()) { for (const auto& push : json["response"]) { if (push["id"].asString() == pushId) { std::string state = push.get("state", "unknown").asString(); bool connected = (state == "started" || state == "connected"); std::string error = push.get("error", "").asString(); callback(true, connected, error); return; } } } callback(true, false, "Push not found"); } catch (const std::exception& e) { callback(false, false, e.what()); } } else { callback(false, false, "Failed to get push status"); } }); } void RestreamService::startAllDestinations(const std::string& streamKey, int64_t realmId) { LOG_INFO << "Starting all restream destinations for realm " << realmId; auto dbClient = app().getDbClient(); *dbClient << "SELECT id, realm_id, name, rtmp_url, stream_key, enabled " "FROM restream_destinations WHERE realm_id = $1 AND enabled = true" << realmId >> [this, streamKey](const Result& r) { for (const auto& row : r) { RestreamDestination dest; dest.id = row["id"].as(); dest.realmId = row["realm_id"].as(); dest.name = row["name"].as(); dest.rtmpUrl = row["rtmp_url"].as(); dest.streamKey = row["stream_key"].as(); dest.enabled = row["enabled"].as(); startPush(streamKey, dest, [dest](bool success, const std::string& error) { if (!success) { LOG_ERROR << "Failed to start restream to " << dest.name << ": " << error; } }); } } >> [realmId](const DrogonDbException& e) { LOG_ERROR << "Failed to fetch restream destinations for realm " << realmId << ": " << e.base().what(); }; } void RestreamService::stopAllDestinations(const std::string& streamKey, int64_t realmId) { LOG_INFO << "Stopping all restream destinations for realm " << realmId; stopAllPushes(streamKey, [realmId](bool success) { if (!success) { LOG_WARN << "Some restream pushes may not have stopped cleanly for realm " << realmId; } }); // Also update all destinations in DB as disconnected auto dbClient = app().getDbClient(); *dbClient << "UPDATE restream_destinations SET is_connected = false WHERE realm_id = $1" << realmId >> [](const Result&) {} >> [realmId](const DrogonDbException& e) { LOG_ERROR << "Failed to update restream destinations for realm " << realmId << ": " << e.base().what(); }; } void RestreamService::attemptReconnections(const std::string& streamKey, int64_t realmId) { // Get all enabled but disconnected destinations and try to reconnect auto dbClient = app().getDbClient(); *dbClient << "SELECT id, realm_id, name, rtmp_url, stream_key, enabled, is_connected " "FROM restream_destinations " "WHERE realm_id = $1 AND enabled = true AND is_connected = false" << realmId >> [this, streamKey](const Result& r) { for (const auto& row : r) { RestreamDestination dest; dest.id = row["id"].as(); dest.realmId = row["realm_id"].as(); dest.name = row["name"].as(); dest.rtmpUrl = row["rtmp_url"].as(); dest.streamKey = row["stream_key"].as(); dest.enabled = row["enabled"].as(); LOG_INFO << "Attempting to reconnect restream destination: " << dest.name; startPush(streamKey, dest, [dest](bool success, const std::string& error) { if (success) { LOG_INFO << "Reconnected restream to " << dest.name; } else { LOG_WARN << "Reconnection failed for " << dest.name << ": " << error; } }); } } >> [realmId](const DrogonDbException& e) { LOG_ERROR << "Failed to fetch disconnected restream destinations for realm " << realmId << ": " << e.base().what(); }; }