this repo has no description

feat: Implement WebSocket service for real-time notifications and update notification handling

+239 -98
+1 -60
lib/api.dart
··· 30 30 List<Gallery> galleries = []; 31 31 32 32 String get _apiUrl => AppConfig.apiUrl; 33 - String get _wsUrl => AppConfig.wsUrl; 33 + String? getAccessToken() => _accessToken; 34 34 35 35 Future<void> loadToken() async { 36 36 _accessToken = await _storage.read(key: 'access_token'); ··· 1042 1042 } 1043 1043 } 1044 1044 return photoRecords; 1045 - } 1046 - 1047 - /// Connects to a WebSocket and listens for messages. 1048 - /// Uses the same headers as other authenticated requests. 1049 - /// [wsUrl]: The WebSocket URL to connect to. 1050 - /// [onMessage]: Callback for each incoming message. 1051 - /// Returns the WebSocket instance. 1052 - Future<WebSocket?> connectAndListenWs({required void Function(dynamic message) onMessage}) async { 1053 - final session = await auth.getValidSession(); 1054 - if (session == null) { 1055 - appLogger.w('No valid session for WebSocket connection'); 1056 - return null; 1057 - } 1058 - int attempt = 0; 1059 - const int maxRetries = 5; 1060 - Duration delay = const Duration(seconds: 2); 1061 - WebSocket? ws; 1062 - final headers = {'Authorization': 'Bearer $_accessToken', 'Content-Type': 'application/json'}; 1063 - 1064 - late Future<WebSocket?> Function() connect; 1065 - late Future<void> Function() retry; 1066 - 1067 - connect = () async { 1068 - try { 1069 - appLogger.i('Connecting to WebSocket: $_wsUrl (attempt \\${attempt + 1})'); 1070 - ws = await WebSocket.connect(_wsUrl, headers: headers); 1071 - ws!.listen( 1072 - (message) => onMessage(message), 1073 - onError: (error) async { 1074 - appLogger.w('WebSocket error: $error'); 1075 - await retry(); 1076 - }, 1077 - onDone: () async { 1078 - appLogger.i('WebSocket connection closed'); 1079 - await retry(); 1080 - }, 1081 - ); 1082 - appLogger.i('Connected to WebSocket: $_wsUrl'); 1083 - return ws; 1084 - } catch (e) { 1085 - appLogger.e('Failed to connect to WebSocket: $e'); 1086 - await retry(); 1087 - return null; 1088 - } 1089 - }; 1090 - 1091 - retry = () async { 1092 - if (attempt < maxRetries) { 1093 - attempt++; 1094 - appLogger.i('Retrying WebSocket connection in \\${delay.inSeconds} seconds...'); 1095 - await Future.delayed(delay); 1096 - delay *= 2; 1097 - await connect(); 1098 - } else { 1099 - appLogger.e('Max WebSocket retry attempts reached.'); 1100 - } 1101 - }; 1102 - 1103 - return await connect(); 1104 1045 } 1105 1046 1106 1047 /// Notifies the server that the requesting account has seen notifications.
+46 -8
lib/main.dart
··· 8 8 import 'package:grain/auth.dart'; 9 9 import 'package:grain/screens/home_page.dart'; 10 10 import 'package:grain/screens/login_page.dart'; 11 + import 'package:grain/websocket_service.dart'; 11 12 12 13 import 'providers/profile_provider.dart'; 13 14 import 'widgets/skeleton_timeline.dart'; ··· 51 52 State<MyApp> createState() => _MyAppState(); 52 53 } 53 54 54 - class _MyAppState extends State<MyApp> { 55 + class _MyAppState extends State<MyApp> with WidgetsBindingObserver { 55 56 bool isSignedIn = false; 56 57 bool _loading = true; 57 - String? displayName; 58 + WebSocketService? _wsService; 58 59 59 60 @override 60 61 void initState() { 61 62 super.initState(); 63 + WidgetsBinding.instance.addObserver(this); 62 64 _checkToken(); 63 65 } 64 66 67 + @override 68 + void dispose() { 69 + WidgetsBinding.instance.removeObserver(this); 70 + _disconnectWebSocket(); 71 + super.dispose(); 72 + } 73 + 74 + void _connectWebSocket() { 75 + _disconnectWebSocket(); 76 + if (!isSignedIn) return; 77 + final accessToken = apiService.getAccessToken(); 78 + if (accessToken == null) return; 79 + _wsService = WebSocketService( 80 + wsUrl: AppConfig.wsUrl, 81 + accessToken: accessToken, 82 + onMessage: (message) { 83 + // Optionally: handle global messages or trigger provider updates 84 + }, 85 + ); 86 + _wsService!.connect(); 87 + } 88 + 89 + void _disconnectWebSocket() { 90 + _wsService?.disconnect(); 91 + _wsService = null; 92 + } 93 + 94 + @override 95 + void didChangeAppLifecycleState(AppLifecycleState state) { 96 + if (state == AppLifecycleState.resumed && isSignedIn) { 97 + _connectWebSocket(); 98 + } else if (state == AppLifecycleState.paused || state == AppLifecycleState.detached) { 99 + _disconnectWebSocket(); 100 + } 101 + } 102 + 65 103 Future<void> _checkToken() async { 66 104 await apiService.loadToken(); 67 105 bool valid = false; ··· 72 110 await apiService.fetchCurrentUser(); 73 111 valid = true; 74 112 } else { 75 - // Session fetch failed, clear session 76 113 await auth.clearSession(); 77 114 } 78 115 } catch (e) { 79 - // Error fetching session, clear session 80 116 await auth.clearSession(); 81 117 } 82 118 } ··· 84 120 isSignedIn = valid; 85 121 _loading = false; 86 122 }); 123 + if (valid) { 124 + _connectWebSocket(); 125 + } 87 126 } 88 127 89 128 void handleSignIn() async { 90 129 setState(() { 91 130 isSignedIn = true; 92 131 }); 93 - // Fetch current user profile from /oauth/session after login 94 132 appLogger.i('Fetching current user after sign in'); 95 133 await apiService.fetchCurrentUser(); 134 + _connectWebSocket(); 96 135 } 97 136 98 137 void handleSignOut(BuildContext context) async { 99 138 final container = ProviderScope.containerOf(context, listen: false); 100 - await auth.clearSession(); // Clear session data 101 - // Invalidate Riverpod providers for profile state 139 + await auth.clearSession(); 102 140 container.invalidate(profileNotifierProvider); 103 - // Add any other providers you want to invalidate here 104 141 setState(() { 105 142 isSignedIn = false; 106 143 }); 144 + _disconnectWebSocket(); 107 145 } 108 146 109 147 @override
+44 -18
lib/providers/notifications_provider.dart
··· 1 1 import 'dart:async'; 2 2 import 'dart:convert'; 3 - import 'dart:io'; 4 3 5 4 import 'package:flutter_riverpod/flutter_riverpod.dart'; 6 5 import 'package:grain/api.dart'; 6 + import 'package:grain/main.dart'; 7 7 import 'package:grain/models/notification.dart'; 8 + import 'package:grain/websocket_service.dart'; 8 9 9 - final notificationsProvider = StateNotifierProvider<NotificationsNotifier, List<Notification>>(( 10 - ref, 11 - ) { 12 - return NotificationsNotifier(); 13 - }); 10 + final notificationsProvider = 11 + StateNotifierProvider<NotificationsNotifier, AsyncValue<List<Notification>>>( 12 + (ref) => NotificationsNotifier(), 13 + ); 14 14 15 - class NotificationsNotifier extends StateNotifier<List<Notification>> { 16 - NotificationsNotifier() : super([]) { 15 + class NotificationsNotifier extends StateNotifier<AsyncValue<List<Notification>>> { 16 + WebSocketService? _wsService; 17 + 18 + NotificationsNotifier() : super(const AsyncValue.loading()) { 17 19 _connectAndListen(); 18 20 } 19 21 20 - WebSocket? _ws; 21 - StreamSubscription? _wsSubscription; 22 - 23 22 void _connectAndListen() async { 24 - _ws = await apiService.connectAndListenWs( 23 + // Get the current access token and wsUrl from apiService 24 + final accessToken = apiService.hasToken ? apiService.getAccessToken() : null; 25 + final wsUrl = AppConfig.wsUrl; 26 + _wsService = WebSocketService( 27 + wsUrl: wsUrl, 28 + accessToken: accessToken, 25 29 onMessage: (message) async { 26 30 try { 27 31 final data = message is String ? jsonDecode(message) : message; 28 32 if (data is Map<String, dynamic> && data['type'] == 'refresh-notifications') { 29 33 final notifications = await apiService.getNotifications(); 30 - state = notifications; 34 + if (mounted) state = AsyncValue.data(notifications); 31 35 } else { 32 36 final notification = Notification.fromJson(data); 33 - state = [...state, notification]; 37 + if (mounted) { 38 + final current = state.value ?? []; 39 + state = AsyncValue.data([...current, notification]); 40 + } 34 41 } 35 42 } catch (e) { 36 43 // Handle parse error or ignore non-notification messages 37 44 } 38 45 }, 39 46 ); 47 + try { 48 + final notifications = await apiService.getNotifications(); 49 + if (mounted) state = AsyncValue.data(notifications); 50 + } catch (e, st) { 51 + if (mounted) state = AsyncValue.error(e, st); 52 + } 53 + await _wsService!.connect(); 54 + } 55 + 56 + /// Fetch notifications directly from the API (without websocket) 57 + Future<void> fetch() async { 58 + state = const AsyncValue.loading(); 59 + try { 60 + final notifications = await apiService.getNotifications(); 61 + if (mounted) { 62 + state = AsyncValue.data(notifications); 63 + } 64 + } catch (e, st) { 65 + if (mounted) state = AsyncValue.error(e, st); 66 + } 40 67 } 41 68 42 69 /// Marks all notifications as seen both on the server and locally. 43 70 Future<void> updateSeen() async { 44 71 final success = await apiService.updateSeen(); 45 - if (success) { 46 - state = [for (final n in state) n.copyWith(isRead: true)]; 72 + if (success && mounted && state.value != null) { 73 + state = AsyncValue.data([for (final n in state.value!) n.copyWith(isRead: true)]); 47 74 } 48 75 } 49 76 50 77 @override 51 78 void dispose() { 52 - _wsSubscription?.cancel(); 53 - _ws?.close(); 79 + _wsService?.disconnect(); 54 80 super.dispose(); 55 81 } 56 82 }
+74 -11
lib/screens/notifications_page.dart
··· 180 180 ); 181 181 } 182 182 183 + Widget _buildSkeletonTile(BuildContext context) { 184 + final theme = Theme.of(context); 185 + return ListTile( 186 + contentPadding: const EdgeInsets.symmetric(horizontal: 8, vertical: 8), 187 + leading: Container( 188 + width: 40, 189 + height: 40, 190 + decoration: BoxDecoration( 191 + color: theme.colorScheme.surfaceContainerHighest.withAlpha(128), 192 + shape: BoxShape.circle, 193 + ), 194 + ), 195 + title: Container( 196 + width: 120, 197 + height: 16, 198 + color: theme.colorScheme.surfaceContainerHighest.withAlpha(128), 199 + margin: const EdgeInsets.only(bottom: 4), 200 + ), 201 + subtitle: Column( 202 + crossAxisAlignment: CrossAxisAlignment.start, 203 + children: [ 204 + Container( 205 + width: 180, 206 + height: 14, 207 + color: theme.colorScheme.surfaceContainerHighest.withAlpha(128), 208 + margin: const EdgeInsets.only(bottom: 8), 209 + ), 210 + Container( 211 + width: 140, 212 + height: 12, 213 + color: theme.colorScheme.surfaceContainerHighest.withAlpha(128), 214 + ), 215 + ], 216 + ), 217 + isThreeLine: true, 218 + ); 219 + } 220 + 183 221 @override 184 222 Widget build(BuildContext context) { 185 223 final ref = this.ref; 186 224 final theme = Theme.of(context); 187 - final notifications = ref.watch(notificationsProvider); 225 + final notificationsAsync = ref.watch(notificationsProvider); 226 + 227 + final notificationList = notificationsAsync.when( 228 + loading: () => ListView.separated( 229 + itemCount: 6, 230 + separatorBuilder: (context, index) => Divider(height: 1, color: theme.dividerColor), 231 + itemBuilder: (context, index) => _buildSkeletonTile(context), 232 + ), 233 + error: (error, stack) => 234 + Center(child: Text('Failed to load notifications', style: theme.textTheme.bodyMedium)), 235 + data: (notifications) { 236 + if (notifications.isEmpty) { 237 + return Center(child: Text('No notifications yet.', style: theme.textTheme.bodyMedium)); 238 + } else { 239 + return ListView.separated( 240 + itemCount: notifications.length, 241 + separatorBuilder: (context, index) => Divider(height: 1, color: theme.dividerColor), 242 + itemBuilder: (context, index) { 243 + final notification = notifications[index]; 244 + return _buildNotificationTile(context, notification); 245 + }, 246 + ); 247 + } 248 + }, 249 + ); 188 250 189 251 return Scaffold( 190 252 backgroundColor: theme.scaffoldBackgroundColor, 191 - body: notifications.isEmpty 192 - ? Center(child: Text('No notifications yet.', style: theme.textTheme.bodyMedium)) 193 - : ListView.separated( 194 - itemCount: notifications.length, 195 - separatorBuilder: (context, index) => Divider(height: 1, color: theme.dividerColor), 196 - itemBuilder: (context, index) { 197 - final notification = notifications[index]; 198 - return _buildNotificationTile(context, notification); 199 - }, 200 - ), 253 + body: RefreshIndicator( 254 + onRefresh: () async { 255 + await ref.read(notificationsProvider.notifier).fetch(); 256 + }, 257 + child: notificationList is Center 258 + ? ListView( 259 + physics: const AlwaysScrollableScrollPhysics(), 260 + children: [SizedBox(height: 300), notificationList], 261 + ) 262 + : notificationList, 263 + ), 201 264 ); 202 265 } 203 266 }
+70
lib/websocket_service.dart
··· 1 + import 'dart:io'; 2 + 3 + import 'package:grain/app_logger.dart'; 4 + 5 + /// Handles WebSocket connection, reconnection, and message listening. 6 + class WebSocketService { 7 + final String wsUrl; 8 + final String? accessToken; 9 + final void Function(dynamic message) onMessage; 10 + WebSocket? _ws; 11 + bool _shouldReconnect = false; 12 + int _attempt = 0; 13 + static const int _maxRetries = 5; 14 + Duration _delay = const Duration(seconds: 2); 15 + 16 + WebSocketService({required this.wsUrl, required this.accessToken, required this.onMessage}); 17 + 18 + Future<void> connect() async { 19 + _shouldReconnect = true; 20 + _attempt = 0; 21 + _delay = const Duration(seconds: 2); 22 + await _connectInternal(); 23 + } 24 + 25 + Future<void> disconnect() async { 26 + _shouldReconnect = false; 27 + await _ws?.close(); 28 + _ws = null; 29 + } 30 + 31 + Future<void> _connectInternal() async { 32 + if (accessToken == null) { 33 + appLogger.w('No access token for WebSocket connection'); 34 + return; 35 + } 36 + final headers = {'Authorization': 'Bearer $accessToken', 'Content-Type': 'application/json'}; 37 + try { 38 + appLogger.i('Connecting to WebSocket: $wsUrl (attempt ${_attempt + 1})'); 39 + _ws = await WebSocket.connect(wsUrl, headers: headers); 40 + _ws!.listen( 41 + onMessage, 42 + onError: (error) async { 43 + appLogger.w('WebSocket error: $error'); 44 + await _retry(); 45 + }, 46 + onDone: () async { 47 + appLogger.i('WebSocket connection closed'); 48 + await _retry(); 49 + }, 50 + ); 51 + appLogger.i('Connected to WebSocket: $wsUrl'); 52 + } catch (e) { 53 + appLogger.e('Failed to connect to WebSocket: $e'); 54 + await _retry(); 55 + } 56 + } 57 + 58 + Future<void> _retry() async { 59 + if (!_shouldReconnect) return; 60 + if (_attempt < _maxRetries) { 61 + _attempt++; 62 + appLogger.i('Retrying WebSocket connection in ${_delay.inSeconds} seconds...'); 63 + await Future.delayed(_delay); 64 + _delay *= 2; 65 + await _connectInternal(); 66 + } else { 67 + appLogger.e('Max WebSocket retry attempts reached.'); 68 + } 69 + } 70 + }
+4 -1
lib/widgets/bottom_nav_bar.dart
··· 41 41 42 42 // Get unread notifications count 43 43 final notifications = ref.watch(notificationsProvider); 44 - final unreadCount = notifications.where((n) => n.isRead == false).length; 44 + final unreadCount = notifications.maybeWhen( 45 + data: (list) => list.where((n) => n.isRead == false).length, 46 + orElse: () => 0, 47 + ); 45 48 46 49 return Container( 47 50 decoration: BoxDecoration(