#pragma once #include #include #include #include #include #include using namespace drogon; class WatchSyncController : public drogon::WebSocketController { public: void handleNewMessage(const WebSocketConnectionPtr& wsConnPtr, std::string&& message, const WebSocketMessageType& type) override; void handleNewConnection(const HttpRequestPtr& req, const WebSocketConnectionPtr& wsConnPtr) override; void handleConnectionClosed(const WebSocketConnectionPtr& wsConnPtr) override; WS_PATH_LIST_BEGIN WS_PATH_ADD("/watch/ws"); WS_PATH_LIST_END // Broadcast sync state to all viewers in a watch room static void broadcastToRoom(const std::string& realmId, const Json::Value& message); // Get viewer count for a room static int getViewerCount(const std::string& realmId); // Force linker to include this object file static void ensureLoaded(); // Start/stop the sync loop (called on first connection and when last viewer leaves) static void startSyncLoop(); static void stopSyncLoop(); private: struct ViewerInfo { std::string realmId; std::string userId; std::string username; std::string authToken; // Store auth token for backend API calls bool canAddToPlaylist = false; bool canControlPlayback = false; bool isGuest = false; std::chrono::system_clock::time_point connectionTime; // Rate limiting fields int64_t lastMessageMs = 0; // Timestamp of last message int messageCount = 0; // Message count in current window int64_t windowStartMs = 0; // Start of rate limit window }; // In-memory room state for accurate time tracking (CyTube-style) struct RoomState { std::string playbackState = "paused"; // "playing", "paused", "ended", "buffering" double currentTime = 0.0; // Current playback position in seconds int64_t lastUpdateMs = 0; // Timestamp of last update (milliseconds) std::string currentVideoId; // YouTube video ID int64_t currentPlaylistItemId = 0; // Playlist item ID std::string currentVideoTitle; int durationSeconds = 0; bool leadInActive = false; // True during initial buffering period int64_t leadInStartMs = 0; // When lead-in started int repeatCount = 0; // Current repeat count for last video (max 3) bool isRepeating = false; // True when in repeat mode (last video looping) bool currentVideoLocked = false; // True if current video is locked (loops forever) // Skip idempotency tracking int64_t lastSkipMs = 0; // Timestamp of last skip (prevents double-skip) uint64_t stateVersion = 0; // State version for sync validation // State freshness tracking int64_t lastDbSyncMs = 0; // Last time state was synced from database // Client-reported video end (fallback when duration is unknown) bool videoEndedReported = false; // True when client reports video ended }; static std::unordered_map viewers_; static std::mutex viewersMutex_; // In-memory room states (keyed by realmId) static std::unordered_map roomStates_; static std::mutex roomStatesMutex_; // Sync loop thread static std::thread syncLoopThread_; static std::atomic syncLoopRunning_; static std::mutex syncLoopMutex_; // Protects thread start/stop operations // Sync loop - runs every second to update time and broadcast static void syncLoop(); // Update room state from database (called when joining or on state change) static void updateRoomStateFromDb(const std::string& realmId); // Broadcast current state to all viewers in a room static void broadcastRoomSync(const std::string& realmId); // Auto-advance to next video when current video ends (server-side, no owner required) static void autoAdvanceToNextVideo(const std::string& realmId); // Get current expected playback time for a room static double getExpectedTime(const RoomState& state); void handleJoinRoom(const WebSocketConnectionPtr& wsConnPtr, ViewerInfo& info, const Json::Value& data); void handleSyncRequest(const WebSocketConnectionPtr& wsConnPtr, const ViewerInfo& info); void handlePlaybackControl(const WebSocketConnectionPtr& wsConnPtr, const ViewerInfo& info, const Json::Value& data); void handleSkipWithRepeat(const WebSocketConnectionPtr& wsConnPtr, const ViewerInfo& info, const Json::Value& data); void performSkip(const WebSocketConnectionPtr& wsConnPtr, const ViewerInfo& info, const Json::Value& data); void handleUpdateDuration(const WebSocketConnectionPtr& wsConnPtr, const ViewerInfo& info, const Json::Value& data); void handleLockUpdate(const WebSocketConnectionPtr& wsConnPtr, const ViewerInfo& info, const Json::Value& data); void handleVideoEnded(const WebSocketConnectionPtr& wsConnPtr, const ViewerInfo& info); void sendError(const WebSocketConnectionPtr& wsConnPtr, const std::string& error); void sendSuccess(const WebSocketConnectionPtr& wsConnPtr, const Json::Value& data); // Helper to check if connection still exists (for async callback safety) static bool isConnectionValid(const WebSocketConnectionPtr& wsConnPtr); // Helper to safely send to a connection (validates first) static void safeSend(const WebSocketConnectionPtr& wsConnPtr, const std::string& message); // Broadcast viewer count update static void broadcastViewerCount(const std::string& realmId); // Rate limiting check (returns true if message should be processed) bool checkRateLimit(const WebSocketConnectionPtr& wsConnPtr); // Constants for rate limiting static constexpr int RATE_LIMIT_MESSAGES = 30; // Max messages per window static constexpr int64_t RATE_LIMIT_WINDOW_MS = 10000; // 10 second window static constexpr int64_t MIN_MESSAGE_INTERVAL_MS = 100; // Min 100ms between messages // Skip debounce interval (prevent double-skips within this window) static constexpr int64_t SKIP_DEBOUNCE_MS = 1000; // Database sync freshness threshold (refresh from DB if older than this) // Reduced to 2 seconds for faster response to state changes (video deletion, etc.) static constexpr int64_t DB_SYNC_STALE_MS = 2000; };