188 lines
5.2 KiB
Dart
188 lines
5.2 KiB
Dart
import 'dart:async';
|
|
import 'dart:convert';
|
|
import 'dart:math';
|
|
|
|
import 'package:flutter_riverpod/flutter_riverpod.dart';
|
|
import 'package:web_socket_channel/web_socket_channel.dart';
|
|
|
|
import '../models/message_model.dart';
|
|
import '../models/ws_chat_side_event.dart';
|
|
import 'api_client.dart';
|
|
import 'api_config.dart';
|
|
import 'token_storage.dart';
|
|
|
|
/// Provides the [ChatSocketService] built from the watched
/// [tokenStorageProvider] value.
///
/// The service's `dispose` is registered with the provider so it runs when the
/// provider itself is disposed.
final chatSocketProvider = Provider<ChatSocketService>((ref) {
  final storage = ref.watch(tokenStorageProvider);
  final service = ChatSocketService(storage);
  ref.onDispose(service.dispose);
  return service;
});
|
|
|
|
/// An outgoing chat message waiting in the send queue.
class _PendingSend {
  /// Id of the room the message is addressed to.
  final String roomId;

  /// Text of the message.
  final String body;

  _PendingSend(this.roomId, this.body);
}
|
|
|
|
/// Owns the chat WebSocket connection.
///
/// Opens the socket with the token read from [TokenStorage], re-sends room
/// subscriptions and queued messages after every (re)connect, and schedules a
/// reconnect with exponential backoff when the connection is lost.
class ChatSocketService {
  ChatSocketService(this._storage);

  final TokenStorage _storage;

  /// The live channel, or `null` while disconnected or waiting to reconnect.
  WebSocketChannel? _channel;

  final _incoming = StreamController<MessageModel>.broadcast();
  final _side = StreamController<Object>.broadcast();

  /// Rooms to (re-)subscribe to each time a socket is opened.
  final Set<String> _subscribedRooms = {};

  /// Messages that could not be written to a socket yet.
  final List<_PendingSend> _pendingSends = [];

  Timer? _reconnectTimer;

  /// Number of reconnects scheduled since the last frame was received.
  int _reconnectAttempt = 0;

  bool _disposed = false;

  /// Bumped by every [connect], [disconnect] and [dispose] call so that a
  /// `connect()` suspended on an `await` can tell it has been superseded.
  int _connectEpoch = 0;

  /// Number of messages currently queued for sending.
  int get pendingSendCount => _pendingSends.length;

  /// Chat messages received from the server.
  Stream<MessageModel> get messages => _incoming.stream;

  /// [WsTypingEvent] and [WsReadEvent] from server.
  Stream<Object> get sideEvents => _side.stream;

  /// Builds the socket URI: [ApiConfig.baseUrl] with its leading `http`
  /// replaced by `ws` (so `https` becomes `wss`), the `/ws` path appended and
  /// [token] passed as the `token` query parameter.
  Uri _wsUri(String token) {
    final base = ApiConfig.baseUrl.replaceFirst(RegExp(r'^http'), 'ws');
    return Uri.parse('$base/ws').replace(queryParameters: {'token': token});
  }

  /// Closes [channel]'s sink, ignoring any error raised while closing.
  Future<void> _closeQuietly(WebSocketChannel channel) async {
    try {
      await channel.sink.close();
    } catch (_) {
      // Best-effort: the socket is being discarded either way.
    }
  }

  /// Detaches and closes the current channel, if any.
  ///
  /// The channel is detached *before* it is closed so that the `onDone` /
  /// `onError` callbacks triggered by this deliberate close are recognised as
  /// stale and do not schedule a reconnect.
  Future<void> _closeChannelOnly() async {
    final channel = _channel;
    if (channel == null) return;
    _channel = null;
    await _closeQuietly(channel);
  }

  /// Opens a fresh socket, replacing any existing one.
  ///
  /// Does nothing when the service is disposed or no token is stored. After
  /// the socket is created, all remembered rooms are re-subscribed and queued
  /// messages are flushed.
  Future<void> connect() async {
    if (_disposed) return;
    final epoch = ++_connectEpoch;
    _reconnectTimer?.cancel();
    _reconnectTimer = null;
    await _closeChannelOnly();
    final token = await _storage.readToken();
    // A newer connect(), a disconnect() or dispose() ran while this call was
    // suspended; opening a socket now would leak it or duplicate messages.
    if (_disposed || epoch != _connectEpoch) return;
    if (token == null || token.isEmpty) return;

    final WebSocketChannel channel;
    try {
      channel = WebSocketChannel.connect(_wsUri(token));
    } catch (_) {
      _scheduleReconnect();
      return;
    }
    _channel = channel;
    // Connection failures are reported through the stream rather than thrown
    // by WebSocketChannel.connect, so the backoff counter is NOT reset here;
    // it is reset once the server actually sends a frame (see _onFrame).
    channel.stream.listen(
      (raw) => _onFrame(channel, raw),
      onError: (Object _) => _onConnectionLost(channel),
      onDone: () => _onConnectionLost(channel),
      cancelOnError: false,
    );

    for (final id in _subscribedRooms) {
      _trySend({'type': 'subscribe', 'roomId': id});
    }
    _flushPendingSends();
  }

