Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'dart:async';
import 'dart:io';

import 'package:core/presentation/state/failure.dart';
import 'package:core/presentation/state/success.dart';
Expand Down Expand Up @@ -172,7 +173,7 @@ class MailboxDataSourceImpl extends MailboxDataSource {
StreamController<dartz.Either<Failure, Success>>? onProgressController,
}) {
return Future.sync(() async {
if (PlatformInfo.isWeb) {
if (PlatformInfo.isWeb || Platform.numberOfProcessors == 1) {
return await mailboxAPI.moveFolderContent(
session: session,
accountId: accountId,
Expand Down
210 changes: 96 additions & 114 deletions lib/features/mailbox/data/network/mailbox_isolate_worker.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'dart:async';
import 'dart:io';

import 'package:core/presentation/state/failure.dart';
import 'package:core/presentation/state/success.dart';
Expand Down Expand Up @@ -39,9 +40,8 @@ class MailboxIsolateWorker {

final ThreadAPI _threadApi;
final EmailAPI _emailApi;
final Executor _isolateExecutor;

MailboxIsolateWorker(this._threadApi, this._emailApi, this._isolateExecutor);
MailboxIsolateWorker(this._threadApi, this._emailApi);

Future<List<EmailId>> markAsMailboxRead(
Session session,
Expand All @@ -50,8 +50,8 @@ class MailboxIsolateWorker {
int totalEmailUnread,
StreamController<Either<Failure, Success>> onProgressController
) async {
if (PlatformInfo.isWeb) {
return _handleMarkAsMailboxReadActionOnWeb(
if (PlatformInfo.isWeb || Platform.numberOfProcessors == 1) {
return await _handleMarkAsMailboxReadActionOnMainIsolate(
session,
accountId,
mailboxId,
Expand All @@ -63,108 +63,87 @@ class MailboxIsolateWorker {
throw const CanNotGetRootIsolateToken();
}

final result = await _isolateExecutor.execute(
arg1: MailboxMarkAsReadArguments(
session,
_threadApi,
_emailApi,
accountId,
mailboxId,
rootIsolateToken
),
fun1: _handleMarkAsMailboxReadAction,
notification: (value) {
if (value is List<EmailId>) {
log('MailboxIsolateWorker::markAsMailboxRead(): onUpdateProgress: PERCENT ${value.length / totalEmailUnread}');
onProgressController.add(Right(UpdatingMarkAsMailboxReadState(
mailboxId: mailboxId,
totalUnread: totalEmailUnread,
countRead: value.length)));
}
});
return result;
final args = MailboxMarkAsReadArguments(
session,
_threadApi,
_emailApi,
accountId,
mailboxId,
rootIsolateToken,
);
return await workerManager.executeWithPort<List<EmailId>, int>(
_buildMarkAsReadClosure(args),
onMessage: (countRead) {
log('MailboxIsolateWorker::markAsMailboxRead(): onUpdateProgress: PERCENT ${countRead / totalEmailUnread}');
onProgressController.add(Right(UpdatingMarkAsMailboxReadState(
mailboxId: mailboxId,
totalUnread: totalEmailUnread,
countRead: countRead)));
},
);
}
}

static Future<List<EmailId>> _handleMarkAsMailboxReadAction(
MailboxMarkAsReadArguments args,
TypeSendPort sendPort
MailboxMarkAsReadArguments args,
SendPort sendPort,
) async {
final rootIsolateToken = args.isolateToken;
BackgroundIsolateBinaryMessenger.ensureInitialized(rootIsolateToken);
await HiveCacheConfig.instance.setUp();

List<EmailId> emailIdsCompleted = List.empty(growable: true);
bool mailboxHasEmails = true;
UTCDate? lastReceivedDate;
EmailId? lastEmailId;

while (mailboxHasEmails) {
final emailResponse = await args.threadAPI
.getAllEmail(
args.session,
args.accountId,
limit: UnsignedInt(30),
filter: EmailFilterCondition(
inMailbox: args.mailboxId,
notKeyword: KeyWordIdentifier.emailSeen.value,
before: lastReceivedDate),
sort: <Comparator>{}..add(
EmailComparator(EmailComparatorProperty.receivedAt)
..setIsAscending(false)),
properties: Properties({
EmailProperty.id,
EmailProperty.keywords,
EmailProperty.receivedAt,
}))
.then((response) {
var listEmails = response.emailList;
if (listEmails != null && listEmails.isNotEmpty && lastEmailId != null) {
listEmails = listEmails
.where((email) => email.id != lastEmailId)
.toList();
}
return EmailsResponse(emailList: listEmails, state: response.state);
});
final listEmailUnread = emailResponse.emailList;

log('MailboxIsolateWorker::_handleMarkAsMailboxRead(): listEmailUnread: ${listEmailUnread?.length}');

if (listEmailUnread == null || listEmailUnread.isEmpty) {
mailboxHasEmails = false;
} else {
lastEmailId = listEmailUnread.last.id;
lastReceivedDate = listEmailUnread.last.receivedAt;

final result = await args.emailAPI.markAsRead(
args.session,
args.accountId,
listEmailUnread.listEmailIds,
ReadActions.markAsRead);

log('MailboxIsolateWorker::_handleMarkAsMailboxRead(): MARK_READ: ${result.emailIdsSuccess.length}');
emailIdsCompleted.addAll(result.emailIdsSuccess);
sendPort.send(emailIdsCompleted);
}
}
final emailIdsCompleted = await _executeMarkAsMailboxRead(
threadAPI: args.threadAPI,
emailAPI: args.emailAPI,
session: args.session,
accountId: args.accountId,
mailboxId: args.mailboxId,
onProgress: sendPort.send,
);
log('MailboxIsolateWorker::_handleMarkAsMailboxRead(): TOTAL_READ: ${emailIdsCompleted.length}');
return emailIdsCompleted;
}

Future<List<EmailId>> _handleMarkAsMailboxReadActionOnWeb(
Future<List<EmailId>> _handleMarkAsMailboxReadActionOnMainIsolate(
Session session,
AccountId accountId,
MailboxId mailboxId,
int totalEmailUnread,
StreamController<Either<Failure, Success>> onProgressController
StreamController<Either<Failure, Success>> onProgressController,
) async {
final result = await _executeMarkAsMailboxRead(
threadAPI: _threadApi,
emailAPI: _emailApi,
session: session,
accountId: accountId,
mailboxId: mailboxId,
onProgress: (countRead) => onProgressController.add(Right(
UpdatingMarkAsMailboxReadState(
mailboxId: mailboxId,
totalUnread: totalEmailUnread,
countRead: countRead,
),
)),
);
log('MailboxIsolateWorker::_handleMarkAsMailboxReadActionOnMainIsolate(): TOTAL_READ: ${result.length}');
return result;
}

static Future<List<EmailId>> _executeMarkAsMailboxRead({
required ThreadAPI threadAPI,
required EmailAPI emailAPI,
required Session session,
required AccountId accountId,
required MailboxId mailboxId,
required void Function(int countRead) onProgress,
}) async {
List<EmailId> emailIdsCompleted = List.empty(growable: true);
bool mailboxHasEmails = true;
UTCDate? lastReceivedDate;
EmailId? lastEmailId;

while (mailboxHasEmails) {
final emailResponse = await _threadApi
final emailResponse = await threadAPI
.getAllEmail(
session,
accountId,
Expand Down Expand Up @@ -192,30 +171,27 @@ class MailboxIsolateWorker {
});
final listEmailUnread = emailResponse.emailList;

log('MailboxIsolateWorker::_handleMarkAsMailboxReadActionOnWeb(): listEmailUnread: ${listEmailUnread?.length}');
log('MailboxIsolateWorker::_executeMarkAsMailboxRead(): listEmailUnread: ${listEmailUnread?.length}');

if (listEmailUnread == null || listEmailUnread.isEmpty) {
mailboxHasEmails = false;
} else {
lastEmailId = listEmailUnread.last.id;
lastReceivedDate = listEmailUnread.last.receivedAt;

final result = await _emailApi.markAsRead(
final result = await emailAPI.markAsRead(
session,
accountId,
listEmailUnread.listEmailIds,
ReadActions.markAsRead,
);
log('MailboxIsolateWorker::_handleMarkAsMailboxReadActionOnWeb(): MARK_READ: ${result.emailIdsSuccess.length}');
log('MailboxIsolateWorker::_executeMarkAsMailboxRead(): MARK_READ: ${result.emailIdsSuccess.length}');
emailIdsCompleted.addAll(result.emailIdsSuccess);

onProgressController.add(Right(UpdatingMarkAsMailboxReadState(
mailboxId: mailboxId,
totalUnread: totalEmailUnread,
countRead: emailIdsCompleted.length)));
onProgress(emailIdsCompleted.length);
}
}
log('MailboxIsolateWorker::_handleMarkAsMailboxReadActionOnWeb(): TOTAL_READ: ${emailIdsCompleted.length}');
log('MailboxIsolateWorker::_executeMarkAsMailboxRead(): TOTAL_READ: ${emailIdsCompleted.length}');
return emailIdsCompleted;
}

Expand All @@ -230,29 +206,27 @@ class MailboxIsolateWorker {
throw const CanNotGetRootIsolateToken();
}

final countEmailsCompleted = await _isolateExecutor.execute(
arg1: MoveFolderContentIsolateArguments(
session: session,
accountId: accountId,
threadAPI: _threadApi,
emailAPI: _emailApi,
currentMailboxId: request.mailboxId,
destinationMailboxId: request.destinationMailboxId,
isolateToken: rootIsolateToken,
markAsRead: request.markAsRead,
),
fun1: _moveFolderContentIsolateMethod,
notification: (value) {
if (value is int) {
log('$runtimeType::moveFolderContent(): Progress percent is ${value / request.totalEmails}');
onProgressController?.add(
Right<Failure, Success>(MoveFolderContentProgressState(
request.mailboxId,
value,
request.totalEmails,
)),
);
}
final args = MoveFolderContentIsolateArguments(
session: session,
accountId: accountId,
threadAPI: _threadApi,
emailAPI: _emailApi,
currentMailboxId: request.mailboxId,
destinationMailboxId: request.destinationMailboxId,
isolateToken: rootIsolateToken,
markAsRead: request.markAsRead,
);
final countEmailsCompleted = await workerManager.executeWithPort<int, int>(
_buildMoveFolderClosure(args),
onMessage: (value) {
log('$runtimeType::moveFolderContent(): Progress percent is ${value / request.totalEmails}');
onProgressController?.add(
Right<Failure, Success>(MoveFolderContentProgressState(
request.mailboxId,
value,
request.totalEmails,
)),
);
},
);

Expand All @@ -263,9 +237,17 @@ class MailboxIsolateWorker {
}
}

static Future<List<EmailId>> Function(SendPort) _buildMarkAsReadClosure(
MailboxMarkAsReadArguments args,
) => (sendPort) => _handleMarkAsMailboxReadAction(args, sendPort);

static Future<int> Function(SendPort) _buildMoveFolderClosure(
MoveFolderContentIsolateArguments args,
) => (sendPort) => _moveFolderContentIsolateMethod(args, sendPort);

static Future<int> _moveFolderContentIsolateMethod(
MoveFolderContentIsolateArguments args,
TypeSendPort sendPort,
SendPort sendPort,
) async {
final rootIsolateToken = args.isolateToken;
BackgroundIsolateBinaryMessenger.ensureInitialized(rootIsolateToken);
Expand Down
Loading
Loading