beeta/chat-service/src/controllers/WatchSyncController.h
doomtube 7f56f19e94
All checks were successful
Build and Push / build-all (push) Successful in 8m52s
Fix: Force pull images in deploy workflow
2026-01-06 23:20:31 -05:00

153 lines
6.5 KiB
C++

#pragma once
#include <drogon/WebSocketController.h>
#include <unordered_map>
#include <mutex>
#include <chrono>
#include <thread>
#include <atomic>
using namespace drogon;
/**
 * WebSocket controller for synchronized watch rooms, registered on "/watch/ws".
 *
 * Per-connection viewer data and per-room playback state are held in static
 * in-memory maps, each paired with its own mutex. A static sync-loop thread
 * is declared alongside them (see syncLoop()).
 *
 * Drogon types are written with explicit `drogon::` qualification here.
 */
class WatchSyncController : public drogon::WebSocketController<WatchSyncController> {
public:
    // --- drogon::WebSocketController overrides ---------------------------

    /// Invoked for every message received on a WebSocket connection.
    void handleNewMessage(const drogon::WebSocketConnectionPtr& wsConnPtr,
                          std::string&& message,
                          const drogon::WebSocketMessageType& type) override;

    /// Invoked when a new WebSocket connection is established.
    void handleNewConnection(const drogon::HttpRequestPtr& req,
                             const drogon::WebSocketConnectionPtr& wsConnPtr) override;

    /// Invoked when a WebSocket connection is closed.
    void handleConnectionClosed(const drogon::WebSocketConnectionPtr& wsConnPtr) override;

    // Route registration: this controller serves a single path.
    WS_PATH_LIST_BEGIN
    WS_PATH_ADD("/watch/ws");
    WS_PATH_LIST_END

    /// Broadcast a sync-state message to all viewers in a watch room.
    static void broadcastToRoom(const std::string& realmId, const Json::Value& message);

    /// Get the viewer count for a room.
    static int getViewerCount(const std::string& realmId);

    /// Exists to force the linker to include this object file.
    static void ensureLoaded();

    /// Start the sync loop (called on first connection).
    static void startSyncLoop();

    /// Stop the sync loop (called when the last viewer leaves).
    static void stopSyncLoop();

private:
    // --- Types -----------------------------------------------------------

    /// Data tracked for each connected viewer.
    struct ViewerInfo {
        std::string realmId;
        std::string userId;
        std::string username;
        std::string authToken;            ///< Auth token kept for backend API calls
        bool canAddToPlaylist = false;
        bool canControlPlayback = false;
        bool isGuest = false;
        std::chrono::system_clock::time_point connectionTime;

        // Rate limiting
        int64_t lastMessageMs = 0;        ///< Timestamp of the last message
        int messageCount = 0;             ///< Messages seen in the current window
        int64_t windowStartMs = 0;        ///< Start of the rate limit window
    };

    /// In-memory room state for accurate time tracking (CyTube-style).
    struct RoomState {
        std::string playbackState = "paused";   ///< "playing", "paused", "ended", "buffering"
        double currentTime = 0.0;               ///< Current playback position in seconds
        int64_t lastUpdateMs = 0;               ///< Timestamp of last update (milliseconds)
        std::string currentVideoId;             ///< YouTube video ID
        int64_t currentPlaylistItemId = 0;      ///< Playlist item ID
        std::string currentVideoTitle;
        int durationSeconds = 0;
        bool leadInActive = false;              ///< True during the initial buffering period
        int64_t leadInStartMs = 0;              ///< When lead-in started
        int repeatCount = 0;                    ///< Current repeat count for last video (max 3)
        bool isRepeating = false;               ///< True when in repeat mode (last video looping)
        bool currentVideoLocked = false;        ///< True if current video is locked (loops forever)

        // Skip idempotency tracking
        int64_t lastSkipMs = 0;                 ///< Timestamp of last skip (prevents double-skip)
        uint64_t stateVersion = 0;              ///< State version for sync validation

