// backgroundJobs provides functions to start and stop the background jobs loop // This is a loop that runs various jobs in the background periodically // The backgroundJobs loop must be started when a user signs in, and stopped when they sign out package backgroundJobs // TODO: Add these jobs: // -Remove old moderator reviews (where moderator has updated their review)(within reviews list) // -Prune old profiles (if they are outdated and not reported/moderated with ban reviews) // -Import broadcasted reviews and profiles into database (needed if restoring from old device/database is deleted) // -Broadcast content (rebroadcast content in the background on a set period) // -Find reviews with cipher keys that do not hash to the message cipherKeyHash, messages that are not decryptable // when using a cipherKey that hashes to the message's cipherKeyHash, and messages that are invalid upon being decrypted. // Ban the moderators who approved the messages (if ModeratorMode is enabled) // Also, automatically ban these kinds of messages. // -Delete messages whose metadata we already have (for moderation, if HostMode is disabled and ModeratorMode is enabled) // -Prune invalid message reports (whose cipher key's hash does not match message's cipherKeyHash) // -Prune unfunded/expired identities/profiles/reports/messages // -Prune old profiles/reviews once they have been replaced, if we know they are not reported or banned // We have to keep content that has been banned for some time, even if it has been replaced with newer content by its author // This is necessary so the moderators can review the content // An example is a mate user who shares unruleful content in a profile, then changes their profile to something ruleful // -Prune content we have already reviewed (in moderatorMode) // Prune user profiles who no longer fulfill my downloads criteria (if space is running out and host/moderator mode is disabled) // -Delete a mate user's older profiles if they have a newer profile that does not // fulfill our criteria, even if that newer profile is not viewable yet (for users who are not within our host/moderator range) // -Prune database of different networkType content // -Moderators should automatically ban other moderators who review content from a different networkType // For example, a review is created on Mainnet for a message which belongs to Testnet1 // The Seekia app will not allow this to happen, so any moderator who does it must be malicious. //TODO: If a user disables a mode, we should be able to stop all of the jobs associated with that mode // We will do this by using the checkIfStopped function which we will pass to all of the networkJobs // We will also use the checkIfStopped function to stop networkJobs upon application closure and user signout. import "seekia/internal/appMemory" import "seekia/internal/databaseJobs" import "seekia/internal/helpers" import "seekia/internal/logger" import "seekia/internal/moderation/bannedModeratorConsensus" import "seekia/internal/moderation/enabledModerators" import "seekia/internal/moderation/verdictHistory" import "seekia/internal/myRanges" import "seekia/internal/mySettings" import "seekia/internal/network/appNetworkType/getAppNetworkType" import "seekia/internal/network/enabledHosts" import "seekia/internal/network/networkJobs" import "seekia/internal/network/peerServer" import "errors" import "sync" import "time" var loopIsRunningMutex sync.RWMutex var loopIsRunning bool var taskCompletionTimesMapMutex sync.RWMutex // Map Structure: Task name -> Last time task was completed var taskCompletionTimesMap map[string]int64 = make(map[string]int64) var runningTasksMapMutex sync.RWMutex // Map Structure: Task Name -> Task is running var runningTasksMap map[string]bool = make(map[string]bool) func setLoopIsRunningStatus(newStatus bool){ loopIsRunningMutex.Lock() loopIsRunning = newStatus loopIsRunningMutex.Unlock() } func StopBackgroundJobs()error{ //TODO // This needs to be run whenever the user is changed // This will be tricky to implement, because some tasks take a long time // We need to add isStopped checking into many functions such as sendRequests // This function should wait for all tasks to stop. setLoopIsRunningStatus(false) return nil } func StartBackgroundJobs()error{ exists, currentUserName := appMemory.GetMemoryEntry("AppUser") if (exists == false){ return errors.New("StartBackgroundJobs called when appUser is not signed in.") } checkIfUserHasChanged := func()bool{ exists, userName := appMemory.GetMemoryEntry("AppUser") if (exists == false || currentUserName != userName){ return true } return false } appNetworkType, err := getAppNetworkType.GetAppNetworkType() if (err != nil) { return err } checkIfAppNetworkTypeHasChanged := func()(bool, error){ currentAppNetworkType, err := getAppNetworkType.GetAppNetworkType() if (err != nil) { return false, err } if (appNetworkType != currentAppNetworkType){ return true, nil } return false, nil } setLoopIsRunningStatus(true) runBackgroundJobsLoop := func(){ //TODO: Stagger jobs, so we don't start them all at the same time // If we start them all at the same time, it could result in fingerprinting and privacy leaks for{ userHasChanged := checkIfUserHasChanged() if (userHasChanged == true){ // This should not happen // User should only be changed after this loop has stopped logger.AddLogError("BackgroundJobs", errors.New("App user changed before backgroundJobs loop is stopped.")) return } appNetworkTypeHasChanged, err := checkIfAppNetworkTypeHasChanged() if (err != nil){ logger.AddLogError("BackgroundJobs", err) return } if (appNetworkTypeHasChanged == true){ // This should not happen // App network type should only be changed after this loop has stopped logger.AddLogError("BackgroundJobs", errors.New("App network type changed before backgroundJobs loop is stopped.")) return } type taskStruct struct{ // A task name is a jobName with a suffix representing the task process TaskName string // The name of the Job which this task will run JobName string // Seconds between each completed run of the task to wait before we run it again TimeBetweenTasks int } // For example, _1 is the first process, _2 is the second, etc... getTasksList := func()([]taskStruct, error){ tasksList := make([]taskStruct, 0) addTasksToList := func(jobName string, timeBetweenTasks int, numberOfProcesses int){ for i:=1; i <= numberOfProcesses; i++{ processString := helpers.ConvertIntToString(i) taskName := jobName + "_" + processString newTaskObject := taskStruct{ TaskName: taskName, JobName: jobName, TimeBetweenTasks: timeBetweenTasks, } tasksList = append(tasksList, newTaskObject) } } //TODO: Choose better timeBetweenTasks and numberOfProcesses values addTasksToList("UpdateEnabledHosts", 60, 1) addTasksToList("DownloadParameters", 120, 1) // Database Jobs: addTasksToList("UpdateDatabaseMateIdentityProfilesLists", 300, 1) addTasksToList("UpdateDatabaseHostIdentityProfilesLists", 300, 1) addTasksToList("UpdateDatabaseModeratorIdentityProfilesLists", 300, 1) addTasksToList("PruneMateProfileMetadata", 300, 1) addTasksToList("PruneHostProfileMetadata", 300, 1) addTasksToList("PruneModeratorProfileMetadata", 300, 1) addTasksToList("PruneMessageMetadata", 300, 1) // Host Jobs: getHostModeIsEnabledStatus := func()(bool, error){ exists, hostModeStatus, err := mySettings.GetSetting("HostModeOnOffStatus") if (err != nil) { return false, err } if (exists == true && hostModeStatus == "On"){ return true, nil } return false, nil } hostModeIsEnabled, err := getHostModeIsEnabledStatus() if (err != nil){ return nil, err } if (hostModeIsEnabled == true){ addTasksToList("AdjustMyHostRanges", 120, 1) addTasksToList("DownloadMateProfilesToHost", 60, 1) addTasksToList("DownloadHostProfilesToHost", 60, 1) addTasksToList("DownloadModeratorProfilesToHost", 60, 1) addTasksToList("DownloadMessagesToHost", 60, 1) addTasksToList("DownloadReviewsToHost", 60, 1) addTasksToList("DownloadReportsToHost", 60, 1) addTasksToList("StartPeerServer", 3, 1) } else { addTasksToList("StopPeerServer", 3, 1) } // Moderator Jobs: getModeratorModeIsEnabledStatus := func()(bool, error){ exists, moderatorModeStatus, err := mySettings.GetSetting("ModeratorModeOnOffStatus") if (err != nil){ return false, err } if (exists == true && moderatorModeStatus == "On"){ return true, nil } return false, nil } moderatorModeIsEnabled, err := getModeratorModeIsEnabledStatus() if (err != nil) { return nil, err } if (moderatorModeIsEnabled == true){ addTasksToList("AdjustMyModeratorRanges", 120, 1) } if (hostModeIsEnabled == true || moderatorModeIsEnabled == true){ addTasksToList("UpdateEnabledModerators", 60, 1) addTasksToList("UpdateBannedModerators", 60, 1) addTasksToList("RecordIdentityVerdictHistories", 120, 1) addTasksToList("RecordProfileVerdictHistories", 120, 1) addTasksToList("RecordMessageVerdictHistories", 120, 1) } addTasksToList("DownloadMyMateMessages", 1, 3) if (moderatorModeIsEnabled == true){ addTasksToList("DownloadMyModeratorMessages", 1, 3) } //TODO: Add tasks from networkJobs.go, and more tasks: // networkJobs.DownloadAllNewestViewableUserProfiles // networkJobs.DownloadMateProfilesToBrowse // networkJobs.DownloadMateOutlierProfiles // networkJobs.DownloadProfilesToModerate // networkJobs.DownloadMessagesToModerate // networkJobs.DownloadMyInboxMessages // networkJobs.DownloadModeratorIdentityBanningReviews // networkJobs.DownloadIdentityReviewsToHost // networkJobs.DownloadMessageReviewsToHost // networkJobs.DownloadIdentityReviewsForModeration // networkJobs.DownloadMessageReviewsForModeration // networkJobs.DownloadIdentityReportsToHost // networkJobs.DownloadMessageReportsToHost // networkJobs.DownloadIdentityReportsForModeration // networkJobs.DownloadMessageReportsForModeration // networkJobs.DownloadHostViewableStatuses // networkJobs.DownloadModeratorProfileViewableStatuses // networkJobs.DownloadMateViewableStatusesForBrowsing // networkJobs.DownloadMateOutlierViewableStatuses // If local blockchain is not enabled and host/moderator mode is enabled: // networkJobs.DownloadModeratorIdentityDeposits // myMessageQueue.AttemptToSendMessagesInQueue // myBroadcasts.PruneMyBroadcastedReviews return tasksList, nil } tasksList, err := getTasksList() if (err != nil) { logger.AddLogError("BackgroundJobs", err) return } for _, taskObject := range tasksList{ taskName := taskObject.TaskName jobName := taskObject.JobName taskWaitTime := taskObject.TimeBetweenTasks runningTasksMapMutex.RLock() taskIsRunning, exists := runningTasksMap[taskName] runningTasksMapMutex.RUnlock() if (exists == true && taskIsRunning == true){ // Task is already running. // We skip it. continue } taskCompletionTimesMapMutex.RLock() taskLastCompletionTime, exists := taskCompletionTimesMap[taskName] taskCompletionTimesMapMutex.RUnlock() if (exists == true){ currentTime := time.Now().Unix() timeElapsedFromLastCompletion := currentTime - taskLastCompletionTime if (timeElapsedFromLastCompletion < int64(taskWaitTime)){ continue } } // Now we run the task // We get the task's job function //TODO: Add a CheckIfStoppedFunction that we pass to many jobs // This will return true if the user is signing out/closing Seekia getJobFunction := func()(func()error, error){ if (jobName == "UpdateEnabledModerators"){ jobFunction := func()error{ err := enabledModerators.UpdateEnabledModeratorsList(appNetworkType) return err } return jobFunction, nil } if (jobName == "UpdateBannedModerators"){ jobFunction := func()error{ err := bannedModeratorConsensus.UpdateBannedModeratorsList(appNetworkType) return err } return jobFunction, nil } if (jobName == "UpdateEnabledHosts"){ jobFunction := func()error{ err := enabledHosts.UpdateEnabledHostsList(appNetworkType) return err } return jobFunction, nil } if (jobName == "AdjustMyHostRanges"){ jobFunction := func()error{ err := myRanges.AdjustMyRanges("Host") return err } return jobFunction, nil } if (jobName == "AdjustMyModeratorRanges"){ jobFunction := func()error{ err := myRanges.AdjustMyRanges("Moderator") return err } return jobFunction, nil } if (jobName == "UpdateDatabaseMateIdentityProfilesLists"){ jobFunction := func()error{ err := databaseJobs.UpdateDatabaseIdentityProfilesLists("Mate") return err } return jobFunction, nil } if (jobName == "UpdateDatabaseHostIdentityProfilesLists"){ jobFunction := func()error{ err := databaseJobs.UpdateDatabaseIdentityProfilesLists("Host") return err } return jobFunction, nil } if (jobName == "UpdateDatabaseModeratorIdentityProfilesLists"){ jobFunction := func()error{ err := databaseJobs.UpdateDatabaseIdentityProfilesLists("Moderator") return err } return jobFunction, nil } if (jobName == "PruneMateProfileMetadata"){ jobFunction := func()error{ err := databaseJobs.PruneProfileMetadata("Mate") return err } return jobFunction, nil } if (jobName == "PruneHostProfileMetadata"){ jobFunction := func()error{ err := databaseJobs.PruneProfileMetadata("Host") return err } return jobFunction, nil } if (jobName == "PruneModeratorProfileMetadata"){ jobFunction := func()error{ err := databaseJobs.PruneProfileMetadata("Moderator") return err } return jobFunction, nil } if (jobName == "PruneMessageMetadata"){ jobFunction := databaseJobs.PruneMessageMetadata return jobFunction, nil } if (jobName == "DownloadParameters"){ jobFunction := func()error{ err := networkJobs.DownloadParameters(appNetworkType) return err } return jobFunction, nil } if (jobName == "DownloadMateProfilesToHost"){ jobFunction := func()error{ err := networkJobs.DownloadProfilesToHost("Mate", appNetworkType) return err } return jobFunction, nil } if (jobName == "DownloadHostProfilesToHost"){ jobFunction := func()error{ err := networkJobs.DownloadProfilesToHost("Host", appNetworkType) return err } return jobFunction, nil } if (jobName == "DownloadModeratorProfilesToHost"){ jobFunction := func()error{ err := networkJobs.DownloadProfilesToHost("Moderator", appNetworkType) return err } return jobFunction, nil } if (jobName == "DownloadMessagesToHost"){ jobFunction := func()error{ err := networkJobs.DownloadMessagesToHost(appNetworkType) return err } return jobFunction, nil } if (jobName == "StartPeerServer"){ jobFunction := peerServer.StartPeerServer return jobFunction, nil } if (jobName == "StopPeerServer"){ jobFunction := peerServer.StopPeerServer return jobFunction, nil } if (jobName == "RecordIdentityVerdictHistories"){ jobFunction := func()error{ err := verdictHistory.RecordIdentityVerdictsToHistoryMap(appNetworkType) return err } return jobFunction, nil } if (jobName == "RecordProfileVerdictHistories"){ jobFunction := func()error{ err := verdictHistory.RecordProfileVerdictsToHistoryMap(appNetworkType) return err } return jobFunction, nil } if (jobName == "RecordMessageVerdictHistories"){ jobFunction := func()error{ err := verdictHistory.RecordMessageVerdictsToHistoryMap(appNetworkType) return err } return jobFunction, nil } if (jobName == "DownloadMyMateMessages"){ jobFunction := func()error{ err := networkJobs.DownloadMyInboxMessages("Mate", appNetworkType) return err } return jobFunction, nil } if (jobName == "DownloadMyModeratorMessages"){ jobFunction := func()error{ err := networkJobs.DownloadMyInboxMessages("Moderator", appNetworkType) return err } return jobFunction, nil } return nil, errors.New("Tasks map contains unknown jobName: " + jobName) } jobFunction, err := getJobFunction() if (err != nil){ logger.AddLogError("BackgroundJobs", err) return } runningTasksMapMutex.Lock() runningTasksMap[taskName] = true runningTasksMapMutex.Unlock() startTaskFunction := func(){ err := jobFunction() if (err != nil){ logger.AddLogError("BackgroundJobs", err) } runningTasksMapMutex.Lock() runningTasksMap[taskName] = false runningTasksMapMutex.Unlock() currentTime := time.Now().Unix() taskCompletionTimesMapMutex.Lock() taskCompletionTimesMap[taskName] = currentTime taskCompletionTimesMapMutex.Unlock() } go startTaskFunction() } loopIsRunningMutex.RLock() currentLoopIsRunningStatus := loopIsRunning loopIsRunningMutex.RUnlock() if (currentLoopIsRunningStatus == false){ // Background jobs loop has been stopped. return } time.Sleep(time.Second) } } go runBackgroundJobsLoop() return nil }