Line data Source code
1 : /*
2 : * Famedly Matrix SDK
3 : * Copyright (C) 2019, 2020, 2021 Famedly GmbH
4 : *
5 : * This program is free software: you can redistribute it and/or modify
6 : * it under the terms of the GNU Affero General Public License as
7 : * published by the Free Software Foundation, either version 3 of the
8 : * License, or (at your option) any later version.
9 : *
10 : * This program is distributed in the hope that it will be useful,
11 : * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 : * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 : * GNU Affero General Public License for more details.
14 : *
15 : * You should have received a copy of the GNU Affero General Public License
16 : * along with this program. If not, see <https://www.gnu.org/licenses/>.
17 : */
18 :
19 : import 'dart:async';
20 : import 'dart:convert';
21 :
22 : import 'package:collection/collection.dart';
23 :
24 : import 'package:matrix/matrix.dart';
25 : import 'package:matrix/src/models/timeline_chunk.dart';
26 :
27 : /// Represents the timeline of a room. The callback [onUpdate] will be triggered
28 : /// automatically whenever the timeline changes. The initial
29 : /// event list will be retrieved when created by the `room.getTimeline()` method.
30 :
31 : class Timeline {
/// The room this timeline belongs to. Event updates for other rooms are
/// ignored.
32 : final Room room;
/// The events currently held by this timeline; this is the list stored in
/// [chunk]. Live events are inserted towards the front and history is
/// appended to the end.
33 33 : List<Event> get events => chunk.events;
34 :
35 : /// Map of event ID to map of type to set of aggregated events
36 : final Map<String, Map<String, Set<Event>>> aggregatedEvents = {};
37 :
/// Called without arguments after the timeline changed and around
/// history/future requests.
38 : final void Function()? onUpdate;
/// Called with the index in [events] of an event that was replaced or
/// otherwise changed (e.g. decrypted, redacted, got a new aggregation).
/// NOTE(review): the index can equal `events.length` when the related event
/// is not part of this timeline.
39 : final void Function(int index)? onChange;
/// Called with the index in [events] at which an event was inserted.
40 : final void Function(int index)? onInsert;
/// Called with the index in [events] from which an event was removed.
41 : final void Function(int index)? onRemove;
/// Called for every timeline-type event update of this room, even while
/// [allowNewEvent] is `false`.
42 : final void Function()? onNewEvent;
43 :
/// Subscription to the client's event updates.
44 : StreamSubscription<EventUpdate>? sub;
/// Subscription to sync updates in which this room's timeline is limited.
45 : StreamSubscription<SyncUpdate>? roomSub;
/// Subscription to the room's received session keys.
46 : StreamSubscription<String>? sessionIdReceivedSub;
/// Subscription to the client's cancelled send events.
47 : StreamSubscription<String>? cancelSendEventSub;
/// Whether a [requestHistory] call is currently in flight.
48 : bool isRequestingHistory = false;
/// Whether a [requestFuture] call is currently in flight.
49 : bool isRequestingFuture = false;
50 :
/// Whether incoming event updates are applied to [events]. Starts as `false`
/// for fragmented timelines.
51 : bool allowNewEvent = true;
/// Whether this timeline was created from a chunk with a non-empty
/// `nextBatch`, i.e. it does not start at the live end of the room.
52 : bool isFragmentedTimeline = false;
53 :
/// Events fetched by [getEventById] that are not part of [events], keyed by
/// event ID.
54 : final Map<String, Event> _eventCache = {};
55 :
/// The chunk providing [events] and the `prevBatch`/`nextBatch` tokens used
/// for pagination.
56 : TimelineChunk chunk;
57 :
/// Looks up the event with the given [id].
///
/// The events currently held by this timeline are checked first, then the
/// local request cache. Only if both miss is the event requested through
/// `room.getEventById`; a non-null result is stored in the cache so later
/// lookups for the same [id] do not trigger another request.
///
/// Returns `null` if the room could not provide the event.
Future<Event?> getEventById(String id) async {
  final loaded = events.firstWhereOrNull((event) => event.eventId == id);
  if (loaded != null) return loaded;

  final cached = _eventCache[id];
  if (cached != null) return cached;

  final fetched = await room.getEventById(id);
  if (fetched != null) _eventCache[id] = fetched;
  return fetched;
}
71 :
72 : // While history is being fetched, this flag is set so that handling the
73 : // individual event updates does not call `onUpdate` for each of them.
74 : // This ensures that the entire history fetching triggers `onUpdate` only *once*,
75 : // even if /sync's complete while history is being processed.
76 : bool _collectHistoryUpdates = false;
77 :
78 : // Whether we have confirmed that there are no more events to load from the database.
79 : bool _fetchedAllDatabaseEvents = false;
80 :
/// Whether another call to [requestHistory] may still yield older events.
///
/// Always `true` for an empty timeline. Otherwise `true` while the database
/// has not been confirmed as exhausted, or while the room still has a
/// `prev_batch` token and the last entry of [events] is not the room
/// creation event.
bool get canRequestHistory {
  if (events.isEmpty || !_fetchedAllDatabaseEvents) return true;
  if (room.prev_batch == null) return false;
  return events.last.type != EventTypes.RoomCreate;
}
86 :
/// Loads older events into this timeline.
///
/// At most [historyCount] events are requested. [filter] is an optional
/// [StateFilter] (for example restricting event types such as
/// [EventTypes.Message]) that is forwarded to the underlying request.
///
/// A call made while a previous history request is still running returns
/// immediately without doing anything. This method does not return a value.
Future<void> requestHistory({
  int historyCount = Room.defaultHistoryCount,
  StateFilter? filter,
}) async {
  if (isRequestingHistory) return;

  isRequestingHistory = true;
  try {
    await _requestEvents(
      direction: Direction.b,
      historyCount: historyCount,
      filter: filter,
    );
  } finally {
    // Always release the guard, even if the request (or a callback invoked
    // by it) throws; otherwise no further history could ever be requested.
    isRequestingHistory = false;
  }
}
109 :
/// Whether [requestFuture] can load anything, which is only the case while
/// live events are not accepted into this timeline ([allowNewEvent] is
/// `false`).
bool get canRequestFuture {
  return !allowNewEvent;
}
111 :
/// Loads newer events into this timeline.
///
/// At most [historyCount] events are requested. [filter] is an optional
/// [StateFilter] (for example restricting event types such as
/// [EventTypes.Message]) that is forwarded to the underlying request.
///
/// Does nothing when [allowNewEvent] is `true`, because new events are then
/// added automatically, or when another forward request is still running.
/// This method does not return a value.
Future<void> requestFuture({
  int historyCount = Room.defaultHistoryCount,
  StateFilter? filter,
}) async {
  // We shouldn't force adding new events if they will automatically be added.
  if (allowNewEvent || isRequestingFuture) return;

  isRequestingFuture = true;
  try {
    await _requestEvents(
      direction: Direction.f,
      historyCount: historyCount,
      filter: filter,
    );
  } finally {
    // Always release the guard; without this a failed request would block
    // every later call to this method.
    isRequestingFuture = false;
  }
}
135 :
/// Loads up to [historyCount] more events in the given [direction].
///
/// Unless this is a fragmented timeline, the database is asked first,
/// starting after the events that are already loaded. If it returns events,
/// they are added to [events] (appended for [Direction.b], prepended
/// otherwise) and [onInsert] is called for each new index. If it returns
/// nothing, the events are requested remotely: through [getRoomEvents] for a
/// fragmented timeline, otherwise through `room.requestHistory` as long as
/// the room has a `prev_batch` token.
///
/// [onUpdate] is called once before and once after the request. The
/// in-flight flag of the requested direction is cleared before the final
/// [onUpdate], also when the request throws.
Future<void> _requestEvents({
  int historyCount = Room.defaultHistoryCount,
  required Direction direction,
  StateFilter? filter,
}) async {
  onUpdate?.call();

  try {
    // Look up for events in the database first. A fragmented timeline never
    // reads from the database cache.
    final eventsFromStore = isFragmentedTimeline
        ? null
        : await room.client.database?.getEventList(
            room,
            start: events.length,
            limit: historyCount,
          );

    if (eventsFromStore != null && eventsFromStore.isNotEmpty) {
      // Fetch the member state of all senders we have got here. The events
      // just read from the store are included (previously only the already
      // loaded events were considered), each sender is looked up once.
      final senderIds = {
        ...events.map((event) => event.senderId),
        ...eventsFromStore.map((event) => event.senderId),
      };
      for (final senderId in senderIds) {
        if (room.getState(EventTypes.RoomMember, senderId) != null) {
          continue;
        }
        final dbUser = await room.client.database?.getUser(senderId, room);
        if (dbUser != null) room.setState(dbUser);
      }

      if (direction == Direction.b) {
        // Older events go to the end of the list.
        final offset = events.length;
        events.addAll(eventsFromStore);
        for (var i = 0; i < eventsFromStore.length; i++) {
          onInsert?.call(offset + i);
        }
      } else {
        // Newer events go to the front; they occupy indices 0..n-1.
        events.insertAll(0, eventsFromStore);
        for (var i = 0; i < eventsFromStore.length; i++) {
          onInsert?.call(i);
        }
      }
    } else {
      _fetchedAllDatabaseEvents = true;
      Logs().i('No more events found in the store. Request from server...');

      if (isFragmentedTimeline) {
        await getRoomEvents(
          historyCount: historyCount,
          direction: direction,
          filter: filter,
        );
      } else if (room.prev_batch == null) {
        Logs().i('No more events to request from server...');
      } else {
        await room.requestHistory(
          historyCount: historyCount,
          direction: direction,
          // From here on the single event updates must not trigger
          // `onUpdate`; it is called once in the `finally` block below.
          onHistoryReceived: () {
            _collectHistoryUpdates = true;
          },
          filter: filter,
        );
      }
    }
  } finally {
    _collectHistoryUpdates = false;
    // Only release the guard of the direction that was actually requested,
    // so a finished forward request cannot unlock a history request that is
    // still running (and vice versa).
    if (direction == Direction.b) {
      isRequestingHistory = false;
    } else {
      isRequestingFuture = false;
    }
    onUpdate?.call();
  }
}
210 :
/// Requests up to [historyCount] events from the server and adds them to
/// this timeline's [chunk].
///
/// For [Direction.b] the request continues from `chunk.prevBatch` and the
/// received events are appended to the end of the event list. Otherwise it
/// continues from `chunk.nextBatch` and the received events are inserted at
/// the front in reversed order. [onInsert] is called for every added index
/// and [onUpdate] once at the end.
///
/// [filter] allows you to specify a [StateFilter] object to filter the
/// events (e.g. by event types such as [EventTypes.Message]). Its
/// `lazyLoadMembers` is set to `true` unless the caller set it explicitly.
///
/// While [allowNewEvent] is `false`, a response whose start and end tokens
/// are equal, or a forward response without end token, re-enables
/// [allowNewEvent] and adds the events that are still being sent (read from
/// the database) to the result.
///
/// Encrypted events are decrypted where possible; the decrypted events are
/// not written to the database.
///
/// Returns the actual count of timeline events received from the server.
Future<int> getRoomEvents({
  int historyCount = Room.defaultHistoryCount,
  Direction direction = Direction.b,
  StateFilter? filter,
}) async {
  // Ensure the filter is not null and lazy-loads members unless the caller
  // decided otherwise.
  filter ??= StateFilter(lazyLoadMembers: true);
  filter.lazyLoadMembers ??= true;

  final isBackward = direction == Direction.b;

  final resp = await room.client.getRoomEvents(
    room.id,
    direction,
    from: isBackward ? chunk.prevBatch : chunk.nextBatch,
    limit: historyCount,
    filter: jsonEncode(filter.toJson()),
  );

  if (resp.end == null) {
    Logs().w('We reached the end of the timeline');
  }

  // `start`/`end` are relative to the request direction, the chunk anchors
  // are not, so they have to be swapped when going backwards.
  final newNextBatch = isBackward ? resp.start : resp.end;
  final newPrevBatch = isBackward ? resp.end : resp.start;

  final type =
      isBackward ? EventUpdateType.history : EventUpdateType.timeline;

  if ((resp.state?.length ?? 0) == 0 &&
      resp.start != resp.end &&
      newPrevBatch != null &&
      newNextBatch != null) {
    if (type == EventUpdateType.history) {
      Logs().w(
        '[nav] we can still request history prevBatch: $type $newPrevBatch',
      );
    } else {
      Logs().w(
        '[nav] we can still request timeline nextBatch: $type $newNextBatch',
      );
    }
  }

  final newEvents =
      resp.chunk.map((e) => Event.fromMatrixEvent(e, room)).toList();

  if (!allowNewEvent) {
    if (resp.start == resp.end || (resp.end == null && !isBackward)) {
      allowNewEvent = true;
    }

    if (allowNewEvent) {
      Logs().d('We now allow sync update into the timeline.');
      newEvents.addAll(
        await room.client.database?.getEventList(room, onlySending: true) ??
            [],
      );
    }
  }

  // Try to decrypt encrypted events but don't update the database.
  if (room.encrypted && room.client.encryptionEnabled) {
    for (var i = 0; i < newEvents.length; i++) {
      if (newEvents[i].type == EventTypes.Encrypted) {
        newEvents[i] = await room.client.encryption!.decryptRoomEvent(
          newEvents[i],
        );
      }
    }
  }

  // Update the chunk anchors and publish the new events.
  if (type == EventUpdateType.history) {
    chunk.prevBatch = newPrevBatch ?? '';

    final offset = chunk.events.length;
    chunk.events.addAll(newEvents);

    for (var i = 0; i < newEvents.length; i++) {
      onInsert?.call(i + offset);
    }
  } else {
    chunk.nextBatch = newNextBatch ?? '';
    chunk.events.insertAll(0, newEvents.reversed);

    for (var i = 0; i < newEvents.length; i++) {
      onInsert?.call(i);
    }
  }

  onUpdate?.call();
  return resp.chunk.length;
}
315 :
/// Creates a timeline for [room] that is backed by [chunk].
///
/// Subscribes to the client's event updates, to limited syncs of this room,
/// to received session keys and to cancelled send events, then builds
/// [aggregatedEvents] from the events already contained in [chunk]. A chunk
/// with a non-empty `nextBatch` marks the timeline as fragmented: live
/// events are not accepted and the database is never consulted.
///
/// Call [cancelSubscriptions] before dropping the object.
Timeline({
  required this.room,
  this.onUpdate,
  this.onChange,
  this.onInsert,
  this.onRemove,
  this.onNewEvent,
  required this.chunk,
}) {
  final client = room.client;

  sub = client.onEvent.stream.listen(_handleEventUpdate);

  // If the timeline is limited we want to clear our events cache.
  roomSub = client.onSync.stream
      .where(
        (sync) => sync.rooms?.join?[room.id]?.timeline?.limited ?? false,
      )
      .listen(_removeEventsNotInThisSync);

  sessionIdReceivedSub =
      room.onSessionKeyReceived.stream.listen(_sessionKeyReceived);
  cancelSendEventSub =
      client.onCancelSendEvent.stream.listen(_cleanUpCancelledEvent);

  // Populate the aggregation tree from the initial events.
  events.forEach(addAggregatedEvent);

  // A chunk that knows a `nextBatch` does not end at the live edge of the
  // room: we are using a fragmented timeline, which never reads from the
  // database.
  final fragmented = chunk.nextBatch != '';
  if (fragmented) {
    isFragmentedTimeline = true;
    allowNewEvent = false;
    _fetchedAllDatabaseEvents = true;
  }
}
350 :
/// Removes the event whose sending was cancelled from the timeline.
///
/// Does nothing if no event with [eventId] (as event ID or transaction ID)
/// is part of [events]. Otherwise its aggregations are dropped, the event is
/// removed and [onRemove] followed by [onUpdate] is called.
void _cleanUpCancelledEvent(String eventId) {
  final index = _findEvent(event_id: eventId);
  // `_findEvent` reports "not found" as `events.length`.
  if (index >= events.length) return;

  removeAggregatedEvent(events[index]);
  events.removeAt(index);
  onRemove?.call(index);
  onUpdate?.call();
}
360 :
/// Removes all entries from [events] which are not in this SyncUpdate.
///
/// Only the timeline events of this room inside [sync] are kept; if the
/// sync contains none, [events] is emptied.
void _removeEventsNotInThisSync(SyncUpdate sync) {
  final newSyncEvents = sync.rooms?.join?[room.id]?.timeline?.events ?? [];
  // Materialise the IDs once. A lazy `map` would be walked again for every
  // single `contains` call below.
  final keepEventIds = newSyncEvents.map((e) => e.eventId).toSet();
  events.removeWhere((e) => !keepEventIds.contains(e.eventId));
}
367 :
/// Cancels all stream subscriptions held by this timeline.
///
/// Don't forget to call this before you dismiss this object! The
/// cancellations are started but not awaited.
void cancelSubscriptions() {
  unawaited(sub?.cancel());
  unawaited(roomSub?.cancel());
  unawaited(sessionIdReceivedSub?.cancel());
  unawaited(cancelSendEventSub?.cancel());
}
379 :
/// Retries decryption of all events that wait for the session [sessionId].
///
/// Every event in [events] that is still encrypted, flagged as badly
/// encrypted and refers to [sessionId] is decrypted again (with storing
/// enabled) and replaced in place; its aggregation is refreshed and
/// [onChange] is called for its index. The whole pass runs inside a
/// database transaction when a database is available. [onUpdate] is called
/// once at the end if at least one event is no longer encrypted.
void _sessionKeyReceived(String sessionId) async {
  var decryptedAny = false;

  Future<void> retryDecryption() async {
    final encryption = room.client.encryption;
    if (!room.client.encryptionEnabled || encryption == null) return;

    for (var i = 0; i < events.length; i++) {
      final candidate = events[i];
      final waitsForSession = candidate.type == EventTypes.Encrypted &&
          candidate.messageType == MessageTypes.BadEncrypted &&
          candidate.content['session_id'] == sessionId;
      if (!waitsForSession) continue;

      events[i] = await encryption.decryptRoomEvent(
        candidate,
        store: true,
        updateType: EventUpdateType.history,
      );
      addAggregatedEvent(events[i]);
      onChange?.call(i);
      // Decryption may fail again; only a changed type counts as success.
      if (events[i].type != EventTypes.Encrypted) {
        decryptedAny = true;
      }
    }
  }

  final database = room.client.database;
  if (database == null) {
    await retryDecryption();
  } else {
    await database.transaction(retryDecryption);
  }

  if (decryptedAny) onUpdate?.call();
}
412 :
/// Requests the keys for the undecryptable events of this timeline.
///
/// An event is considered when it is still encrypted, flagged as badly
/// encrypted and its content allows requesting the session. Events lacking
/// a `session_id` or `sender_key` are skipped. [tryOnlineBackup] and
/// [onlineKeyBackupOnly] are passed on to the key manager unchanged.
void requestKeys({
  bool tryOnlineBackup = true,
  bool onlineKeyBackupOnly = true,
}) {
  for (final event in events) {
    final isRequestable = event.type == EventTypes.Encrypted &&
        event.messageType == MessageTypes.BadEncrypted &&
        event.content['can_request_session'] == true;
    if (!isRequestable) continue;

    final sessionId = event.content.tryGet<String>('session_id');
    final senderKey = event.content.tryGet<String>('sender_key');
    if (sessionId == null || senderKey == null) continue;

    room.client.encryption?.keyManager.maybeAutoRequest(
      room.id,
      sessionId,
      senderKey,
      tryOnlineBackup: tryOnlineBackup,
      onlineKeyBackupOnly: onlineKeyBackupOnly,
    );
  }
}
436 :
/// Sets the read marker of the room.
///
/// Uses [eventId] if given, otherwise the first event in [events] whose
/// status is synced. Does nothing when neither is available. [public] is
/// passed on to `room.setReadMarker` unchanged.
Future<void> setReadMarker({String? eventId, bool? public}) async {
  final markerId = eventId ??
      events.firstWhereOrNull((event) => event.status.isSynced)?.eventId;
  if (markerId == null) return;
  return room.setReadMarker(markerId, mRead: markerId, public: public);
}
444 :
/// Returns the index of the first event in [events] that is identified by
/// [event_id] or [unsigned_txid].
///
/// An event matches when its event ID or its `transaction_id` from the
/// unsigned data equals either of the two given values, in any combination.
/// Returns `events.length` when nothing matches, which is always the case if
/// both arguments are `null`.
int _findEvent({String? event_id, String? unsigned_txid}) {
  // Everything we are looking for; an event is a hit as soon as one of its
  // own identifiers is contained in here.
  final wanted = <String>{
    if (event_id != null) event_id,
    if (unsigned_txid != null) unsigned_txid,
  };

  final index = events.indexWhere((event) {
    final txnid = event.unsigned?.tryGet<String>('transaction_id');
    return wanted.contains(event.eventId) ||
        (txnid != null && wanted.contains(txnid));
  });

  return index == -1 ? events.length : index;
}
472 :
/// Removes every entry from [eventSet] that stands for the same event as
/// [event].
///
/// An entry is removed when it matches the event ID of [event] or, if
/// [event] carries unsigned data, the `transaction_id` found there.
void _removeEventFromSet(Set<Event> eventSet, Event event) {
  bool isSameEvent(Event candidate) {
    if (candidate.matchesEventOrTransactionId(event.eventId)) return true;
    if (event.unsigned == null) return false;
    return candidate.matchesEventOrTransactionId(
      event.unsigned?.tryGet<String>('transaction_id'),
    );
  }

  eventSet.removeWhere(isSameEvent);
}
483 :
/// Registers [event] in [aggregatedEvents] if it relates to another event.
///
/// Events without relationship type or related event ID are ignored. An
/// older version of the same event (same event or transaction ID) is
/// replaced. If [onChange] is set, it is called with the index of the
/// related event; that index equals `events.length` when the related event
/// is not part of this timeline.
void addAggregatedEvent(Event event) {
  final relationType = event.relationshipType;
  final targetEventId = event.relationshipEventId;
  if (relationType == null || targetEventId == null) {
    return; // Not a relation, nothing to aggregate.
  }

  final byType = aggregatedEvents.putIfAbsent(
    targetEventId,
    () => <String, Set<Event>>{},
  );
  final related = byType.putIfAbsent(relationType, () => <Event>{});

  // Replace a potential old event by the new one.
  _removeEventFromSet(related, event);
  related.add(event);

  if (onChange == null) return;
  final index = _findEvent(event_id: targetEventId);
  onChange?.call(index);
}
502 :
/// Removes [event] from the aggregation tree.
///
/// Drops the aggregations collected for [event] itself (keyed by its event
/// ID and, if it has unsigned data, by its `transaction_id`) and removes
/// [event] from the aggregation sets of all other events.
void removeAggregatedEvent(Event event) {
  aggregatedEvents.remove(event.eventId);
  if (event.unsigned != null) {
    aggregatedEvents.remove(event.unsigned?['transaction_id']);
  }

  final allSets = aggregatedEvents.values.expand((byType) => byType.values);
  for (final related in allSets) {
    _removeEventFromSet(related, event);
  }
}
514 :
/// Applies a single [eventUpdate] to this timeline.
///
/// Updates for other rooms or of a type other than timeline/history are
/// ignored. [onNewEvent] fires for every timeline update, even while
/// [allowNewEvent] is `false`; nothing else happens in that case.
///
/// An event that is already known (by event ID or transaction ID) is
/// replaced in place, otherwise it is added: history events at the end,
/// timeline events at the first position that does not hold an errored
/// event. Redactions are additionally applied to the event they redact.
/// [onUpdate] is called at the end when [update] is `true` and history
/// updates are not being collected. Any error is logged, never rethrown.
void _handleEventUpdate(EventUpdate eventUpdate, {bool update = true}) {
  try {
    if (eventUpdate.roomID != room.id) return;

    final isTimeline = eventUpdate.type == EventUpdateType.timeline;
    final isHistory = eventUpdate.type == EventUpdateType.history;
    if (!isTimeline && !isHistory) return;

    if (isTimeline) onNewEvent?.call();

    if (!allowNewEvent) return;

    final content = eventUpdate.content;

    // Explicit status first, then the sending status stored in the unsigned
    // data, finally fall back to "synced".
    final status = eventStatusFromInt(
      content['status'] ??
          (content['unsigned'] is Map<String, dynamic>
              ? content['unsigned'][messageSendingStatusKey]
              : null) ??
          EventStatus.synced.intValue,
    );

    final existingIndex = _findEvent(
      event_id: content['event_id'],
      unsigned_txid: content['unsigned'] is Map
          ? content['unsigned']['transaction_id']
          : null,
    );

    if (existingIndex < events.length) {
      final previousStatus = events[existingIndex].status;
      final replacement = Event.fromJson(content, room);
      events[existingIndex] = replacement;

      // If the old status is larger than the new one we preserve it, but we
      // allow 0 -> -1 updates (sending -> error) and status increases.
      final oldStatusWins =
          latestEventStatus(status, previousStatus) == previousStatus;
      if (oldStatusWins && !(status.isError && previousStatus.isSending)) {
        replacement.status = previousStatus;
      }

      addAggregatedEvent(events[existingIndex]);
      onChange?.call(existingIndex);
    } else {
      final newEvent = Event.fromJson(content, room);

      // Never add a history event twice.
      if (isHistory &&
          events.any((e) => e.eventId == content['event_id'])) {
        return;
      }

      final int insertedAt;
      if (isHistory) {
        insertedAt = events.length;
        events.add(newEvent);
      } else {
        insertedAt = events.firstIndexWhereNotError;
        events.insert(insertedAt, newEvent);
      }
      onInsert?.call(insertedAt);

      addAggregatedEvent(newEvent);
    }

    // Handle redaction events
    if (content['type'] == EventTypes.Redaction) {
      final redactedIndex =
          _findEvent(event_id: content.tryGet<String>('redacts'));
      if (redactedIndex < events.length) {
        removeAggregatedEvent(events[redactedIndex]);

        // Is the redacted event a reaction? Then update the event this
        // belongs to and stop here.
        if (onChange != null) {
          final relatedId = events[redactedIndex].relationshipEventId;
          if (relatedId != null) {
            onChange?.call(_findEvent(event_id: relatedId));
            return;
          }
        }

        events[redactedIndex].setRedactionEvent(
          Event.fromJson(content, room),
        );
        onChange?.call(redactedIndex);
      }
    }

    if (update && !_collectHistoryUpdates) {
      onUpdate?.call();
    }
  } catch (e, s) {
    Logs().w('Handle event update failed', e, s);
  }
}
618 :
/// Searches this timeline like [startSearch] but only emits the list of
/// found events, dropping the pagination token.
@Deprecated('Use [startSearch] instead.')
Stream<List<Event>> searchEvent({
  String? searchTerm,
  int requestHistoryCount = 100,
  int maxHistoryRequests = 10,
  String? sinceEventId,
  int? limit,
  bool Function(Event)? searchFunc,
}) {
  final results = startSearch(
    searchTerm: searchTerm,
    requestHistoryCount: requestHistoryCount,
    maxHistoryRequests: maxHistoryRequests,
    // ignore: deprecated_member_use_from_same_package
    sinceEventId: sinceEventId,
    limit: limit,
    searchFunc: searchFunc,
  );
  return results.map((hit) => hit.$1);
}
637 :
/// Searches [searchTerm] in this timeline.
///
/// Unless [sinceEventId] is given, the events held by this timeline are
/// searched first, then the database. Afterwards the server is asked, with
/// at most [maxHistoryRequests] requests of [requestHistoryCount] events
/// each, starting at [prevBatch] (or the room's `prev_batch`, or the context
/// of [sinceEventId]). Because this can take a while, a stream is returned
/// so events that were already found can be displayed early.
///
/// By default an event matches when its body contains [searchTerm], ignoring
/// case. Override [searchFunc] if you need another search; [searchTerm] is
/// then ignored. [limit] stops the server search once that many events were
/// found.
///
/// Every emitted record holds all events found so far and, for results from
/// the server, the token from which a later search can continue.
Stream<(List<Event>, String?)> startSearch({
  String? searchTerm,
  int requestHistoryCount = 100,
  int maxHistoryRequests = 10,
  String? prevBatch,
  @Deprecated('Use [prevBatch] instead.') String? sinceEventId,
  int? limit,
  bool Function(Event)? searchFunc,
}) async* {
  assert(searchTerm != null || searchFunc != null);
  final needle = searchTerm?.toLowerCase() ?? '';
  final bool Function(Event) matches =
      searchFunc ?? (event) => event.body.toLowerCase().contains(needle);
  final found = <Event>[];

  if (sinceEventId == null) {
    // Search locally
    for (final event in events) {
      if (!matches(event)) continue;
      found.add(event);
      yield (found, null);
    }

    // Search in database, page by page until it is exhausted.
    var offset = events.length;
    while (true) {
      final stored = await room.client.database?.getEventList(
            room,
            start: offset,
            limit: requestHistoryCount,
          ) ??
          [];
      if (stored.isEmpty) break;
      offset += stored.length;
      for (final event in stored) {
        if (!matches(event)) continue;
        found.add(event);
        yield (found, null);
      }
    }
  }

  // Search on the server
  prevBatch ??= room.prev_batch;
  if (sinceEventId != null) {
    prevBatch =
        (await room.client.getEventContext(room.id, sinceEventId)).end;
  }
  final encryption = room.client.encryption;

  var requestsDone = 0;
  while (requestsDone < maxHistoryRequests) {
    if (prevBatch == null) break;
    if (limit != null && found.length >= limit) break;
    try {
      final resp = await room.client.getRoomEvents(
        room.id,
        Direction.b,
        from: prevBatch,
        limit: requestHistoryCount,
        filter: jsonEncode(StateFilter(lazyLoadMembers: true).toJson()),
      );
      for (final matrixEvent in resp.chunk) {
        var event = Event.fromMatrixEvent(matrixEvent, room);
        if (event.type == EventTypes.Encrypted && encryption != null) {
          event = await encryption.decryptRoomEvent(event);
          final stillUndecryptable = event.type == EventTypes.Encrypted &&
              event.messageType == MessageTypes.BadEncrypted &&
              event.content['can_request_session'] == true;
          if (stillUndecryptable) {
            // Await requestKey() here to ensure decrypted message bodies
            await event.requestKey();
          }
        }
        if (!matches(event)) continue;
        found.add(event);
        yield (found, resp.end);
        if (limit != null && found.length >= limit) break;
      }
      prevBatch = resp.end;
      // We are at the beginning of the room
      if (resp.chunk.length < requestHistoryCount) break;
    } on MatrixException catch (e) {
      // Anything but "no permission anymore to request the history" is
      // a real error.
      if (e.error != MatrixError.M_FORBIDDEN) rethrow;
      break;
    }
    requestsDone++;
  }
}
734 : }
735 :
extension on List<Event> {
  /// The index of the first event whose status is not an error, or [length]
  /// if there is no such event (which includes the empty list).
  int get firstIndexWhereNotError {
    for (var i = 0; i < length; i++) {
      if (!this[i].status.isError) return i;
    }
    return length;
  }
}
|