        // State freshness tracking
        int64_t lastDbSyncMs = 0;               ///< Last time state was synced from the database
    };

    // --- Tunables --------------------------------------------------------

    // Rate limiting
    static constexpr int RATE_LIMIT_MESSAGES = 30;           ///< Max messages per window
    static constexpr int64_t RATE_LIMIT_WINDOW_MS = 10000;   ///< 10 second window
    static constexpr int64_t MIN_MESSAGE_INTERVAL_MS = 100;  ///< Min 100ms between messages

    /// Skip debounce interval (prevents double-skips within this window).
    static constexpr int64_t SKIP_DEBOUNCE_MS = 1000;

    /// Database sync freshness threshold: refresh from the DB if older than this.
    /// Reduced to 2 seconds for faster response to state changes (video deletion, etc.).
    static constexpr int64_t DB_SYNC_STALE_MS = 2000;

    // --- Shared state ----------------------------------------------------

    /// Connected viewers, keyed by their connection.
    static std::unordered_map<drogon::WebSocketConnectionPtr, ViewerInfo> viewers_;
    static std::mutex viewersMutex_;

    /// In-memory room states, keyed by realmId.
    static std::unordered_map<std::string, RoomState> roomStates_;
    static std::mutex roomStatesMutex_;

    /// Sync loop thread and its running flag.
    static std::thread syncLoopThread_;
    static std::atomic<bool> syncLoopRunning_;
    static std::mutex syncLoopMutex_;  ///< Protects thread start/stop operations

    // --- Static helpers --------------------------------------------------

    /// Sync loop: runs every second to update time and broadcast.
    static void syncLoop();

    /// Update room state from the database (called when joining or on state change).
    static void updateRoomStateFromDb(const std::string& realmId);

    /// Broadcast the current state to all viewers in a room.
    static void broadcastRoomSync(const std::string& realmId);

    /// Auto-advance to the next video when the current one ends
    /// (server-side, no owner required).
    static void autoAdvanceToNextVideo(const std::string& realmId);

    /// Get the current expected playback time for a room.
    static double getExpectedTime(const RoomState& state);

    /// Check whether a connection still exists (for async callback safety).
    static bool isConnectionValid(const drogon::WebSocketConnectionPtr& wsConnPtr);

    /// Safely send to a connection (validates it first).
    static void safeSend(const drogon::WebSocketConnectionPtr& wsConnPtr,
                         const std::string& message);

    /// Broadcast a viewer count update.
    static void broadcastViewerCount(const std::string& realmId);

    // --- Per-message handlers --------------------------------------------

    void handleJoinRoom(const drogon::WebSocketConnectionPtr& wsConnPtr,
                        ViewerInfo& info,
                        const Json::Value& data);

    void handleSyncRequest(const drogon::WebSocketConnectionPtr& wsConnPtr,
                           const ViewerInfo& info);

    void handlePlaybackControl(const drogon::WebSocketConnectionPtr& wsConnPtr,
                               const ViewerInfo& info,
                               const Json::Value& data);

    void handleSkipWithRepeat(const drogon::WebSocketConnectionPtr& wsConnPtr,
                              const ViewerInfo& info,
                              const Json::Value& data);

    void performSkip(const drogon::WebSocketConnectionPtr& wsConnPtr,
                     const ViewerInfo& info,
                     const Json::Value& data);

    void handleUpdateDuration(const drogon::WebSocketConnectionPtr& wsConnPtr,
                              const ViewerInfo& info,
                              const Json::Value& data);

    void sendError(const drogon::WebSocketConnectionPtr& wsConnPtr, const std::string& error);

    void sendSuccess(const drogon::WebSocketConnectionPtr& wsConnPtr, const Json::Value& data);

    /// Rate limiting check; returns true if the message should be processed.
    bool checkRateLimit(const drogon::WebSocketConnectionPtr& wsConnPtr);
};