// Listing metadata: C++ source, 583 lines, 25 KiB, no trailing EOL.
#include "StreamController.h"
|
|
#include "../services/DatabaseService.h"
|
|
#include "../services/StatsService.h"
|
|
#include "../services/RedisHelper.h"
|
|
#include "../services/OmeClient.h"
|
|
#include "../services/AuthService.h"
|
|
#include "../services/RestreamService.h"
|
|
#include "../common/HttpHelpers.h"
|
|
#include "../common/AuthHelpers.h"
|
|
#include <drogon/utils/Utilities.h>
|
|
#include <drogon/Cookie.h>
|
|
#include <random>
|
|
#include <sstream>
|
|
#include <iomanip>
|
|
#include <chrono>
|
|
|
|
using namespace drogon::orm;
|
|
|
|
// Helper functions at the top
|
|
namespace {
|
|
HttpResponsePtr jsonOk(const Json::Value& data) {
|
|
return jsonResp(data);
|
|
}
|
|
|
|
// Quick JSON builder for common patterns
|
|
Json::Value json(std::initializer_list<std::pair<const char*, Json::Value>> items) {
|
|
Json::Value j;
|
|
for (const auto& [key, value] : items) {
|
|
j[key] = value;
|
|
}
|
|
return j;
|
|
}
|
|
}
|
|
|
|
// Static member definitions
|
|
std::mutex StreamWebSocketController::connectionsMutex_;
|
|
std::unordered_map<std::string, std::unordered_set<WebSocketConnectionPtr>> StreamWebSocketController::tokenConnections_;
|
|
std::unordered_set<WebSocketConnectionPtr> StreamWebSocketController::connections_;
|
|
|
|
void StreamController::health(const HttpRequestPtr &,
|
|
std::function<void(const HttpResponsePtr &)> &&callback) {
|
|
callback(jsonOk(json({
|
|
{"status", "ok"},
|
|
{"timestamp", Json::Int64(std::chrono::duration_cast<std::chrono::milliseconds>(
|
|
std::chrono::system_clock::now().time_since_epoch()
|
|
).count())}
|
|
})));
|
|
}
|
|
|
|
void StreamController::validateStreamKey(const HttpRequestPtr &,
|
|
std::function<void(const HttpResponsePtr &)> &&callback,
|
|
const std::string &key) {
|
|
// Now validate against realms table
|
|
auto dbClient = app().getDbClient();
|
|
*dbClient << "SELECT 1 FROM realms WHERE stream_key = $1 AND is_active = true"
|
|
<< key
|
|
>> [callback, key](const Result &r) {
|
|
bool valid = !r.empty();
|
|
if (valid) {
|
|
// Store in Redis
|
|
RedisHelper::storeKey("stream_key:" + key, "1", 86400);
|
|
}
|
|
callback(jsonOk(json({{"valid", valid}})));
|
|
}
|
|
>> [callback](const DrogonDbException &e) {
|
|
LOG_ERROR << "Database error: " << e.base().what();
|
|
callback(jsonOk(json({{"valid", false}})));
|
|
};
|
|
}
|
|
|
|
void StreamController::disconnectStream(const HttpRequestPtr &req,
|
|
std::function<void(const HttpResponsePtr &)> &&callback,
|
|
const std::string &streamKey) {
|
|
UserInfo user = getUserFromRequest(req);
|
|
if (user.id == 0) {
|
|
callback(jsonError("Unauthorized", k401Unauthorized));
|
|
return;
|
|
}
|
|
|
|
// Check if user owns this stream or is admin
|
|
auto dbClient = app().getDbClient();
|
|
*dbClient << "SELECT user_id FROM realms WHERE stream_key = $1 AND is_active = true"
|
|
<< streamKey
|
|
>> [user, callback, streamKey](const Result& r) {
|
|
if (r.empty() || (r[0]["user_id"].as<int64_t>() != user.id && !user.isAdmin)) {
|
|
callback(jsonError("Forbidden", k403Forbidden));
|
|
return;
|
|
}
|
|
|
|
OmeClient::getInstance().disconnectStream(streamKey, [callback](bool success) {
|
|
if (success) {
|
|
callback(jsonOk(json({
|
|
{"success", true},
|
|
{"message", "Stream disconnected"}
|
|
})));
|
|
} else {
|
|
callback(jsonError("Failed to disconnect stream"));
|
|
}
|
|
});
|
|
}
|
|
>> DB_ERROR(callback, "disconnect stream");
|
|
}
|
|
|
|
void StreamController::getStreamStats(const HttpRequestPtr &,
|
|
std::function<void(const HttpResponsePtr &)> &&callback,
|
|
const std::string &streamKey) {
|
|
StatsService::getInstance().getStreamStats(streamKey,
|
|
[callback](bool success, const StreamStats& stats) {
|
|
if (success) {
|
|
Json::Value json;
|
|
json["success"] = true;
|
|
|
|
auto& s = json["stats"];
|
|
s["connections"] = static_cast<Json::Int64>(stats.uniqueViewers);
|
|
s["total_connections"] = static_cast<Json::Int64>(stats.totalConnections);
|
|
s["bytes_in"] = static_cast<Json::Int64>(stats.totalBytesIn);
|
|
s["bytes_out"] = static_cast<Json::Int64>(stats.totalBytesOut);
|
|
s["bitrate"] = stats.bitrate;
|
|
s["codec"] = stats.codec;
|
|
s["resolution"] = stats.resolution;
|
|
s["fps"] = stats.fps;
|
|
s["is_live"] = stats.isLive;
|
|
|
|
if (stats.totalBytesIn > 0) {
|
|
s["data_rate_in"] = stats.bitrate / 1000.0;
|
|
}
|
|
if (stats.totalBytesOut > 0) {
|
|
s["data_rate_out"] = stats.totalBytesOut / 1024.0 / 1024.0;
|
|
}
|
|
|
|
// Protocol breakdown
|
|
auto& pc = s["protocol_connections"];
|
|
pc["webrtc"] = static_cast<Json::Int64>(stats.protocolConnections.webrtc);
|
|
pc["hls"] = static_cast<Json::Int64>(stats.protocolConnections.hls);
|
|
pc["llhls"] = static_cast<Json::Int64>(stats.protocolConnections.llhls);
|
|
pc["dash"] = static_cast<Json::Int64>(stats.protocolConnections.dash);
|
|
|
|
callback(jsonResp(json));
|
|
} else {
|
|
callback(jsonError("Failed to retrieve stream stats"));
|
|
}
|
|
});
|
|
}
|
|
|
|
void StreamController::getActiveStreams(const HttpRequestPtr &,
|
|
std::function<void(const HttpResponsePtr &)> &&callback) {
|
|
OmeClient::getInstance().getActiveStreams([callback](bool success, const Json::Value& omeResponse) {
|
|
if (success) {
|
|
LOG_INFO << "Active streams: " << omeResponse["response"].toStyledString();
|
|
callback(jsonOk(json({
|
|
{"success", true},
|
|
{"streams", omeResponse["response"]}
|
|
})));
|
|
} else {
|
|
callback(jsonError("Failed to get active streams from OME"));
|
|
}
|
|
});
|
|
}
|
|
|
|
void StreamController::issueViewerToken(const HttpRequestPtr &,
|
|
std::function<void(const HttpResponsePtr &)> &&callback,
|
|
const std::string &streamKey) {
|
|
// Validate against realms
|
|
auto dbClient = app().getDbClient();
|
|
*dbClient << "SELECT 1 FROM realms WHERE stream_key = $1 AND is_active = true"
|
|
<< streamKey
|
|
>> [callback, streamKey](const Result& r) {
|
|
if (r.empty()) {
|
|
callback(jsonResp({}, k404NotFound));
|
|
return;
|
|
}
|
|
|
|
auto bytes = drogon::utils::genRandomString(32);
|
|
std::string token = drogon::utils::base64Encode(
|
|
(const unsigned char*)bytes.data(), bytes.length()
|
|
);
|
|
|
|
RedisHelper::storeKeyAsync("viewer_token:" + token, streamKey, 30,
|
|
[callback, token](bool stored) {
|
|
if (!stored) {
|
|
callback(jsonResp({}, k500InternalServerError));
|
|
return;
|
|
}
|
|
|
|
auto resp = HttpResponse::newHttpResponse();
|
|
|
|
Cookie cookie("viewer_token", token);
|
|
cookie.setPath("/");
|
|
cookie.setHttpOnly(true);
|
|
cookie.setSecure(false);
|
|
cookie.setMaxAge(300);
|
|
resp->addCookie(cookie);
|
|
|
|
Json::Value body;
|
|
body["success"] = true;
|
|
body["viewer_token"] = token;
|
|
body["expires_in"] = 30;
|
|
|
|
resp->setContentTypeCode(CT_APPLICATION_JSON);
|
|
resp->setBody(Json::FastWriter().write(body));
|
|
|
|
callback(resp);
|
|
}
|
|
);
|
|
}
|
|
>> [callback](const DrogonDbException& e) {
|
|
LOG_ERROR << "Database error: " << e.base().what();
|
|
callback(jsonResp({}, k500InternalServerError));
|
|
};
|
|
}
|
|
|
|
void StreamController::heartbeat(const HttpRequestPtr &req,
|
|
std::function<void(const HttpResponsePtr &)> &&callback,
|
|
const std::string &realmId,
|
|
const std::string &streamKey) {
|
|
// Use realm-specific cookie to support multi-stream viewing
|
|
auto token = req->getCookie("viewer_token_" + realmId);
|
|
if (token.empty()) {
|
|
callback(jsonResp({}, k403Forbidden));
|
|
return;
|
|
}
|
|
|
|
RedisHelper::getKeyAsync("viewer_token:" + token,
|
|
[callback, streamKey, token](const std::string& storedStreamKey) {
|
|
if (storedStreamKey != streamKey) {
|
|
callback(jsonResp({}, k403Forbidden));
|
|
return;
|
|
}
|
|
|
|
// Refresh token TTL to 5 minutes on heartbeat
|
|
services::RedisHelper::instance().expireAsync("viewer_token:" + token, 300,
|
|
[callback](bool success) {
|
|
if (!success) {
|
|
callback(jsonResp({}, k500InternalServerError));
|
|
return;
|
|
}
|
|
|
|
callback(jsonOk(json({
|
|
{"success", true},
|
|
{"renewed", true}
|
|
})));
|
|
}
|
|
);
|
|
}
|
|
);
|
|
}
|
|
|
|
// WebSocket implementation
|
|
void StreamWebSocketController::handleNewMessage(const WebSocketConnectionPtr&,
|
|
std::string &&message,
|
|
const WebSocketMessageType &type) {
|
|
if (type == WebSocketMessageType::Text) {
|
|
Json::Value msg;
|
|
Json::Reader reader;
|
|
if (reader.parse(message, msg) && msg["type"].asString() == "subscribe") {
|
|
LOG_INFO << "Client subscribed to stream updates";
|
|
}
|
|
}
|
|
}
|
|
|
|
void StreamWebSocketController::handleNewConnection(const HttpRequestPtr &req,
|
|
const WebSocketConnectionPtr& wsConnPtr) {
|
|
LOG_INFO << "New WebSocket connection established";
|
|
|
|
// Allow anonymous connections for receiving public broadcasts (stream_live/stream_offline)
|
|
// These are used by the home page to get instant updates
|
|
std::lock_guard<std::mutex> lock(connectionsMutex_);
|
|
connections_.insert(wsConnPtr);
|
|
|
|
auto token = req->getCookie("viewer_token");
|
|
if (!token.empty()) {
|
|
// If viewer token is provided, validate and track it
|
|
RedisHelper::getKeyAsync("viewer_token:" + token,
|
|
[wsConnPtr, token](const std::string& streamKey) {
|
|
if (!streamKey.empty()) {
|
|
std::lock_guard<std::mutex> lock(connectionsMutex_);
|
|
tokenConnections_[token].insert(wsConnPtr);
|
|
LOG_INFO << "WebSocket authenticated for stream: " << streamKey;
|
|
} else {
|
|
LOG_DEBUG << "WebSocket with invalid/expired viewer token - treating as anonymous";
|
|
}
|
|
}
|
|
);
|
|
} else {
|
|
LOG_DEBUG << "Anonymous WebSocket connection (no viewer token)";
|
|
}
|
|
}
|
|
|
|
void StreamWebSocketController::handleConnectionClosed(const WebSocketConnectionPtr& wsConnPtr) {
|
|
LOG_INFO << "WebSocket connection closed";
|
|
|
|
std::lock_guard<std::mutex> lock(connectionsMutex_);
|
|
|
|
std::string tokenToDelete;
|
|
for (auto& [token, conns] : tokenConnections_) {
|
|
if (conns.erase(wsConnPtr)) {
|
|
if (conns.empty()) {
|
|
tokenToDelete = token;
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
|
|
connections_.erase(wsConnPtr);
|
|
|
|
if (!tokenToDelete.empty()) {
|
|
tokenConnections_.erase(tokenToDelete);
|
|
|
|
RedisHelper::deleteKeyAsync("viewer_token:" + tokenToDelete,
|
|
[tokenToDelete](bool success) {
|
|
if (success) {
|
|
LOG_INFO << "Deleted viewer token on disconnect: " << tokenToDelete;
|
|
} else {
|
|
LOG_WARN << "Failed to delete viewer token: " << tokenToDelete;
|
|
}
|
|
}
|
|
);
|
|
}
|
|
}
|
|
|
|
void StreamWebSocketController::broadcastKeyUpdate(const std::string& userId, const std::string& newKey) {
|
|
Json::Value msg;
|
|
msg["type"] = "key_regenerated";
|
|
msg["user_id"] = userId;
|
|
msg["stream_key"] = newKey;
|
|
|
|
auto msgStr = Json::FastWriter().write(msg);
|
|
|
|
std::lock_guard<std::mutex> lock(connectionsMutex_);
|
|
for (const auto& conn : connections_) {
|
|
if (conn->connected()) {
|
|
conn->send(msgStr);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Serializes `stats` once and sends it to every WebSocket connection in
// connections_ that still reports connected(). The message is serialized
// before connectionsMutex_ is taken; the send loop runs under the lock.
void StreamWebSocketController::broadcastStatsUpdate(const Json::Value& stats) {
    const std::string wireMessage = Json::FastWriter().write(stats);

    std::lock_guard<std::mutex> guard(connectionsMutex_);
    for (auto it = connections_.begin(); it != connections_.end(); ++it) {
        const WebSocketConnectionPtr& peer = *it;
        if (!peer->connected()) {
            continue;
        }
        peer->send(wireMessage);
    }
}
|
|
|
|
// OvenMediaEngine Webhook Handlers
|
|
void StreamController::handleOmeWebhook(const HttpRequestPtr &req,
|
|
std::function<void(const HttpResponsePtr &)> &&callback) {
|
|
auto jsonPtr = req->getJsonObject();
|
|
if (!jsonPtr) {
|
|
LOG_WARN << "OME webhook received with invalid JSON";
|
|
callback(jsonError("Invalid JSON", k400BadRequest));
|
|
return;
|
|
}
|
|
|
|
const auto& payload = *jsonPtr;
|
|
std::string eventType = payload.get("eventType", "").asString();
|
|
|
|
LOG_INFO << "OME Webhook received: " << eventType;
|
|
LOG_DEBUG << "OME Webhook payload: " << payload.toStyledString();
|
|
|
|
// Extract stream information
|
|
std::string streamName;
|
|
if (payload.isMember("stream") && payload["stream"].isMember("name")) {
|
|
streamName = payload["stream"]["name"].asString();
|
|
} else if (payload.isMember("streamName")) {
|
|
streamName = payload["streamName"].asString();
|
|
}
|
|
|
|
if (streamName.empty()) {
|
|
LOG_WARN << "OME webhook missing stream name";
|
|
callback(jsonOk(json({{"success", true}, {"message", "Acknowledged"}})));
|
|
return;
|
|
}
|
|
|
|
auto dbClient = app().getDbClient();
|
|
|
|
if (eventType == "streamCreated" || eventType == "stream.created" || eventType == "publish") {
|
|
// Stream started - mark realm as live immediately
|
|
LOG_INFO << "Stream started via webhook: " << streamName;
|
|
|
|
*dbClient << "UPDATE realms SET is_live = true, viewer_count = 0, "
|
|
"updated_at = CURRENT_TIMESTAMP WHERE stream_key = $1 RETURNING id"
|
|
<< streamName
|
|
>> [streamName](const Result& r) {
|
|
LOG_INFO << "Realm marked as live via webhook: " << streamName;
|
|
|
|
// Broadcast to WebSocket clients
|
|
Json::Value msg;
|
|
msg["type"] = "stream_live";
|
|
msg["stream_key"] = streamName;
|
|
msg["is_live"] = true;
|
|
StreamWebSocketController::broadcastStatsUpdate(msg);
|
|
|
|
// Trigger immediate stats fetch
|
|
StatsService::getInstance().updateStreamStats(streamName);
|
|
|
|
// Pre-warm thumbnail cache so it's ready when users see the stream
|
|
// This makes an async request to generate the thumbnail in the background
|
|
auto client = HttpClient::newHttpClient("http://localhost:8088");
|
|
auto req = HttpRequest::newHttpRequest();
|
|
req->setPath("/thumb/" + streamName + ".webp");
|
|
req->setMethod(drogon::Get);
|
|
client->sendRequest(req, [streamName](ReqResult result, const HttpResponsePtr& response) {
|
|
if (result == ReqResult::Ok && response && response->statusCode() == k200OK) {
|
|
LOG_INFO << "Thumbnail pre-warmed for stream: " << streamName;
|
|
} else {
|
|
LOG_DEBUG << "Thumbnail pre-warm pending for: " << streamName << " (stream may still be initializing)";
|
|
}
|
|
}, 10.0); // 10 second timeout for thumbnail generation
|
|
|
|
// Start restream destinations if realm has any
|
|
if (!r.empty()) {
|
|
int64_t realmId = r[0]["id"].as<int64_t>();
|
|
RestreamService::getInstance().startAllDestinations(streamName, realmId);
|
|
}
|
|
}
|
|
>> [streamName](const DrogonDbException& e) {
|
|
LOG_ERROR << "Failed to mark realm live via webhook: " << e.base().what();
|
|
};
|
|
}
|
|
else if (eventType == "streamDeleted" || eventType == "stream.deleted" || eventType == "unpublish") {
|
|
// Stream ended - mark realm as offline immediately
|
|
LOG_INFO << "Stream ended via webhook: " << streamName;
|
|
|
|
*dbClient << "UPDATE realms SET is_live = false, viewer_count = 0, "
|
|
"updated_at = CURRENT_TIMESTAMP WHERE stream_key = $1 RETURNING id"
|
|
<< streamName
|
|
>> [streamName](const Result& r) {
|
|
LOG_INFO << "Realm marked as offline via webhook: " << streamName;
|
|
|
|
// Broadcast to WebSocket clients
|
|
Json::Value msg;
|
|
msg["type"] = "stream_offline";
|
|
msg["stream_key"] = streamName;
|
|
msg["is_live"] = false;
|
|
StreamWebSocketController::broadcastStatsUpdate(msg);
|
|
|
|
// Stop all restream destinations
|
|
if (!r.empty()) {
|
|
int64_t realmId = r[0]["id"].as<int64_t>();
|
|
RestreamService::getInstance().stopAllDestinations(streamName, realmId);
|
|
}
|
|
}
|
|
>> [streamName](const DrogonDbException& e) {
|
|
LOG_ERROR << "Failed to mark realm offline via webhook: " << e.base().what();
|
|
};
|
|
}
|
|
else if (eventType == "sessionCreated" || eventType == "viewer.connected") {
|
|
// Viewer connected
|
|
LOG_INFO << "Viewer connected to stream: " << streamName;
|
|
StatsService::getInstance().updateStreamStats(streamName);
|
|
}
|
|
else if (eventType == "sessionDeleted" || eventType == "viewer.disconnected") {
|
|
// Viewer disconnected
|
|
LOG_INFO << "Viewer disconnected from stream: " << streamName;
|
|
StatsService::getInstance().updateStreamStats(streamName);
|
|
}
|
|
|
|
// Always respond with success to acknowledge the webhook
|
|
callback(jsonOk(json({{"success", true}, {"message", "Webhook processed"}})));
|
|
}
|
|
|
|
void StreamController::handleOmeAdmission(const HttpRequestPtr &req,
|
|
std::function<void(const HttpResponsePtr &)> &&callback) {
|
|
// Admission webhook - validates if a stream is allowed to publish/play
|
|
// OME sends: { "client": {...}, "request": { "direction", "protocol", "status", "url", ... } }
|
|
auto jsonPtr = req->getJsonObject();
|
|
if (!jsonPtr) {
|
|
LOG_WARN << "OME admission webhook received with invalid JSON";
|
|
callback(jsonError("Invalid JSON", k400BadRequest));
|
|
return;
|
|
}
|
|
|
|
const auto& payload = *jsonPtr;
|
|
LOG_INFO << "OME Admission webhook: " << payload.toStyledString();
|
|
|
|
// Check if this is a "closing" status - just acknowledge it
|
|
if (payload.isMember("request") && payload["request"].isMember("status")) {
|
|
std::string status = payload["request"]["status"].asString();
|
|
if (status == "closing") {
|
|
LOG_INFO << "OME admission closing notification";
|
|
Json::Value response;
|
|
callback(jsonOk(response)); // Empty response for closing
|
|
return;
|
|
}
|
|
}
|
|
|
|
// Extract stream key from URL: rtmp://host:port/app/STREAM_KEY or similar
|
|
std::string streamKey;
|
|
if (payload.isMember("request") && payload["request"].isMember("url")) {
|
|
std::string url = payload["request"]["url"].asString();
|
|
// URL format: scheme://host[:port]/app/stream_key[/file][?query]
|
|
// Find the stream key after /app/
|
|
size_t appPos = url.find("/app/");
|
|
if (appPos != std::string::npos) {
|
|
std::string afterApp = url.substr(appPos + 5); // Skip "/app/"
|
|
// Remove any trailing path or query string
|
|
size_t endPos = afterApp.find_first_of("/?");
|
|
if (endPos != std::string::npos) {
|
|
streamKey = afterApp.substr(0, endPos);
|
|
} else {
|
|
streamKey = afterApp;
|
|
}
|
|
}
|
|
LOG_INFO << "Extracted stream key from URL: " << streamKey << " (URL: " << url << ")";
|
|
}
|
|
|
|
if (streamKey.empty()) {
|
|
LOG_WARN << "OME admission webhook: could not extract stream key, allowing by default";
|
|
Json::Value response;
|
|
response["allowed"] = true;
|
|
callback(jsonOk(response));
|
|
return;
|
|
}
|
|
|
|
// Check direction - only validate "incoming" (publish) requests
|
|
std::string direction;
|
|
if (payload.isMember("request") && payload["request"].isMember("direction")) {
|
|
direction = payload["request"]["direction"].asString();
|
|
}
|
|
|
|
if (direction == "outgoing") {
|
|
// Playback request - allow all for now (could add viewer auth later)
|
|
LOG_INFO << "Allowing outgoing (playback) request for: " << streamKey;
|
|
Json::Value response;
|
|
response["allowed"] = true;
|
|
callback(jsonOk(response));
|
|
return;
|
|
}
|
|
|
|
// Validate stream key against database for incoming (publish) requests
|
|
auto dbClient = app().getDbClient();
|
|
*dbClient << "SELECT id FROM realms WHERE stream_key = $1 AND is_active = true"
|
|
<< streamKey
|
|
>> [callback, streamKey](const Result& r) {
|
|
Json::Value response;
|
|
if (!r.empty()) {
|
|
LOG_INFO << "Stream key validated for admission: " << streamKey;
|
|
response["allowed"] = true;
|
|
|
|
// Mark stream as live immediately when publishing is approved
|
|
int64_t realmId = r[0]["id"].as<int64_t>();
|
|
auto db = app().getDbClient();
|
|
*db << "UPDATE realms SET is_live = true, viewer_count = 0, "
|
|
"updated_at = CURRENT_TIMESTAMP WHERE id = $1"
|
|
<< realmId
|
|
>> [streamKey, realmId](const Result&) {
|
|
LOG_INFO << "Realm marked live on admission: " << streamKey;
|
|
|
|
// Broadcast to WebSocket clients
|
|
Json::Value msg;
|
|
msg["type"] = "stream_live";
|
|
msg["stream_key"] = streamKey;
|
|
msg["is_live"] = true;
|
|
StreamWebSocketController::broadcastStatsUpdate(msg);
|
|
|
|
// Trigger stats fetch
|
|
StatsService::getInstance().updateStreamStats(streamKey);
|
|
|
|
// Start restream destinations
|
|
RestreamService::getInstance().startAllDestinations(streamKey, realmId);
|
|
}
|
|
>> [streamKey](const DrogonDbException& e) {
|
|
LOG_ERROR << "Failed to mark realm live on admission: " << e.base().what();
|
|
};
|
|
} else {
|
|
LOG_WARN << "Invalid stream key rejected: " << streamKey;
|
|
response["allowed"] = false;
|
|
response["reason"] = "Invalid or inactive stream key";
|
|
}
|
|
callback(jsonOk(response));
|
|
}
|
|
>> [callback, streamKey](const DrogonDbException& e) {
|
|
LOG_ERROR << "Database error during admission check: " << e.base().what();
|
|
// Allow on DB error to prevent blocking legitimate streams
|
|
Json::Value response;
|
|
response["allowed"] = true;
|
|
callback(jsonOk(response));
|
|
};
|
|
} |