  /// Handles one frame received on [channel].
  ///
  /// Frames from a channel that is no longer current, non-text frames,
  /// invalid JSON and frames with missing or wrongly typed fields are dropped.
  void _onFrame(WebSocketChannel channel, Object? raw) {
    if (_disposed || !identical(channel, _channel)) return;
    // Receiving data proves the connection works: restart the backoff.
    _reconnectAttempt = 0;

    if (raw is! String) return;
    final Object? decoded;
    try {
      decoded = jsonDecode(raw);
    } on FormatException {
      return;
    }
    if (decoded is! Map<String, dynamic>) return;

    final type = decoded['type'];
    if (type == 'message') {
      _handleMessageFrame(decoded);
    } else if (type == 'typing') {
      _handleTypingFrame(decoded);
    } else if (type == 'read') {
      _handleReadFrame(decoded);
    }
  }

  /// Publishes the `message` payload of [frame] on [messages].
  void _handleMessageFrame(Map<String, dynamic> frame) {
    final payload = frame['message'];
    if (payload is! Map<String, dynamic>) return;
    final MessageModel message;
    try {
      message = MessageModel.fromWs(payload);
    } catch (_) {
      // The payload could not be turned into a MessageModel; skip this frame
      // instead of tearing down the socket listener.
      return;
    }
    _incoming.add(message);
  }

  /// Publishes a [WsTypingEvent] built from [frame] on [sideEvents].
  void _handleTypingFrame(Map<String, dynamic> frame) {
    final roomId = frame['roomId'];
    final userId = frame['userId'];
    final active = frame['active'];
    if (roomId is! String || userId is! String) return;
    // `active` is optional (defaults to false) but must be a bool if present.
    if (active != null && active is! bool) return;
    _side.add(
      WsTypingEvent(roomId: roomId, userId: userId, active: active == true),
    );
  }

  /// Publishes a [WsReadEvent] built from [frame] on [sideEvents].
  void _handleReadFrame(Map<String, dynamic> frame) {
    final roomId = frame['roomId'];
    final userId = frame['userId'];
    final upToMessageId = frame['upToMessageId'];
    if (roomId is! String || userId is! String || upToMessageId is! String) {
      return;
    }
    _side.add(
      WsReadEvent(
        roomId: roomId,
        userId: userId,
        upToMessageId: upToMessageId,
      ),
    );
  }

  /// JSON-encodes [frame] and writes it to the current socket.
  ///
  /// Returns `true` if the frame was written, `false` if there is no socket
  /// or the write threw.
  bool _trySend(Map<String, Object?> frame) {
    final channel = _channel;
    if (channel == null) return false;
    try {
      channel.sink.add(jsonEncode(frame));
      return true;
    } catch (_) {
      return false;
    }
  }

  /// Writes every queued message to the socket; messages whose write fails
  /// are put back in the queue.
  void _flushPendingSends() {
    if (_channel == null || _pendingSends.isEmpty) return;
    final batch = List<_PendingSend>.from(_pendingSends);
    _pendingSends.clear();
    for (final p in batch) {
      final sent = _trySend(
        {'type': 'send', 'roomId': p.roomId, 'body': p.body},
      );
      if (!sent) _pendingSends.add(p);
    }
  }

  /// Reacts to [channel] failing or closing.
  ///
  /// Ignored when [channel] is not the current one, which covers channels
  /// closed on purpose and the `onDone` that follows an `onError`. Otherwise
  /// the channel is dropped, so that sends are queued instead of being written
  /// to a dead socket, and a reconnect is scheduled.
  void _onConnectionLost(WebSocketChannel channel) {
    if (_disposed || !identical(channel, _channel)) return;
    _channel = null;
    unawaited(_closeQuietly(channel));
    _scheduleReconnect();
  }

  /// Arms the reconnect timer with a delay of `500ms * 2^attempt`, where the
  /// exponent is capped at 6 and the delay at 30 seconds.
  void _scheduleReconnect() {
    if (_disposed) return;
    _reconnectTimer?.cancel();
    final ms = min(30000, 500 * (1 << min(_reconnectAttempt, 6)));
    _reconnectAttempt++;
    _reconnectTimer = Timer(Duration(milliseconds: ms), () {
      if (_disposed) return;
      unawaited(connect());
    });
  }

  /// Logout: cancel reconnect, clear room subscriptions and queued sends,
  /// reset the backoff, close socket.
  Future<void> disconnect() async {
    _connectEpoch++; // Abort any connect() that is still awaiting.
    _reconnectTimer?.cancel();
    _reconnectTimer = null;
    _subscribedRooms.clear();
    _pendingSends.clear();
    _reconnectAttempt = 0;
    await _closeChannelOnly();
  }

  /// Remembers [roomId] for future reconnects and, if a socket is open,
  /// subscribes to it right away.
  void subscribeRoom(String roomId) {
    _subscribedRooms.add(roomId);
    _trySend({'type': 'subscribe', 'roomId': roomId});
  }

  /// Sends a typing indicator for [roomId]; silently dropped when there is no
  /// socket or the write fails.
  void sendTyping(String roomId, bool active) {
    _trySend({'type': 'typing', 'roomId': roomId, 'active': active});
  }

  /// Returns false if the message was queued (no socket or the write failed);
  /// true if written to socket.
  bool sendMessage(String roomId, String body) {
    if (_trySend({'type': 'send', 'roomId': roomId, 'body': body})) {
      return true;
    }
    _pendingSends.add(_PendingSend(roomId, body));
    return false;
  }

  /// Permanently shuts the service down: cancels the reconnect timer, closes
  /// the socket and closes both output streams.
  void dispose() {
    _disposed = true;
    _connectEpoch++;
    _reconnectTimer?.cancel();
    _reconnectTimer = null;
    unawaited(_closeChannelOnly());
    unawaited(_incoming.close());
    unawaited(_side.close());
  }
}
|