beeta/backend/src/controllers/RealmController.cpp
doomtube e8864cc853 nu
2025-08-10 07:55:39 -04:00

622 lines
No EOL
28 KiB
C++

#include "RealmController.h"
#include "../services/DatabaseService.h"
#include "../services/StatsService.h"
#include "../services/RedisHelper.h"
#include "../services/OmeClient.h"
#include <drogon/utils/Utilities.h>
#include <drogon/Cookie.h>
#include <random>
#include <sstream>
#include <iomanip>
#include <regex>
using namespace drogon::orm;
namespace {
// Builds a JSON HTTP response from `j` and stamps it with status `c`
// (200 OK unless the caller says otherwise).
HttpResponsePtr jsonResp(const Json::Value& j, HttpStatusCode c = k200OK) {
    auto response = HttpResponse::newHttpJsonResponse(j);
    response->setStatusCode(c);
    return response;
}
// Produces the controller's standard failure payload,
// {"success": false, "error": <error>}, with status `code`
// (400 Bad Request by default).
HttpResponsePtr jsonError(const std::string& error, HttpStatusCode code = k400BadRequest) {
    Json::Value payload;
    payload["error"] = error;
    payload["success"] = false;
    return jsonResp(payload, code);
}
// Generates a fresh stream key: 16 random bytes rendered as 32 lowercase
// hexadecimal characters.
//
// Every byte is drawn directly from std::random_device. The previous version
// seeded a std::mt19937 with a single random_device word, so the whole
// 128-bit key was a deterministic function of that one seed and the generator
// itself is not designed for secrets.
// NOTE(review): std::random_device is implementation-defined; it is expected
// to be backed by the OS entropy source on the deployment platform - confirm.
std::string generateStreamKey() {
    static constexpr char kHexDigits[] = "0123456789abcdef";
    constexpr int kKeyBytes = 16;

    std::random_device entropy;
    std::string key;
    key.reserve(kKeyBytes * 2);
    for (int i = 0; i < kKeyBytes; ++i) {
        // Only the low 8 bits of each draw are used, one draw per key byte.
        const unsigned int byte = entropy() & 0xFFu;
        key.push_back(kHexDigits[byte >> 4]);
        key.push_back(kHexDigits[byte & 0x0Fu]);
    }
    return key;
}
// Returns true when `name` is an acceptable realm name: 3 to 30 characters,
// each of which is a lowercase ASCII letter, a digit, or a hyphen.
//
// This is a plain character scan; it accepts exactly the strings the former
// pattern "^[a-z0-9-]+$" accepted, without compiling a std::regex on every
// request.
bool validateRealmName(const std::string& name) {
    if (name.length() < 3 || name.length() > 30) {
        return false;
    }
    for (const char ch : name) {
        const bool isLower = ch >= 'a' && ch <= 'z';
        const bool isDigit = ch >= '0' && ch <= '9';
        if (!isLower && !isDigit && ch != '-') {
            return false;
        }
    }
    return true;
}
void invalidateKeyInRedis(const std::string& oldKey) {
RedisHelper::addToSet("streams_to_disconnect", oldKey);
RedisHelper::deleteKey("stream_key:" + oldKey);
services::RedisHelper::instance().keysAsync("viewer_token:*",
[oldKey](const std::vector<std::string>& keys) {
for (const auto& tokenKey : keys) {
services::RedisHelper::instance().getAsync(tokenKey,
[tokenKey, oldKey](sw::redis::OptionalString streamKey) {
if (streamKey.has_value() && streamKey.value() == oldKey) {
RedisHelper::deleteKey(tokenKey);
}
}
);
}
}
);
}
}
// Resolves the caller from the "Authorization: Bearer <token>" header.
//
// Returns a default-constructed UserInfo when the header is missing or does
// not start with "Bearer "; otherwise returns whatever
// AuthService::validateToken wrote into the UserInfo. The handlers in this
// file treat `id == 0` as "not authenticated".
UserInfo RealmController::getUserFromRequest(const HttpRequestPtr &req) {
    UserInfo user;
    std::string header = req->getHeader("Authorization");
    const bool hasBearerPrefix =
        !header.empty() && header.compare(0, 7, "Bearer ") == 0;
    if (hasBearerPrefix) {
        std::string token = header.substr(7);
        AuthService::getInstance().validateToken(token, user);
    }
    return user;
}
// Lists the authenticated user's realms, newest first.
//
// Responds 401 when no user can be resolved from the request. On success the
// body is {"success": true, "realms": [...]}; each entry includes the realm's
// stream key, since this endpoint only returns the caller's own realms.
void RealmController::getUserRealms(const HttpRequestPtr &req,
                                    std::function<void(const HttpResponsePtr &)> &&callback) {
    UserInfo user = getUserFromRequest(req);
    if (user.id == 0) {
        callback(jsonError("Unauthorized", k401Unauthorized));
        return;
    }

    auto dbClient = app().getDbClient();
    *dbClient << "SELECT id, name, stream_key, is_active, is_live, viewer_count, created_at "
                 "FROM realms WHERE user_id = $1 ORDER BY created_at DESC"
              << user.id
        >> [callback](const Result& rows) {
            // Converts one result row into its JSON representation.
            const auto toJson = [](const auto& row) {
                Json::Value entry;
                entry["id"] = static_cast<Json::Int64>(row["id"].template as<int64_t>());
                entry["name"] = row["name"].template as<std::string>();
                entry["streamKey"] = row["stream_key"].template as<std::string>();
                entry["isActive"] = row["is_active"].template as<bool>();
                entry["isLive"] = row["is_live"].template as<bool>();
                entry["viewerCount"] =
                    static_cast<Json::Int64>(row["viewer_count"].template as<int64_t>());
                entry["createdAt"] = row["created_at"].template as<std::string>();
                return entry;
            };

            Json::Value list(Json::arrayValue);
            for (const auto& row : rows) {
                list.append(toJson(row));
            }

            Json::Value body;
            body["success"] = true;
            body["realms"] = list;
            callback(jsonResp(body));
        }
        >> [callback](const DrogonDbException& ex) {
            LOG_ERROR << "Failed to get realms: " << ex.base().what();
            callback(jsonError("Failed to get realms"));
        };
}
// Creates a realm for the authenticated user.
//
// Flow (each step is a chained asynchronous query):
//   1. 401 unless a user can be resolved from the request.
//   2. 403 unless the user row has is_streamer set.
//   3. 400 if the body is not a JSON object or "name" is not a valid realm
//      name (see validateRealmName).
//   4. 400 if the name is already taken or the user already owns 5 realms.
//   5. INSERT the realm with a freshly generated stream key, cache the key
//      under "stream_key:<key>" in Redis for 86400 seconds, and return
//      {"success": true, "realm": {id, name, streamKey}}.
//
// NOTE(review): the name-uniqueness and realm-count checks run as separate
// queries before the INSERT, so two concurrent requests can both pass them;
// presumably a database constraint is the real guard - confirm.
void RealmController::createRealm(const HttpRequestPtr &req,
                                  std::function<void(const HttpResponsePtr &)> &&callback) {
    UserInfo user = getUserFromRequest(req);
    if (user.id == 0) {
        callback(jsonError("Unauthorized", k401Unauthorized));
        return;
    }
    // Check if user is a streamer
    auto dbClient = app().getDbClient();
    *dbClient << "SELECT is_streamer FROM users WHERE id = $1"
              << user.id
        >> [req, callback, user, dbClient](const Result& r) {
            if (r.empty() || !r[0]["is_streamer"].as<bool>()) {
                callback(jsonError("You must be a streamer to create realms", k403Forbidden));
                return;
            }
            auto json = req->getJsonObject();
            // A top-level JSON array (or scalar) cannot be indexed by key;
            // reject it here instead of letting jsoncpp fail inside this
            // callback.
            if (!json || !json->isObject()) {
                callback(jsonError("Invalid JSON"));
                return;
            }
            // Read through a const reference so a missing "name" is not
            // inserted into the request body, and only call asString() on
            // values jsoncpp can convert (asString() fails on arrays/objects).
            const Json::Value& body = *json;
            const Json::Value& nameField = body["name"];
            std::string name = nameField.isConvertibleTo(Json::stringValue)
                                   ? nameField.asString()
                                   : std::string();
            if (!validateRealmName(name)) {
                callback(jsonError("Realm name must be 3-30 characters, lowercase letters, numbers, and hyphens only"));
                return;
            }
            // Check if realm name already exists
            *dbClient << "SELECT id FROM realms WHERE name = $1"
                      << name
                >> [dbClient, user, name, callback](const Result& r2) {
                    if (!r2.empty()) {
                        callback(jsonError("Realm name already taken"));
                        return;
                    }
                    // Check user's realm limit (e.g., 5 realms per user)
                    *dbClient << "SELECT COUNT(*) as count FROM realms WHERE user_id = $1"
                              << user.id
                        >> [dbClient, user, name, callback](const Result& r3) {
                            if (!r3.empty() && r3[0]["count"].as<int64_t>() >= 5) {
                                callback(jsonError("You have reached the maximum number of realms (5)"));
                                return;
                            }
                            std::string streamKey = generateStreamKey();
                            *dbClient << "INSERT INTO realms (user_id, name, stream_key) "
                                         "VALUES ($1, $2, $3) RETURNING id"
                                      << user.id << name << streamKey
                                >> [callback, name, streamKey](const Result& r4) {
                                    if (r4.empty()) {
                                        callback(jsonError("Failed to create realm"));
                                        return;
                                    }
                                    // Store stream key in Redis
                                    RedisHelper::storeKey("stream_key:" + streamKey, "1", 86400);
                                    Json::Value resp;
                                    resp["success"] = true;
                                    resp["realm"]["id"] = static_cast<Json::Int64>(r4[0]["id"].as<int64_t>());
                                    resp["realm"]["name"] = name;
                                    resp["realm"]["streamKey"] = streamKey;
                                    callback(jsonResp(resp));
                                }
                                >> [callback](const DrogonDbException& e) {
                                    LOG_ERROR << "Failed to create realm: " << e.base().what();
                                    callback(jsonError("Failed to create realm"));
                                };
                        }
                        >> [callback](const DrogonDbException& e) {
                            LOG_ERROR << "Failed to count realms: " << e.base().what();
                            callback(jsonError("Database error"));
                        };
                }
                >> [callback](const DrogonDbException& e) {
                    LOG_ERROR << "Failed to check realm name: " << e.base().what();
                    callback(jsonError("Database error"));
                };
        }
        >> [callback](const DrogonDbException& e) {
            LOG_ERROR << "Failed to check streamer status: " << e.base().what();
            callback(jsonError("Database error"));
        };
}
void RealmController::issueRealmViewerToken(const HttpRequestPtr &,
std::function<void(const HttpResponsePtr &)> &&callback,
const std::string &realmId) {
int64_t id = std::stoll(realmId);
auto dbClient = app().getDbClient();
*dbClient << "SELECT stream_key FROM realms WHERE id = $1 AND is_active = true"
<< id
>> [callback](const Result& r) {
if (r.empty()) {
callback(jsonResp({}, k404NotFound));
return;
}
std::string streamKey = r[0]["stream_key"].as<std::string>();
auto bytes = drogon::utils::genRandomString(32);
std::string token = drogon::utils::base64Encode(
(const unsigned char*)bytes.data(), bytes.length()
);
RedisHelper::storeKeyAsync("viewer_token:" + token, streamKey, 30,
[callback, token](bool stored) {
if (!stored) {
callback(jsonResp({}, k500InternalServerError));
return;
}
auto resp = HttpResponse::newHttpResponse();
Cookie cookie("viewer_token", token);
cookie.setPath("/");
cookie.setHttpOnly(true);
cookie.setSecure(false);
cookie.setMaxAge(300);
resp->addCookie(cookie);
Json::Value body;
body["success"] = true;
body["viewer_token"] = token;
body["expires_in"] = 30;
resp->setContentTypeCode(CT_APPLICATION_JSON);
resp->setBody(Json::FastWriter().write(body));
callback(resp);
}
);
}
>> [callback](const DrogonDbException& e) {
LOG_ERROR << "Database error: " << e.base().what();
callback(jsonResp({}, k500InternalServerError));
};
}
void RealmController::getRealmStreamKey(const HttpRequestPtr &req,
std::function<void(const HttpResponsePtr &)> &&callback,
const std::string &realmId) {
// Check for viewer token
auto token = req->getCookie("viewer_token");
if (token.empty()) {
callback(jsonError("No viewer token", k403Forbidden));
return;
}
int64_t id = std::stoll(realmId);
// First get the stream key for this realm
auto dbClient = app().getDbClient();
*dbClient << "SELECT stream_key FROM realms WHERE id = $1 AND is_active = true"
<< id
>> [token, callback](const Result& r) {
if (r.empty()) {
callback(jsonError("Realm not found", k404NotFound));
return;
}
std::string streamKey = r[0]["stream_key"].as<std::string>();
// Verify the token is valid for this stream
RedisHelper::getKeyAsync("viewer_token:" + token,
[callback, streamKey](const std::string& storedStreamKey) {
if (storedStreamKey != streamKey) {
callback(jsonError("Invalid token for this realm", k403Forbidden));
return;
}
// Token is valid, return the stream key
Json::Value resp;
resp["success"] = true;
resp["streamKey"] = streamKey;
callback(jsonResp(resp));
}
);
}
>> [callback](const DrogonDbException& e) {
LOG_ERROR << "Database error: " << e.base().what();
callback(jsonError("Database error"));
};
}
void RealmController::getRealm(const HttpRequestPtr &, // Remove parameter name since it's unused
std::function<void(const HttpResponsePtr &)> &&callback,
const std::string &realmId) {
// Remove authentication requirement for public viewing
int64_t id = std::stoll(realmId);
auto dbClient = app().getDbClient();
*dbClient << "SELECT r.*, u.username FROM realms r "
"JOIN users u ON r.user_id = u.id "
"WHERE r.id = $1 AND r.is_active = true"
<< id
>> [callback](const Result& r) {
if (r.empty()) {
callback(jsonError("Realm not found", k404NotFound));
return;
}
Json::Value resp;
resp["success"] = true;
auto& realm = resp["realm"];
realm["id"] = static_cast<Json::Int64>(r[0]["id"].as<int64_t>());
realm["name"] = r[0]["name"].as<std::string>();
// Don't expose stream key in public endpoint
// realm["streamKey"] = r[0]["stream_key"].as<std::string>();
realm["isActive"] = r[0]["is_active"].as<bool>();
realm["isLive"] = r[0]["is_live"].as<bool>();
realm["viewerCount"] = static_cast<Json::Int64>(r[0]["viewer_count"].as<int64_t>());
realm["createdAt"] = r[0]["created_at"].as<std::string>();
realm["username"] = r[0]["username"].as<std::string>();
callback(jsonResp(resp));
}
>> [callback](const DrogonDbException& e) {
LOG_ERROR << "Failed to get realm: " << e.base().what();
callback(jsonError("Database error"));
};
}
// Placeholder update endpoint for a realm owned by the caller.
//
// Responds 401 without an authenticated user, 400 when `realmId` is not a
// valid integer, and 404 when the realm does not exist or belongs to someone
// else. There are currently no updatable fields, so a successful ownership
// check simply yields a success message.
void RealmController::updateRealm(const HttpRequestPtr &req,
                                  std::function<void(const HttpResponsePtr &)> &&callback,
                                  const std::string &realmId) {
    UserInfo user = getUserFromRequest(req);
    if (user.id == 0) {
        callback(jsonError("Unauthorized", k401Unauthorized));
        return;
    }

    // Convert the path segment to a numeric id, rejecting malformed input.
    int64_t id = 0;
    try {
        id = std::stoll(realmId);
    } catch (...) {
        callback(jsonError("Invalid realm ID", k400BadRequest));
        return;
    }

    // Ownership check: the realm must exist and be owned by the caller.
    auto dbClient = app().getDbClient();
    *dbClient << "SELECT id FROM realms WHERE id = $1 AND user_id = $2"
              << id << user.id
        >> [callback](const Result& owned) {
            if (owned.empty()) {
                callback(jsonError("Realm not found or access denied", k404NotFound));
                return;
            }
            // Nothing to persist yet (display_name and description were
            // removed); the endpoint is kept for future fields.
            Json::Value body;
            body["message"] = "Realm updated successfully";
            body["success"] = true;
            callback(jsonResp(body));
        }
        >> [callback](const DrogonDbException& ex) {
            LOG_ERROR << "Database error: " << ex.base().what();
            callback(jsonError("Database error"));
        };
}
void RealmController::deleteRealm(const HttpRequestPtr &req,
std::function<void(const HttpResponsePtr &)> &&callback,
const std::string &realmId) {
UserInfo user = getUserFromRequest(req);
if (user.id == 0) {
callback(jsonError("Unauthorized", k401Unauthorized));
return;
}
int64_t id = std::stoll(realmId);
auto dbClient = app().getDbClient();
// First get the stream key to invalidate it
*dbClient << "SELECT stream_key FROM realms WHERE id = $1 AND user_id = $2"
<< id << user.id
>> [dbClient, id, user, callback](const Result& r) {
if (r.empty()) {
callback(jsonError("Realm not found", k404NotFound));
return;
}
std::string streamKey = r[0]["stream_key"].as<std::string>();
// Invalidate the key
invalidateKeyInRedis(streamKey);
// Delete the realm
*dbClient << "DELETE FROM realms WHERE id = $1 AND user_id = $2"
<< id << user.id
>> [callback](const Result&) {
Json::Value resp;
resp["success"] = true;
callback(jsonResp(resp));
}
>> [callback](const DrogonDbException& e) {
LOG_ERROR << "Failed to delete realm: " << e.base().what();
callback(jsonError("Failed to delete realm"));
};
}
>> [callback](const DrogonDbException& e) {
LOG_ERROR << "Failed to get realm: " << e.base().what();
callback(jsonError("Database error"));
};
}
void RealmController::regenerateRealmKey(const HttpRequestPtr &req,
std::function<void(const HttpResponsePtr &)> &&callback,
const std::string &realmId) {
UserInfo user = getUserFromRequest(req);
if (user.id == 0) {
callback(jsonError("Unauthorized", k401Unauthorized));
return;
}
int64_t id = std::stoll(realmId);
auto dbClient = app().getDbClient();
// Get old key
*dbClient << "SELECT stream_key FROM realms WHERE id = $1 AND user_id = $2"
<< id << user.id
>> [dbClient, id, user, callback](const Result& r) {
if (r.empty()) {
callback(jsonError("Realm not found", k404NotFound));
return;
}
std::string oldKey = r[0]["stream_key"].as<std::string>();
invalidateKeyInRedis(oldKey);
std::string newKey = generateStreamKey();
*dbClient << "UPDATE realms SET stream_key = $1 WHERE id = $2 AND user_id = $3"
<< newKey << id << user.id
>> [callback, newKey](const Result&) {
// Store new key in Redis
RedisHelper::storeKey("stream_key:" + newKey, "1", 86400);
Json::Value resp;
resp["success"] = true;
resp["streamKey"] = newKey;
callback(jsonResp(resp));
}
>> [callback](const DrogonDbException& e) {
LOG_ERROR << "Failed to update stream key: " << e.base().what();
callback(jsonError("Failed to regenerate key"));
};
}
>> [callback](const DrogonDbException& e) {
LOG_ERROR << "Failed to get realm: " << e.base().what();
callback(jsonError("Database error"));
};
}
// Public lookup of an active realm by its name (the request object is
// unused).
//
// Responds 404 when no active realm has that name. The response carries id,
// name, isLive, viewerCount and the owner's username and avatar URL; a NULL
// avatar_url is reported as an empty string.
void RealmController::getRealmByName(const HttpRequestPtr &,
                                     std::function<void(const HttpResponsePtr &)> &&callback,
                                     const std::string &realmName) {
    auto dbClient = app().getDbClient();
    *dbClient << "SELECT r.id, r.name, r.is_live, r.viewer_count, "
                 "u.username, u.avatar_url FROM realms r "
                 "JOIN users u ON r.user_id = u.id "
                 "WHERE r.name = $1 AND r.is_active = true"
              << realmName
        >> [callback](const Result& rows) {
            if (rows.empty()) {
                callback(jsonError("Realm not found", k404NotFound));
                return;
            }

            const auto row = rows[0];
            const auto avatar = row["avatar_url"];

            Json::Value body;
            body["success"] = true;
            Json::Value& info = body["realm"];
            info["id"] = static_cast<Json::Int64>(row["id"].as<int64_t>());
            info["name"] = row["name"].as<std::string>();
            info["isLive"] = row["is_live"].as<bool>();
            info["viewerCount"] = static_cast<Json::Int64>(row["viewer_count"].as<int64_t>());
            info["username"] = row["username"].as<std::string>();
            if (avatar.isNull()) {
                info["avatarUrl"] = std::string();
            } else {
                info["avatarUrl"] = avatar.as<std::string>();
            }
            callback(jsonResp(body));
        }
        >> [callback](const DrogonDbException& ex) {
            LOG_ERROR << "Failed to get realm by name: " << ex.base().what();
            callback(jsonError("Database error"));
        };
}
// Public listing of every realm that is both live and active, ordered by
// viewer count (highest first).
//
// The response body is a bare JSON array (no "success" wrapper); each element
// has name, viewerCount, username and avatarUrl, with a NULL avatar_url
// reported as an empty string.
void RealmController::getLiveRealms(const HttpRequestPtr &,
                                    std::function<void(const HttpResponsePtr &)> &&callback) {
    auto dbClient = app().getDbClient();
    *dbClient << "SELECT r.name, r.viewer_count, u.username, u.avatar_url "
                 "FROM realms r JOIN users u ON r.user_id = u.id "
                 "WHERE r.is_live = true AND r.is_active = true "
                 "ORDER BY r.viewer_count DESC"
        >> [callback](const Result& rows) {
            Json::Value list(Json::arrayValue);
            for (const auto& row : rows) {
                const auto avatar = row["avatar_url"];

                Json::Value entry;
                entry["name"] = row["name"].as<std::string>();
                entry["viewerCount"] =
                    static_cast<Json::Int64>(row["viewer_count"].as<int64_t>());
                entry["username"] = row["username"].as<std::string>();
                if (avatar.isNull()) {
                    entry["avatarUrl"] = std::string();
                } else {
                    entry["avatarUrl"] = avatar.as<std::string>();
                }
                list.append(entry);
            }
            callback(jsonResp(list));
        }
        >> [callback](const DrogonDbException& ex) {
            LOG_ERROR << "Failed to get live realms: " << ex.base().what();
            callback(jsonError("Database error"));
        };
}
// Reports whether `key` is the stream key of an active realm.
//
// The response is always {"valid": <bool>} with the default 200 status; a
// database failure is reported as valid = false rather than as an error
// status. A valid key is (re)cached in Redis under "stream_key:<key>" for
// 86400 seconds.
void RealmController::validateRealmKey(const HttpRequestPtr &,
                                       std::function<void(const HttpResponsePtr &)> &&callback,
                                       const std::string &key) {
    auto dbClient = app().getDbClient();
    *dbClient << "SELECT id FROM realms WHERE stream_key = $1 AND is_active = true"
              << key
        >> [callback, key](const Result& matches) {
            const bool known = !matches.empty();
            if (known) {
                // Refresh the cached entry for this key.
                RedisHelper::storeKey("stream_key:" + key, "1", 86400);
            }
            Json::Value verdict;
            verdict["valid"] = known;
            callback(jsonResp(verdict));
        }
        >> [callback](const DrogonDbException& ex) {
            LOG_ERROR << "Database error: " << ex.base().what();
            Json::Value verdict;
            verdict["valid"] = false;
            callback(jsonResp(verdict));
        };
}
void RealmController::getRealmStats(const HttpRequestPtr &,
std::function<void(const HttpResponsePtr &)> &&callback,
const std::string &realmId) {
// Public endpoint - no authentication required
int64_t id = std::stoll(realmId);
auto dbClient = app().getDbClient();
*dbClient << "SELECT stream_key FROM realms WHERE id = $1"
<< id
>> [callback](const Result& r) {
if (r.empty()) {
callback(jsonError("Realm not found", k404NotFound));
return;
}
std::string streamKey = r[0]["stream_key"].as<std::string>();
StatsService::getInstance().getStreamStats(streamKey,
[callback](bool success, const StreamStats& stats) {
if (success) {
Json::Value json;
json["success"] = true;
auto& s = json["stats"];
s["connections"] = static_cast<Json::Int64>(stats.uniqueViewers);
s["total_connections"] = static_cast<Json::Int64>(stats.totalConnections);
s["bytes_in"] = static_cast<Json::Int64>(stats.totalBytesIn);
s["bytes_out"] = static_cast<Json::Int64>(stats.totalBytesOut);
s["bitrate"] = stats.bitrate;
s["codec"] = stats.codec;
s["resolution"] = stats.resolution;
s["fps"] = stats.fps;
s["is_live"] = stats.isLive;
// Protocol breakdown
auto& pc = s["protocol_connections"];
pc["webrtc"] = static_cast<Json::Int64>(stats.protocolConnections.webrtc);
pc["hls"] = static_cast<Json::Int64>(stats.protocolConnections.hls);
pc["llhls"] = static_cast<Json::Int64>(stats.protocolConnections.llhls);
pc["dash"] = static_cast<Json::Int64>(stats.protocolConnections.dash);
callback(jsonResp(json));
} else {
callback(jsonError("Failed to retrieve stats"));
}
});
}
>> [callback](const DrogonDbException& e) {
LOG_ERROR << "Database error: " << e.base().what();
callback(jsonError("Database error"));
};
}