This commit is contained in:
parent
99151c6692
commit
3676dc46ed
16 changed files with 894 additions and 89 deletions
|
|
@ -260,23 +260,79 @@ void WatchSyncController::broadcastRoomSync(const std::string& realmId) {
|
|||
}
|
||||
|
||||
void WatchSyncController::autoAdvanceToNextVideo(const std::string& realmId) {
|
||||
// Check skip debounce to prevent multiple auto-advance calls
|
||||
// Variables for locked video handling (set inside lock, used after)
|
||||
bool isLockedVideo = false;
|
||||
RoomState stateCopy;
|
||||
int64_t nowMs = 0;
|
||||
|
||||
// Check skip debounce and in-memory lock state
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(roomStatesMutex_);
|
||||
auto it = roomStates_.find(realmId);
|
||||
if (it != roomStates_.end()) {
|
||||
auto now = std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
nowMs = std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::system_clock::now().time_since_epoch()).count();
|
||||
if (now - it->second.lastSkipMs < SKIP_DEBOUNCE_MS) {
|
||||
if (nowMs - it->second.lastSkipMs < SKIP_DEBOUNCE_MS) {
|
||||
LOG_DEBUG << "Auto-advance debounced for room " << realmId;
|
||||
return;
|
||||
}
|
||||
// Mark skip time to prevent concurrent skips
|
||||
it->second.lastSkipMs = now;
|
||||
it->second.lastSkipMs = nowMs;
|
||||
|
||||
// CHECK IN-MEMORY LOCK STATE FIRST (CyTube-style immediate state)
|
||||
if (it->second.currentVideoLocked) {
|
||||
LOG_INFO << "Auto-advance: Current video is locked (in-memory), restarting in room " << realmId;
|
||||
|
||||
// Restart the locked video
|
||||
it->second.currentTime = 0.0;
|
||||
it->second.playbackState = "playing";
|
||||
it->second.lastUpdateMs = nowMs;
|
||||
it->second.leadInActive = true;
|
||||
it->second.leadInStartMs = nowMs;
|
||||
it->second.stateVersion++;
|
||||
|
||||
// Copy state for broadcast after releasing lock
|
||||
stateCopy = it->second;
|
||||
isLockedVideo = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Call backend to get playlist (includes isLocked status)
|
||||
// Handle locked video restart (outside the lock)
|
||||
if (isLockedVideo) {
|
||||
// Get viewer count for silent loop logic
|
||||
int viewerCount = getViewerCount(realmId);
|
||||
|
||||
if (viewerCount > 0) {
|
||||
// Broadcast locked_restart event
|
||||
Json::Value stateChange;
|
||||
stateChange["type"] = "state_change";
|
||||
stateChange["event"] = "locked_restart";
|
||||
stateChange["triggeredBy"] = "server";
|
||||
stateChange["playbackState"] = "playing";
|
||||
stateChange["currentTime"] = 0.0;
|
||||
stateChange["serverTime"] = static_cast<Json::Int64>(nowMs);
|
||||
stateChange["leadIn"] = true;
|
||||
stateChange["isLocked"] = true;
|
||||
|
||||
if (!stateCopy.currentVideoId.empty()) {
|
||||
Json::Value video;
|
||||
video["id"] = static_cast<Json::Int64>(stateCopy.currentPlaylistItemId);
|
||||
video["youtubeVideoId"] = stateCopy.currentVideoId;
|
||||
video["title"] = stateCopy.currentVideoTitle;
|
||||
video["durationSeconds"] = stateCopy.durationSeconds;
|
||||
video["isLocked"] = true;
|
||||
stateChange["currentVideo"] = video;
|
||||
}
|
||||
|
||||
broadcastToRoom(realmId, stateChange);
|
||||
} else {
|
||||
LOG_DEBUG << "Auto-advance: Silent loop for locked video in room " << realmId << " (no viewers)";
|
||||
}
|
||||
return; // Don't proceed to backend query
|
||||
}
|
||||
|
||||
// Call backend to get playlist (fallback for non-locked videos)
|
||||
auto client = HttpClient::newHttpClient("http://drogon-backend:8080");
|
||||
auto httpReq = HttpRequest::newHttpRequest();
|
||||
httpReq->setPath("/api/watch/" + realmId + "/playlist");
|
||||
|
|
@ -310,9 +366,16 @@ void WatchSyncController::autoAdvanceToNextVideo(const std::string& realmId) {
|
|||
}
|
||||
|
||||
const Json::Value& playlist = (*json)["playlist"];
|
||||
LOG_INFO << "Auto-advance: Got " << playlist.size() << " playlist items for room " << realmId;
|
||||
|
||||
for (const auto& item : playlist) {
|
||||
std::string status = item["status"].asString();
|
||||
bool isLocked = item.isMember("isLocked") && item["isLocked"].asBool();
|
||||
std::string title = item.isMember("title") ? item["title"].asString() : "unknown";
|
||||
|
||||
LOG_INFO << " - Item: " << title << ", status: " << status
|
||||
<< ", isLocked: " << (isLocked ? "true" : "false")
|
||||
<< ", hasIsLocked: " << (item.isMember("isLocked") ? "yes" : "no");
|
||||
|
||||
if (status == "queued") {
|
||||
queuedCount++;
|
||||
|
|
@ -722,6 +785,8 @@ void WatchSyncController::handleNewMessage(const WebSocketConnectionPtr& wsConnP
|
|||
handlePlaybackControl(wsConnPtr, info, json);
|
||||
} else if (msgType == "update_duration") {
|
||||
handleUpdateDuration(wsConnPtr, info, json);
|
||||
} else if (msgType == "lock_update") {
|
||||
handleLockUpdate(wsConnPtr, info, json);
|
||||
} else {
|
||||
sendError(wsConnPtr, "Unknown message type: " + msgType);
|
||||
}
|
||||
|
|
@ -1468,3 +1533,47 @@ void WatchSyncController::handleUpdateDuration(const WebSocketConnectionPtr& wsC
|
|||
}
|
||||
});
|
||||
}
|
||||
|
||||
void WatchSyncController::handleLockUpdate(const WebSocketConnectionPtr& wsConnPtr,
|
||||
const ViewerInfo& info,
|
||||
const Json::Value& data) {
|
||||
if (info.realmId.empty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Only owner/admin can toggle lock (frontend already enforces this)
|
||||
if (!info.canControlPlayback) {
|
||||
return;
|
||||
}
|
||||
|
||||
int64_t playlistItemId = data["playlistItemId"].asInt64();
|
||||
bool locked = data["locked"].asBool();
|
||||
|
||||
LOG_INFO << "Lock update for room " << info.realmId
|
||||
<< ": item " << playlistItemId << " locked=" << locked;
|
||||
|
||||
// Update in-memory state if this is the current video
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(roomStatesMutex_);
|
||||
auto it = roomStates_.find(info.realmId);
|
||||
if (it != roomStates_.end()) {
|
||||
if (it->second.currentPlaylistItemId == playlistItemId) {
|
||||
it->second.currentVideoLocked = locked;
|
||||
LOG_INFO << "Updated currentVideoLocked to " << locked
|
||||
<< " for room " << info.realmId;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Broadcast lock change to all viewers in the room
|
||||
auto now = std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::system_clock::now().time_since_epoch()).count();
|
||||
|
||||
Json::Value broadcast;
|
||||
broadcast["type"] = "lock_changed";
|
||||
broadcast["playlistItemId"] = static_cast<Json::Int64>(playlistItemId);
|
||||
broadcast["locked"] = locked;
|
||||
broadcast["serverTime"] = static_cast<Json::Int64>(now);
|
||||
|
||||
broadcastToRoom(info.realmId, broadcast);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -124,6 +124,10 @@ private:
|
|||
const ViewerInfo& info,
|
||||
const Json::Value& data);
|
||||
|
||||
void handleLockUpdate(const WebSocketConnectionPtr& wsConnPtr,
|
||||
const ViewerInfo& info,
|
||||
const Json::Value& data);
|
||||
|
||||
void sendError(const WebSocketConnectionPtr& wsConnPtr, const std::string& error);
|
||||
void sendSuccess(const WebSocketConnectionPtr& wsConnPtr, const Json::Value& data);
|
||||
|
||||
|
|
|
|||
|
|
@ -269,6 +269,10 @@ void ChatService::cleanupMessages() {
|
|||
LOG_DEBUG << "Cleaned up old messages for realm: " << realmId
|
||||
<< " (retention: " << settings.retentionHours << "h)";
|
||||
}
|
||||
|
||||
// Clean up any expired self-destruct messages that weren't deleted by their timers
|
||||
// (e.g., due to server restart or timer failures)
|
||||
redis.cleanupExpiredSelfDestruct(realmId);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -67,11 +67,15 @@ bool RedisMessageStore::addMessage(const ChatMessage& message) {
|
|||
trackActiveRealm(message.realmId);
|
||||
|
||||
auto key = getMessagesKey(message.realmId);
|
||||
auto indexKey = getMessageIndexKey(message.realmId);
|
||||
auto serialized = message.serialize();
|
||||
|
||||
// Add to sorted set with timestamp as score
|
||||
redis_->zadd(key, serialized, static_cast<double>(message.timestamp));
|
||||
|
||||
// Add to message index hash for reliable deletion by messageId
|
||||
redis_->hset(indexKey, message.messageId, serialized);
|
||||
|
||||
// Trim to max messages per realm
|
||||
auto maxMessages = drogon::app().getCustomConfig().get("chat", Json::Value::null)
|
||||
.get("max_messages_per_realm", 1000).asInt64();
|
||||
|
|
@ -94,6 +98,9 @@ std::vector<ChatMessage> RedisMessageStore::getMessages(const std::string& realm
|
|||
}
|
||||
try {
|
||||
auto key = getMessagesKey(realmId);
|
||||
auto now = std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::system_clock::now().time_since_epoch()
|
||||
).count();
|
||||
|
||||
std::vector<std::string> results;
|
||||
if (beforeTimestamp > 0) {
|
||||
|
|
@ -107,7 +114,12 @@ std::vector<ChatMessage> RedisMessageStore::getMessages(const std::string& realm
|
|||
}
|
||||
|
||||
for (const auto& serialized : results) {
|
||||
messages.push_back(ChatMessage::deserialize(serialized));
|
||||
auto msg = ChatMessage::deserialize(serialized);
|
||||
// Filter out expired self-destruct messages
|
||||
if (msg.selfDestructAt > 0 && msg.selfDestructAt <= now) {
|
||||
continue; // Skip expired messages
|
||||
}
|
||||
messages.push_back(msg);
|
||||
}
|
||||
|
||||
// Reverse to get chronological order
|
||||
|
|
@ -120,21 +132,33 @@ std::vector<ChatMessage> RedisMessageStore::getMessages(const std::string& realm
|
|||
}
|
||||
|
||||
bool RedisMessageStore::deleteMessage(const std::string& realmId, const std::string& messageId) {
|
||||
if (!redis_) {
|
||||
LOG_ERROR << "Redis not initialized - cannot delete message";
|
||||
return false;
|
||||
}
|
||||
try {
|
||||
auto key = getMessagesKey(realmId);
|
||||
auto indexKey = getMessageIndexKey(realmId);
|
||||
|
||||
// Get all messages and remove the one with matching ID
|
||||
std::vector<std::string> messages;
|
||||
redis_->zrange(key, 0, -1, std::back_inserter(messages));
|
||||
|
||||
for (const auto& serialized : messages) {
|
||||
auto msg = ChatMessage::deserialize(serialized);
|
||||
if (msg.messageId == messageId) {
|
||||
redis_->zrem(key, serialized);
|
||||
return true;
|
||||
}
|
||||
// Look up the exact serialized string from the message index
|
||||
auto serialized = redis_->hget(indexKey, messageId);
|
||||
if (!serialized) {
|
||||
LOG_WARN << "Message not found in index: " << messageId;
|
||||
return false;
|
||||
}
|
||||
return false;
|
||||
|
||||
// Remove from sorted set using the exact serialized string
|
||||
auto removed = redis_->zrem(key, *serialized);
|
||||
|
||||
// Remove from message index
|
||||
redis_->hdel(indexKey, messageId);
|
||||
|
||||
if (removed == 0) {
|
||||
LOG_WARN << "Message was in index but not in sorted set: " << messageId;
|
||||
}
|
||||
|
||||
LOG_DEBUG << "Deleted message: " << messageId << " (zrem=" << removed << ")";
|
||||
return true;
|
||||
} catch (const Error& e) {
|
||||
LOG_ERROR << "Redis error deleting message: " << e.what();
|
||||
return false;
|
||||
|
|
@ -155,6 +179,41 @@ void RedisMessageStore::cleanupOldMessages(const std::string& realmId, int reten
|
|||
}
|
||||
}
|
||||
|
||||
void RedisMessageStore::cleanupExpiredSelfDestruct(const std::string& realmId) {
|
||||
if (!redis_) return;
|
||||
try {
|
||||
auto key = getMessagesKey(realmId);
|
||||
auto indexKey = getMessageIndexKey(realmId);
|
||||
auto now = std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::system_clock::now().time_since_epoch()
|
||||
).count();
|
||||
|
||||
// Get all messages to check for expired self-destruct
|
||||
std::vector<std::string> messages;
|
||||
redis_->zrange(key, 0, -1, std::back_inserter(messages));
|
||||
|
||||
int cleanedCount = 0;
|
||||
for (const auto& serialized : messages) {
|
||||
auto msg = ChatMessage::deserialize(serialized);
|
||||
// Check if message has expired selfDestructAt time
|
||||
if (msg.selfDestructAt > 0 && msg.selfDestructAt <= now) {
|
||||
// Remove from sorted set
|
||||
redis_->zrem(key, serialized);
|
||||
// Remove from message index
|
||||
redis_->hdel(indexKey, msg.messageId);
|
||||
cleanedCount++;
|
||||
LOG_DEBUG << "Cleaned up expired self-destruct message: " << msg.messageId;
|
||||
}
|
||||
}
|
||||
|
||||
if (cleanedCount > 0) {
|
||||
LOG_INFO << "Cleaned up " << cleanedCount << " expired self-destruct messages in realm: " << realmId;
|
||||
}
|
||||
} catch (const Error& e) {
|
||||
LOG_ERROR << "Redis error cleaning up expired self-destruct messages: " << e.what();
|
||||
}
|
||||
}
|
||||
|
||||
void RedisMessageStore::setGlobalSettings(const GlobalChatSettings& settings) {
|
||||
try {
|
||||
redis_->hset("chat:settings:global", "guestPrefix", settings.guestPrefix);
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ public:
|
|||
int64_t beforeTimestamp = 0);
|
||||
bool deleteMessage(const std::string& realmId, const std::string& messageId);
|
||||
void cleanupOldMessages(const std::string& realmId, int retentionHours);
|
||||
void cleanupExpiredSelfDestruct(const std::string& realmId);
|
||||
|
||||
// Settings operations
|
||||
void setGlobalSettings(const models::GlobalChatSettings& settings);
|
||||
|
|
@ -119,6 +120,10 @@ private:
|
|||
std::string getKickKey(const std::string& realmId, const std::string& userId) const {
|
||||
return "chat:kicked:" + realmId + ":" + userId;
|
||||
}
|
||||
|
||||
std::string getMessageIndexKey(const std::string& realmId) const {
|
||||
return "chat:msg_index:" + realmId;
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace services
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue