iykyk_msn/mobile/lib/core/chat_socket.dart

188 lines
5.2 KiB
Dart

import 'dart:async';
import 'dart:convert';
import 'dart:math';
import 'package:flutter_riverpod/flutter_riverpod.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import '../models/message_model.dart';
import '../models/ws_chat_side_event.dart';
import 'api_client.dart';
import 'api_config.dart';
import 'token_storage.dart';
final chatSocketProvider = Provider<ChatSocketService>((ref) {
final svc = ChatSocketService(ref.watch(tokenStorageProvider));
ref.onDispose(svc.dispose);
return svc;
});
class _PendingSend {
_PendingSend(this.roomId, this.body);
final String roomId;
final String body;
}
class ChatSocketService {
ChatSocketService(this._storage);
final TokenStorage _storage;
WebSocketChannel? _channel;
final _incoming = StreamController<MessageModel>.broadcast();
final _side = StreamController<Object>.broadcast();
final Set<String> _subscribedRooms = {};
final List<_PendingSend> _pendingSends = [];
Timer? _reconnectTimer;
int _reconnectAttempt = 0;
bool _disposed = false;
int get pendingSendCount => _pendingSends.length;
Stream<MessageModel> get messages => _incoming.stream;
/// [WsTypingEvent] and [WsReadEvent] from server.
Stream<Object> get sideEvents => _side.stream;
Uri _wsUri(String token) {
final base = ApiConfig.baseUrl.replaceFirst(RegExp(r'^http'), 'ws');
return Uri.parse('$base/ws').replace(queryParameters: {'token': token});
}
Future<void> _closeChannelOnly() async {
await _channel?.sink.close();
_channel = null;
}
Future<void> connect() async {
if (_disposed) return;
_reconnectTimer?.cancel();
_reconnectTimer = null;
await _closeChannelOnly();
final token = await _storage.readToken();
if (token == null || token.isEmpty) return;
try {
_channel = WebSocketChannel.connect(_wsUri(token));
} catch (_) {
_scheduleReconnect();
return;
}
_reconnectAttempt = 0;
_channel!.stream.listen(
(raw) {
try {
final map = jsonDecode(raw as String) as Map<String, dynamic>;
final t = map['type'] as String?;
if (t == 'message' && map['message'] != null) {
final m = map['message'] as Map<String, dynamic>;
_incoming.add(MessageModel.fromWs(m));
} else if (t == 'typing') {
_side.add(
WsTypingEvent(
roomId: map['roomId'] as String,
userId: map['userId'] as String,
active: map['active'] as bool? ?? false,
),
);
} else if (t == 'read') {
_side.add(
WsReadEvent(
roomId: map['roomId'] as String,
userId: map['userId'] as String,
upToMessageId: map['upToMessageId'] as String,
),
);
}
} catch (_) {}
},
onError: (_) => _onConnectionLost(),
onDone: _onConnectionLost,
cancelOnError: false,
);
for (final id in _subscribedRooms) {
_channel?.sink.add(jsonEncode({'type': 'subscribe', 'roomId': id}));
}
_flushPendingSends();
}
void _flushPendingSends() {
if (_channel == null || _pendingSends.isEmpty) return;
final batch = List<_PendingSend>.from(_pendingSends);
_pendingSends.clear();
for (final p in batch) {
try {
_channel?.sink.add(
jsonEncode({'type': 'send', 'roomId': p.roomId, 'body': p.body}),
);
} catch (_) {
_pendingSends.add(p);
}
}
}
void _onConnectionLost() {
if (_disposed) return;
_scheduleReconnect();
}
void _scheduleReconnect() {
if (_disposed) return;
_reconnectTimer?.cancel();
final ms = min(30000, 500 * (1 << min(_reconnectAttempt, 6)));
_reconnectAttempt++;
_reconnectTimer = Timer(Duration(milliseconds: ms), () {
if (_disposed) return;
unawaited(connect());
});
}
/// Logout: cancel reconnect, clear room subscriptions, close socket.
Future<void> disconnect() async {
_reconnectTimer?.cancel();
_reconnectTimer = null;
_subscribedRooms.clear();
_pendingSends.clear();
_reconnectAttempt = 0;
await _closeChannelOnly();
}
void subscribeRoom(String roomId) {
_subscribedRooms.add(roomId);
_channel?.sink.add(jsonEncode({'type': 'subscribe', 'roomId': roomId}));
}
/// Returns false if the message was queued (no socket); true if written to socket.
void sendTyping(String roomId, bool active) {
if (_channel == null) return;
try {
_channel!.sink.add(
jsonEncode({'type': 'typing', 'roomId': roomId, 'active': active}),
);
} catch (_) {}
}
bool sendMessage(String roomId, String body) {
if (_channel == null) {
_pendingSends.add(_PendingSend(roomId, body));
return false;
}
try {
_channel!.sink.add(
jsonEncode({'type': 'send', 'roomId': roomId, 'body': body}),
);
return true;
} catch (_) {
_pendingSends.add(_PendingSend(roomId, body));
return false;
}
}
void dispose() {
_disposed = true;
_reconnectTimer?.cancel();
_reconnectTimer = null;
unawaited(_closeChannelOnly());
_incoming.close();
_side.close();
}
}