seekia/internal/backgroundJobs/backgroundJobs.go

646 lines
18 KiB
Go

// backgroundJobs provides functions to start and stop the background jobs loop
// This is a loop that runs various jobs in the background periodically
// The backgroundJobs loop must be started when a user signs in, and stopped when they sign out
package backgroundJobs
// TODO: Add these jobs:
// -Remove old moderator reviews (where moderator has updated their review)(within reviews list)
// -Prune old profiles (if they are outdated and not reported/moderated with ban reviews)
// -Import broadcasted reviews and profiles into database (needed if restoring from old device/database is deleted)
// -Broadcast content (rebroadcast content in the background on a set period)
// -Find reviews with cipher keys that do not hash to the message cipherKeyHash, messages that are not decryptable
// when using a cipherKey that hashes to the message's cipherKeyHash, and messages that are invalid upon being decrypted.
// Ban the moderators who approved the messages (if ModeratorMode is enabled)
// Also, automatically ban these kinds of messages.
// -Delete messages whose metadata we already have (for moderation, if HostMode is disabled and ModeratorMode is enabled)
// -Prune invalid message reports (whose cipher key's hash does not match message's cipherKeyHash)
// -Prune unfunded/expired identities/profiles/reports/messages
// -Prune old profiles/reviews once they have been replaced, if we know they are not reported or banned
// We have to keep content that has been banned for some time, even if it has been replaced with newer content by its author
// This is necessary so the moderators can review the content
// An example is a mate user who shares unruleful content in a profile, then changes their profile to something ruleful
// -Prune content we have already reviewed (in moderatorMode)
// Prune user profiles who no longer fulfill my downloads criteria (if space is running out and host/moderator mode is disabled)
// -Delete a mate user's older profiles if they have a newer profile that does not
// fulfill our criteria, even if that newer profile is not viewable yet (for users who are not within our host/moderator range)
// -Prune database of different networkType content
// -Moderators should automatically ban other moderators who review content from a different networkType
// For example, a review is created on Mainnet for a message which belongs to Testnet1
// The Seekia app will not allow this to happen, so any moderator who does it must be malicious.
//TODO: If a user disables a mode, we should be able to stop all of the jobs associated with that mode
// We will do this by using the checkIfStopped function which we will pass to all of the networkJobs
// We will also use the checkIfStopped function to stop networkJobs upon application closure and user signout.
import "seekia/internal/appMemory"
import "seekia/internal/databaseJobs"
import "seekia/internal/helpers"
import "seekia/internal/logger"
import "seekia/internal/moderation/bannedModeratorConsensus"
import "seekia/internal/moderation/enabledModerators"
import "seekia/internal/moderation/verdictHistory"
import "seekia/internal/myRanges"
import "seekia/internal/mySettings"
import "seekia/internal/network/appNetworkType/getAppNetworkType"
import "seekia/internal/network/enabledHosts"
import "seekia/internal/network/networkJobs"
import "seekia/internal/network/peerServer"
import "errors"
import "sync"
import "time"
var loopIsRunningMutex sync.RWMutex
var loopIsRunning bool
var taskCompletionTimesMapMutex sync.RWMutex
// Map Structure: Task name -> Last time task was completed
var taskCompletionTimesMap map[string]int64 = make(map[string]int64)
var runningTasksMapMutex sync.RWMutex
// Map Structure: Task Name -> Task is running
var runningTasksMap map[string]bool = make(map[string]bool)
func setLoopIsRunningStatus(newStatus bool){
loopIsRunningMutex.Lock()
loopIsRunning = newStatus
loopIsRunningMutex.Unlock()
}
func StopBackgroundJobs()error{
//TODO
// This needs to be run whenever the user is changed
// This will be tricky to implement, because some tasks take a long time
// We need to add isStopped checking into many functions such as sendRequests
// This function should wait for all tasks to stop.
setLoopIsRunningStatus(false)
return nil
}
func StartBackgroundJobs()error{
exists, currentUserName := appMemory.GetMemoryEntry("AppUser")
if (exists == false){
return errors.New("StartBackgroundJobs called when appUser is not signed in.")
}
checkIfUserHasChanged := func()bool{
exists, userName := appMemory.GetMemoryEntry("AppUser")
if (exists == false || currentUserName != userName){
return true
}
return false
}
appNetworkType, err := getAppNetworkType.GetAppNetworkType()
if (err != nil) { return err }
checkIfAppNetworkTypeHasChanged := func()(bool, error){
currentAppNetworkType, err := getAppNetworkType.GetAppNetworkType()
if (err != nil) { return false, err }
if (appNetworkType != currentAppNetworkType){
return true, nil
}
return false, nil
}
setLoopIsRunningStatus(true)
runBackgroundJobsLoop := func(){
//TODO: Stagger jobs, so we don't start them all at the same time
// If we start them all at the same time, it could result in fingerprinting and privacy leaks
for{
userHasChanged := checkIfUserHasChanged()
if (userHasChanged == true){
// This should not happen
// User should only be changed after this loop has stopped
logger.AddLogError("BackgroundJobs", errors.New("App user changed before backgroundJobs loop is stopped."))
return
}
appNetworkTypeHasChanged, err := checkIfAppNetworkTypeHasChanged()
if (err != nil){
logger.AddLogError("BackgroundJobs", err)
return
}
if (appNetworkTypeHasChanged == true){
// This should not happen
// App network type should only be changed after this loop has stopped
logger.AddLogError("BackgroundJobs", errors.New("App network type changed before backgroundJobs loop is stopped."))
return
}
type taskStruct struct{
// A task name is a jobName with a suffix representing the task process
TaskName string
// The name of the Job which this task will run
JobName string
// Seconds between each completed run of the task to wait before we run it again
TimeBetweenTasks int
}
// For example, _1 is the first process, _2 is the second, etc...
getTasksList := func()([]taskStruct, error){
tasksList := make([]taskStruct, 0)
addTasksToList := func(jobName string, timeBetweenTasks int, numberOfProcesses int){
for i:=1; i <= numberOfProcesses; i++{
processString := helpers.ConvertIntToString(i)
taskName := jobName + "_" + processString
newTaskObject := taskStruct{
TaskName: taskName,
JobName: jobName,
TimeBetweenTasks: timeBetweenTasks,
}
tasksList = append(tasksList, newTaskObject)
}
}
//TODO: Choose better timeBetweenTasks and numberOfProcesses values
addTasksToList("UpdateEnabledHosts", 60, 1)
addTasksToList("DownloadParameters", 120, 1)
// Database Jobs:
addTasksToList("UpdateDatabaseMateIdentityProfilesLists", 300, 1)
addTasksToList("UpdateDatabaseHostIdentityProfilesLists", 300, 1)
addTasksToList("UpdateDatabaseModeratorIdentityProfilesLists", 300, 1)
addTasksToList("PruneMateProfileMetadata", 300, 1)
addTasksToList("PruneHostProfileMetadata", 300, 1)
addTasksToList("PruneModeratorProfileMetadata", 300, 1)
addTasksToList("PruneMessageMetadata", 300, 1)
// Host Jobs:
getHostModeIsEnabledStatus := func()(bool, error){
exists, hostModeStatus, err := mySettings.GetSetting("HostModeOnOffStatus")
if (err != nil) { return false, err }
if (exists == true && hostModeStatus == "On"){
return true, nil
}
return false, nil
}
hostModeIsEnabled, err := getHostModeIsEnabledStatus()
if (err != nil){ return nil, err }
if (hostModeIsEnabled == true){
addTasksToList("AdjustMyHostRanges", 120, 1)
addTasksToList("DownloadMateProfilesToHost", 60, 1)
addTasksToList("DownloadHostProfilesToHost", 60, 1)
addTasksToList("DownloadModeratorProfilesToHost", 60, 1)
addTasksToList("DownloadMessagesToHost", 60, 1)
addTasksToList("DownloadReviewsToHost", 60, 1)
addTasksToList("DownloadReportsToHost", 60, 1)
addTasksToList("StartPeerServer", 3, 1)
} else {
addTasksToList("StopPeerServer", 3, 1)
}
// Moderator Jobs:
getModeratorModeIsEnabledStatus := func()(bool, error){
exists, moderatorModeStatus, err := mySettings.GetSetting("ModeratorModeOnOffStatus")
if (err != nil){ return false, err }
if (exists == true && moderatorModeStatus == "On"){
return true, nil
}
return false, nil
}
moderatorModeIsEnabled, err := getModeratorModeIsEnabledStatus()
if (err != nil) { return nil, err }
if (moderatorModeIsEnabled == true){
addTasksToList("AdjustMyModeratorRanges", 120, 1)
}
if (hostModeIsEnabled == true || moderatorModeIsEnabled == true){
addTasksToList("UpdateEnabledModerators", 60, 1)
addTasksToList("UpdateBannedModerators", 60, 1)
addTasksToList("RecordIdentityVerdictHistories", 120, 1)
addTasksToList("RecordProfileVerdictHistories", 120, 1)
addTasksToList("RecordMessageVerdictHistories", 120, 1)
}
addTasksToList("DownloadMyMateMessages", 1, 3)
if (moderatorModeIsEnabled == true){
addTasksToList("DownloadMyModeratorMessages", 1, 3)
}
//TODO: Add tasks from networkJobs.go, and more tasks:
// networkJobs.DownloadAllNewestViewableUserProfiles
// networkJobs.DownloadMateProfilesToBrowse
// networkJobs.DownloadMateOutlierProfiles
// networkJobs.DownloadProfilesToModerate
// networkJobs.DownloadMessagesToModerate
// networkJobs.DownloadMyInboxMessages
// networkJobs.DownloadModeratorIdentityBanningReviews
// networkJobs.DownloadIdentityReviewsToHost
// networkJobs.DownloadMessageReviewsToHost
// networkJobs.DownloadIdentityReviewsForModeration
// networkJobs.DownloadMessageReviewsForModeration
// networkJobs.DownloadIdentityReportsToHost
// networkJobs.DownloadMessageReportsToHost
// networkJobs.DownloadIdentityReportsForModeration
// networkJobs.DownloadMessageReportsForModeration
// networkJobs.DownloadHostViewableStatuses
// networkJobs.DownloadModeratorProfileViewableStatuses
// networkJobs.DownloadMateViewableStatusesForBrowsing
// networkJobs.DownloadMateOutlierViewableStatuses
// If local blockchain is not enabled and host/moderator mode is enabled:
// networkJobs.DownloadModeratorIdentityDeposits
// myMessageQueue.AttemptToSendMessagesInQueue
// myBroadcasts.PruneMyBroadcastedReviews
return tasksList, nil
}
tasksList, err := getTasksList()
if (err != nil) {
logger.AddLogError("BackgroundJobs", err)
return
}
for _, taskObject := range tasksList{
taskName := taskObject.TaskName
jobName := taskObject.JobName
taskWaitTime := taskObject.TimeBetweenTasks
runningTasksMapMutex.RLock()
taskIsRunning, exists := runningTasksMap[taskName]
runningTasksMapMutex.RUnlock()
if (exists == true && taskIsRunning == true){
// Task is already running.
// We skip it.
continue
}
taskCompletionTimesMapMutex.RLock()
taskLastCompletionTime, exists := taskCompletionTimesMap[taskName]
taskCompletionTimesMapMutex.RUnlock()
if (exists == true){
currentTime := time.Now().Unix()
timeElapsedFromLastCompletion := currentTime - taskLastCompletionTime
if (timeElapsedFromLastCompletion < int64(taskWaitTime)){
continue
}
}
// Now we run the task
// We get the task's job function
//TODO: Add a CheckIfStoppedFunction that we pass to many jobs
// This will return true if the user is signing out/closing Seekia
getJobFunction := func()(func()error, error){
if (jobName == "UpdateEnabledModerators"){
jobFunction := func()error{
err := enabledModerators.UpdateEnabledModeratorsList(appNetworkType)
return err
}
return jobFunction, nil
}
if (jobName == "UpdateBannedModerators"){
jobFunction := func()error{
err := bannedModeratorConsensus.UpdateBannedModeratorsList(appNetworkType)
return err
}
return jobFunction, nil
}
if (jobName == "UpdateEnabledHosts"){
jobFunction := func()error{
err := enabledHosts.UpdateEnabledHostsList(appNetworkType)
return err
}
return jobFunction, nil
}
if (jobName == "AdjustMyHostRanges"){
jobFunction := func()error{
err := myRanges.AdjustMyRanges("Host")
return err
}
return jobFunction, nil
}
if (jobName == "AdjustMyModeratorRanges"){
jobFunction := func()error{
err := myRanges.AdjustMyRanges("Moderator")
return err
}
return jobFunction, nil
}
if (jobName == "UpdateDatabaseMateIdentityProfilesLists"){
jobFunction := func()error{
err := databaseJobs.UpdateDatabaseIdentityProfilesLists("Mate")
return err
}
return jobFunction, nil
}
if (jobName == "UpdateDatabaseHostIdentityProfilesLists"){
jobFunction := func()error{
err := databaseJobs.UpdateDatabaseIdentityProfilesLists("Host")
return err
}
return jobFunction, nil
}
if (jobName == "UpdateDatabaseModeratorIdentityProfilesLists"){
jobFunction := func()error{
err := databaseJobs.UpdateDatabaseIdentityProfilesLists("Moderator")
return err
}
return jobFunction, nil
}
if (jobName == "PruneMateProfileMetadata"){
jobFunction := func()error{
err := databaseJobs.PruneProfileMetadata("Mate")
return err
}
return jobFunction, nil
}
if (jobName == "PruneHostProfileMetadata"){
jobFunction := func()error{
err := databaseJobs.PruneProfileMetadata("Host")
return err
}
return jobFunction, nil
}
if (jobName == "PruneModeratorProfileMetadata"){
jobFunction := func()error{
err := databaseJobs.PruneProfileMetadata("Moderator")
return err
}
return jobFunction, nil
}
if (jobName == "PruneMessageMetadata"){
jobFunction := databaseJobs.PruneMessageMetadata
return jobFunction, nil
}
if (jobName == "DownloadParameters"){
jobFunction := func()error{
err := networkJobs.DownloadParameters(appNetworkType)
return err
}
return jobFunction, nil
}
if (jobName == "DownloadMateProfilesToHost"){
jobFunction := func()error{
err := networkJobs.DownloadProfilesToHost("Mate", appNetworkType)
return err
}
return jobFunction, nil
}
if (jobName == "DownloadHostProfilesToHost"){
jobFunction := func()error{
err := networkJobs.DownloadProfilesToHost("Host", appNetworkType)
return err
}
return jobFunction, nil
}
if (jobName == "DownloadModeratorProfilesToHost"){
jobFunction := func()error{
err := networkJobs.DownloadProfilesToHost("Moderator", appNetworkType)
return err
}
return jobFunction, nil
}
if (jobName == "DownloadMessagesToHost"){
jobFunction := func()error{
err := networkJobs.DownloadMessagesToHost(appNetworkType)
return err
}
return jobFunction, nil
}
if (jobName == "StartPeerServer"){
jobFunction := peerServer.StartPeerServer
return jobFunction, nil
}
if (jobName == "StopPeerServer"){
jobFunction := peerServer.StopPeerServer
return jobFunction, nil
}
if (jobName == "RecordIdentityVerdictHistories"){
jobFunction := func()error{
err := verdictHistory.RecordIdentityVerdictsToHistoryMap(appNetworkType)
return err
}
return jobFunction, nil
}
if (jobName == "RecordProfileVerdictHistories"){
jobFunction := func()error{
err := verdictHistory.RecordProfileVerdictsToHistoryMap(appNetworkType)
return err
}
return jobFunction, nil
}
if (jobName == "RecordMessageVerdictHistories"){
jobFunction := func()error{
err := verdictHistory.RecordMessageVerdictsToHistoryMap(appNetworkType)
return err
}
return jobFunction, nil
}
if (jobName == "DownloadMyMateMessages"){
jobFunction := func()error{
err := networkJobs.DownloadMyInboxMessages("Mate", appNetworkType)
return err
}
return jobFunction, nil
}
if (jobName == "DownloadMyModeratorMessages"){
jobFunction := func()error{
err := networkJobs.DownloadMyInboxMessages("Moderator", appNetworkType)
return err
}
return jobFunction, nil
}
return nil, errors.New("Tasks map contains unknown jobName: " + jobName)
}
jobFunction, err := getJobFunction()
if (err != nil){
logger.AddLogError("BackgroundJobs", err)
return
}
runningTasksMapMutex.Lock()
runningTasksMap[taskName] = true
runningTasksMapMutex.Unlock()
startTaskFunction := func(){
err := jobFunction()
if (err != nil){
logger.AddLogError("BackgroundJobs", err)
}
runningTasksMapMutex.Lock()
runningTasksMap[taskName] = false
runningTasksMapMutex.Unlock()
currentTime := time.Now().Unix()
taskCompletionTimesMapMutex.Lock()
taskCompletionTimesMap[taskName] = currentTime
taskCompletionTimesMapMutex.Unlock()
}
go startTaskFunction()
}
loopIsRunningMutex.RLock()
currentLoopIsRunningStatus := loopIsRunning
loopIsRunningMutex.RUnlock()
if (currentLoopIsRunningStatus == false){
// Background jobs loop has been stopped.
return
}
time.Sleep(time.Second)
}
}
go runBackgroundJobsLoop()
return nil
}