import 'dart:async'; import 'dart:convert'; import 'dart:math'; import 'package:flutter_riverpod/flutter_riverpod.dart'; import 'package:web_socket_channel/web_socket_channel.dart'; import '../models/message_model.dart'; import '../models/ws_chat_side_event.dart'; import 'api_client.dart'; import 'api_config.dart'; import 'token_storage.dart'; final chatSocketProvider = Provider((ref) { final svc = ChatSocketService(ref.watch(tokenStorageProvider)); ref.onDispose(svc.dispose); return svc; }); class _PendingSend { _PendingSend(this.roomId, this.body); final String roomId; final String body; } class ChatSocketService { ChatSocketService(this._storage); final TokenStorage _storage; WebSocketChannel? _channel; final _incoming = StreamController.broadcast(); final _side = StreamController.broadcast(); final Set _subscribedRooms = {}; final List<_PendingSend> _pendingSends = []; Timer? _reconnectTimer; int _reconnectAttempt = 0; bool _disposed = false; int get pendingSendCount => _pendingSends.length; Stream get messages => _incoming.stream; /// [WsTypingEvent] and [WsReadEvent] from server. Stream get sideEvents => _side.stream; Uri _wsUri(String token) { final base = ApiConfig.baseUrl.replaceFirst(RegExp(r'^http'), 'ws'); return Uri.parse('$base/ws').replace(queryParameters: {'token': token}); } Future _closeChannelOnly() async { await _channel?.sink.close(); _channel = null; } Future connect() async { if (_disposed) return; _reconnectTimer?.cancel(); _reconnectTimer = null; await _closeChannelOnly(); final token = await _storage.readToken(); if (token == null || token.isEmpty) return; try { _channel = WebSocketChannel.connect(_wsUri(token)); } catch (_) { _scheduleReconnect(); return; } _reconnectAttempt = 0; _channel!.stream.listen( (raw) { try { final map = jsonDecode(raw as String) as Map; final t = map['type'] as String?; if (t == 'message' && map['message'] != null) { final m = map['message'] as Map; _incoming.add(MessageModel.fromWs(m)); } else if (t == 'typing') { _side.add( WsTypingEvent( roomId: map['roomId'] as String, userId: map['userId'] as String, active: map['active'] as bool? ?? false, ), ); } else if (t == 'read') { _side.add( WsReadEvent( roomId: map['roomId'] as String, userId: map['userId'] as String, upToMessageId: map['upToMessageId'] as String, ), ); } } catch (_) {} }, onError: (_) => _onConnectionLost(), onDone: _onConnectionLost, cancelOnError: false, ); for (final id in _subscribedRooms) { _channel?.sink.add(jsonEncode({'type': 'subscribe', 'roomId': id})); } _flushPendingSends(); } void _flushPendingSends() { if (_channel == null || _pendingSends.isEmpty) return; final batch = List<_PendingSend>.from(_pendingSends); _pendingSends.clear(); for (final p in batch) { try { _channel?.sink.add( jsonEncode({'type': 'send', 'roomId': p.roomId, 'body': p.body}), ); } catch (_) { _pendingSends.add(p); } } } void _onConnectionLost() { if (_disposed) return; _scheduleReconnect(); } void _scheduleReconnect() { if (_disposed) return; _reconnectTimer?.cancel(); final ms = min(30000, 500 * (1 << min(_reconnectAttempt, 6))); _reconnectAttempt++; _reconnectTimer = Timer(Duration(milliseconds: ms), () { if (_disposed) return; unawaited(connect()); }); } /// Logout: cancel reconnect, clear room subscriptions, close socket. Future disconnect() async { _reconnectTimer?.cancel(); _reconnectTimer = null; _subscribedRooms.clear(); _pendingSends.clear(); _reconnectAttempt = 0; await _closeChannelOnly(); } void subscribeRoom(String roomId) { _subscribedRooms.add(roomId); _channel?.sink.add(jsonEncode({'type': 'subscribe', 'roomId': roomId})); } /// Returns false if the message was queued (no socket); true if written to socket. void sendTyping(String roomId, bool active) { if (_channel == null) return; try { _channel!.sink.add( jsonEncode({'type': 'typing', 'roomId': roomId, 'active': active}), ); } catch (_) {} } bool sendMessage(String roomId, String body) { if (_channel == null) { _pendingSends.add(_PendingSend(roomId, body)); return false; } try { _channel!.sink.add( jsonEncode({'type': 'send', 'roomId': roomId, 'body': body}), ); return true; } catch (_) { _pendingSends.add(_PendingSend(roomId, body)); return false; } } void dispose() { _disposed = true; _reconnectTimer?.cancel(); _reconnectTimer = null; unawaited(_closeChannelOnly()); _incoming.close(); _side.close(); } }