646 lines
18 KiB
Go
646 lines
18 KiB
Go
|
|
// backgroundJobs provides functions to start and stop the background jobs loop
|
|
// This is a loop that runs various jobs in the background periodically
|
|
// The backgroundJobs loop must be started when a user signs in, and stopped when they sign out
|
|
|
|
package backgroundJobs
|
|
|
|
// TODO: Add these jobs:
|
|
// -Remove old moderator reviews (where moderator has updated their review)(within reviews list)
|
|
// -Prune old profiles (if they are outdated and not reported/moderated with ban reviews)
|
|
// -Import broadcasted reviews and profiles into database (needed if restoring from old device/database is deleted)
|
|
// -Broadcast content (rebroadcast content in the background on a set period)
|
|
// -Find reviews with cipher keys that do not hash to the message cipherKeyHash, messages that are not decryptable
|
|
// when using a cipherKey that hashes to the message's cipherKeyHash, and messages that are invalid upon being decrypted.
|
|
// Ban the moderators who approved the messages (if ModeratorMode is enabled)
|
|
// Also, automatically ban these kinds of messages.
|
|
// -Delete messages whose metadata we already have (for moderation, if HostMode is disabled and ModeratorMode is enabled)
|
|
// -Prune invalid message reports (whose cipher key's hash does not match message's cipherKeyHash)
|
|
// -Prune unfunded/expired identities/profiles/reports/messages
|
|
// -Prune old profiles/reviews once they have been replaced, if we know they are not reported or banned
|
|
// We have to keep content that has been banned for some time, even if it has been replaced with newer content by its author
|
|
// This is necessary so the moderators can review the content
|
|
// An example is a mate user who shares unruleful content in a profile, then changes their profile to something ruleful
|
|
// -Prune content we have already reviewed (in moderatorMode)
|
|
// Prune user profiles who no longer fulfill my downloads criteria (if space is running out and host/moderator mode is disabled)
|
|
// -Delete a mate user's older profiles if they have a newer profile that does not
|
|
// fulfill our criteria, even if that newer profile is not viewable yet (for users who are not within our host/moderator range)
|
|
// -Prune database of different networkType content
|
|
// -Moderators should automatically ban other moderators who review content from a different networkType
|
|
// For example, a review is created on Mainnet for a message which belongs to Testnet1
|
|
// The Seekia app will not allow this to happen, so any moderator who does it must be malicious.
|
|
|
|
//TODO: If a user disables a mode, we should be able to stop all of the jobs associated with that mode
|
|
// We will do this by using the checkIfStopped function which we will pass to all of the networkJobs
|
|
// We will also use the checkIfStopped function to stop networkJobs upon application closure and user signout.
|
|
|
|
import "seekia/internal/appMemory"
|
|
import "seekia/internal/databaseJobs"
|
|
import "seekia/internal/helpers"
|
|
import "seekia/internal/logger"
|
|
import "seekia/internal/moderation/bannedModeratorConsensus"
|
|
import "seekia/internal/moderation/enabledModerators"
|
|
import "seekia/internal/moderation/verdictHistory"
|
|
import "seekia/internal/myRanges"
|
|
import "seekia/internal/mySettings"
|
|
import "seekia/internal/network/appNetworkType/getAppNetworkType"
|
|
import "seekia/internal/network/enabledHosts"
|
|
import "seekia/internal/network/networkJobs"
|
|
import "seekia/internal/network/peerServer"
|
|
|
|
import "errors"
|
|
import "sync"
|
|
import "time"
|
|
|
|
var loopIsRunningMutex sync.RWMutex
|
|
var loopIsRunning bool
|
|
|
|
var taskCompletionTimesMapMutex sync.RWMutex
|
|
// Map Structure: Task name -> Last time task was completed
|
|
var taskCompletionTimesMap map[string]int64 = make(map[string]int64)
|
|
|
|
var runningTasksMapMutex sync.RWMutex
|
|
// Map Structure: Task Name -> Task is running
|
|
var runningTasksMap map[string]bool = make(map[string]bool)
|
|
|
|
func setLoopIsRunningStatus(newStatus bool){
|
|
|
|
loopIsRunningMutex.Lock()
|
|
loopIsRunning = newStatus
|
|
loopIsRunningMutex.Unlock()
|
|
}
|
|
|
|
func StopBackgroundJobs()error{
|
|
|
|
//TODO
|
|
// This needs to be run whenever the user is changed
|
|
// This will be tricky to implement, because some tasks take a long time
|
|
// We need to add isStopped checking into many functions such as sendRequests
|
|
// This function should wait for all tasks to stop.
|
|
|
|
setLoopIsRunningStatus(false)
|
|
|
|
return nil
|
|
}
|
|
|
|
func StartBackgroundJobs()error{
|
|
|
|
exists, currentUserName := appMemory.GetMemoryEntry("AppUser")
|
|
if (exists == false){
|
|
return errors.New("StartBackgroundJobs called when appUser is not signed in.")
|
|
}
|
|
|
|
checkIfUserHasChanged := func()bool{
|
|
|
|
exists, userName := appMemory.GetMemoryEntry("AppUser")
|
|
if (exists == false || currentUserName != userName){
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
appNetworkType, err := getAppNetworkType.GetAppNetworkType()
|
|
if (err != nil) { return err }
|
|
|
|
checkIfAppNetworkTypeHasChanged := func()(bool, error){
|
|
|
|
currentAppNetworkType, err := getAppNetworkType.GetAppNetworkType()
|
|
if (err != nil) { return false, err }
|
|
if (appNetworkType != currentAppNetworkType){
|
|
return true, nil
|
|
}
|
|
|
|
return false, nil
|
|
}
|
|
|
|
setLoopIsRunningStatus(true)
|
|
|
|
runBackgroundJobsLoop := func(){
|
|
|
|
//TODO: Stagger jobs, so we don't start them all at the same time
|
|
// If we start them all at the same time, it could result in fingerprinting and privacy leaks
|
|
|
|
for{
|
|
|
|
userHasChanged := checkIfUserHasChanged()
|
|
if (userHasChanged == true){
|
|
// This should not happen
|
|
// User should only be changed after this loop has stopped
|
|
logger.AddLogError("BackgroundJobs", errors.New("App user changed before backgroundJobs loop is stopped."))
|
|
return
|
|
}
|
|
|
|
appNetworkTypeHasChanged, err := checkIfAppNetworkTypeHasChanged()
|
|
if (err != nil){
|
|
logger.AddLogError("BackgroundJobs", err)
|
|
return
|
|
}
|
|
if (appNetworkTypeHasChanged == true){
|
|
// This should not happen
|
|
// App network type should only be changed after this loop has stopped
|
|
logger.AddLogError("BackgroundJobs", errors.New("App network type changed before backgroundJobs loop is stopped."))
|
|
return
|
|
}
|
|
|
|
type taskStruct struct{
|
|
|
|
// A task name is a jobName with a suffix representing the task process
|
|
TaskName string
|
|
|
|
// The name of the Job which this task will run
|
|
JobName string
|
|
|
|
// Seconds between each completed run of the task to wait before we run it again
|
|
TimeBetweenTasks int
|
|
}
|
|
|
|
// For example, _1 is the first process, _2 is the second, etc...
|
|
getTasksList := func()([]taskStruct, error){
|
|
|
|
tasksList := make([]taskStruct, 0)
|
|
|
|
addTasksToList := func(jobName string, timeBetweenTasks int, numberOfProcesses int){
|
|
|
|
for i:=1; i <= numberOfProcesses; i++{
|
|
|
|
processString := helpers.ConvertIntToString(i)
|
|
|
|
taskName := jobName + "_" + processString
|
|
|
|
newTaskObject := taskStruct{
|
|
|
|
TaskName: taskName,
|
|
JobName: jobName,
|
|
TimeBetweenTasks: timeBetweenTasks,
|
|
}
|
|
|
|
tasksList = append(tasksList, newTaskObject)
|
|
}
|
|
}
|
|
|
|
//TODO: Choose better timeBetweenTasks and numberOfProcesses values
|
|
|
|
addTasksToList("UpdateEnabledHosts", 60, 1)
|
|
|
|
addTasksToList("DownloadParameters", 120, 1)
|
|
|
|
// Database Jobs:
|
|
|
|
addTasksToList("UpdateDatabaseMateIdentityProfilesLists", 300, 1)
|
|
addTasksToList("UpdateDatabaseHostIdentityProfilesLists", 300, 1)
|
|
addTasksToList("UpdateDatabaseModeratorIdentityProfilesLists", 300, 1)
|
|
|
|
addTasksToList("PruneMateProfileMetadata", 300, 1)
|
|
addTasksToList("PruneHostProfileMetadata", 300, 1)
|
|
addTasksToList("PruneModeratorProfileMetadata", 300, 1)
|
|
addTasksToList("PruneMessageMetadata", 300, 1)
|
|
|
|
// Host Jobs:
|
|
|
|
getHostModeIsEnabledStatus := func()(bool, error){
|
|
|
|
exists, hostModeStatus, err := mySettings.GetSetting("HostModeOnOffStatus")
|
|
if (err != nil) { return false, err }
|
|
if (exists == true && hostModeStatus == "On"){
|
|
|
|
return true, nil
|
|
}
|
|
return false, nil
|
|
}
|
|
|
|
hostModeIsEnabled, err := getHostModeIsEnabledStatus()
|
|
if (err != nil){ return nil, err }
|
|
if (hostModeIsEnabled == true){
|
|
|
|
addTasksToList("AdjustMyHostRanges", 120, 1)
|
|
|
|
addTasksToList("DownloadMateProfilesToHost", 60, 1)
|
|
addTasksToList("DownloadHostProfilesToHost", 60, 1)
|
|
addTasksToList("DownloadModeratorProfilesToHost", 60, 1)
|
|
|
|
addTasksToList("DownloadMessagesToHost", 60, 1)
|
|
addTasksToList("DownloadReviewsToHost", 60, 1)
|
|
addTasksToList("DownloadReportsToHost", 60, 1)
|
|
|
|
addTasksToList("StartPeerServer", 3, 1)
|
|
} else {
|
|
addTasksToList("StopPeerServer", 3, 1)
|
|
}
|
|
|
|
// Moderator Jobs:
|
|
|
|
getModeratorModeIsEnabledStatus := func()(bool, error){
|
|
|
|
exists, moderatorModeStatus, err := mySettings.GetSetting("ModeratorModeOnOffStatus")
|
|
if (err != nil){ return false, err }
|
|
if (exists == true && moderatorModeStatus == "On"){
|
|
return true, nil
|
|
}
|
|
|
|
return false, nil
|
|
}
|
|
|
|
moderatorModeIsEnabled, err := getModeratorModeIsEnabledStatus()
|
|
if (err != nil) { return nil, err }
|
|
if (moderatorModeIsEnabled == true){
|
|
|
|
addTasksToList("AdjustMyModeratorRanges", 120, 1)
|
|
}
|
|
|
|
if (hostModeIsEnabled == true || moderatorModeIsEnabled == true){
|
|
|
|
addTasksToList("UpdateEnabledModerators", 60, 1)
|
|
addTasksToList("UpdateBannedModerators", 60, 1)
|
|
|
|
addTasksToList("RecordIdentityVerdictHistories", 120, 1)
|
|
addTasksToList("RecordProfileVerdictHistories", 120, 1)
|
|
addTasksToList("RecordMessageVerdictHistories", 120, 1)
|
|
|
|
}
|
|
|
|
addTasksToList("DownloadMyMateMessages", 1, 3)
|
|
if (moderatorModeIsEnabled == true){
|
|
addTasksToList("DownloadMyModeratorMessages", 1, 3)
|
|
}
|
|
|
|
//TODO: Add tasks from networkJobs.go, and more tasks:
|
|
|
|
// networkJobs.DownloadAllNewestViewableUserProfiles
|
|
// networkJobs.DownloadMateProfilesToBrowse
|
|
// networkJobs.DownloadMateOutlierProfiles
|
|
// networkJobs.DownloadProfilesToModerate
|
|
// networkJobs.DownloadMessagesToModerate
|
|
// networkJobs.DownloadMyInboxMessages
|
|
// networkJobs.DownloadModeratorIdentityBanningReviews
|
|
|
|
// networkJobs.DownloadIdentityReviewsToHost
|
|
// networkJobs.DownloadMessageReviewsToHost
|
|
// networkJobs.DownloadIdentityReviewsForModeration
|
|
// networkJobs.DownloadMessageReviewsForModeration
|
|
// networkJobs.DownloadIdentityReportsToHost
|
|
// networkJobs.DownloadMessageReportsToHost
|
|
// networkJobs.DownloadIdentityReportsForModeration
|
|
// networkJobs.DownloadMessageReportsForModeration
|
|
|
|
// networkJobs.DownloadHostViewableStatuses
|
|
// networkJobs.DownloadModeratorProfileViewableStatuses
|
|
// networkJobs.DownloadMateViewableStatusesForBrowsing
|
|
// networkJobs.DownloadMateOutlierViewableStatuses
|
|
|
|
// If local blockchain is not enabled and host/moderator mode is enabled:
|
|
// networkJobs.DownloadModeratorIdentityDeposits
|
|
|
|
// myMessageQueue.AttemptToSendMessagesInQueue
|
|
|
|
// myBroadcasts.PruneMyBroadcastedReviews
|
|
|
|
return tasksList, nil
|
|
}
|
|
|
|
tasksList, err := getTasksList()
|
|
if (err != nil) {
|
|
logger.AddLogError("BackgroundJobs", err)
|
|
return
|
|
}
|
|
|
|
for _, taskObject := range tasksList{
|
|
|
|
taskName := taskObject.TaskName
|
|
jobName := taskObject.JobName
|
|
taskWaitTime := taskObject.TimeBetweenTasks
|
|
|
|
runningTasksMapMutex.RLock()
|
|
taskIsRunning, exists := runningTasksMap[taskName]
|
|
runningTasksMapMutex.RUnlock()
|
|
if (exists == true && taskIsRunning == true){
|
|
// Task is already running.
|
|
// We skip it.
|
|
continue
|
|
}
|
|
|
|
taskCompletionTimesMapMutex.RLock()
|
|
taskLastCompletionTime, exists := taskCompletionTimesMap[taskName]
|
|
taskCompletionTimesMapMutex.RUnlock()
|
|
if (exists == true){
|
|
|
|
currentTime := time.Now().Unix()
|
|
timeElapsedFromLastCompletion := currentTime - taskLastCompletionTime
|
|
if (timeElapsedFromLastCompletion < int64(taskWaitTime)){
|
|
continue
|
|
}
|
|
}
|
|
|
|
// Now we run the task
|
|
// We get the task's job function
|
|
|
|
//TODO: Add a CheckIfStoppedFunction that we pass to many jobs
|
|
// This will return true if the user is signing out/closing Seekia
|
|
|
|
getJobFunction := func()(func()error, error){
|
|
|
|
if (jobName == "UpdateEnabledModerators"){
|
|
|
|
jobFunction := func()error{
|
|
|
|
err := enabledModerators.UpdateEnabledModeratorsList(appNetworkType)
|
|
|
|
return err
|
|
}
|
|
|
|
return jobFunction, nil
|
|
}
|
|
if (jobName == "UpdateBannedModerators"){
|
|
|
|
jobFunction := func()error{
|
|
|
|
err := bannedModeratorConsensus.UpdateBannedModeratorsList(appNetworkType)
|
|
|
|
return err
|
|
}
|
|
|
|
return jobFunction, nil
|
|
}
|
|
if (jobName == "UpdateEnabledHosts"){
|
|
|
|
jobFunction := func()error{
|
|
|
|
err := enabledHosts.UpdateEnabledHostsList(appNetworkType)
|
|
|
|
return err
|
|
}
|
|
|
|
return jobFunction, nil
|
|
}
|
|
if (jobName == "AdjustMyHostRanges"){
|
|
|
|
jobFunction := func()error{
|
|
|
|
err := myRanges.AdjustMyRanges("Host")
|
|
|
|
return err
|
|
}
|
|
return jobFunction, nil
|
|
}
|
|
if (jobName == "AdjustMyModeratorRanges"){
|
|
|
|
jobFunction := func()error{
|
|
|
|
err := myRanges.AdjustMyRanges("Moderator")
|
|
|
|
return err
|
|
}
|
|
return jobFunction, nil
|
|
}
|
|
|
|
if (jobName == "UpdateDatabaseMateIdentityProfilesLists"){
|
|
|
|
jobFunction := func()error{
|
|
|
|
err := databaseJobs.UpdateDatabaseIdentityProfilesLists("Mate")
|
|
|
|
return err
|
|
}
|
|
|
|
return jobFunction, nil
|
|
}
|
|
if (jobName == "UpdateDatabaseHostIdentityProfilesLists"){
|
|
|
|
jobFunction := func()error{
|
|
|
|
err := databaseJobs.UpdateDatabaseIdentityProfilesLists("Host")
|
|
|
|
return err
|
|
}
|
|
|
|
return jobFunction, nil
|
|
}
|
|
if (jobName == "UpdateDatabaseModeratorIdentityProfilesLists"){
|
|
|
|
jobFunction := func()error{
|
|
|
|
err := databaseJobs.UpdateDatabaseIdentityProfilesLists("Moderator")
|
|
|
|
return err
|
|
}
|
|
|
|
return jobFunction, nil
|
|
}
|
|
if (jobName == "PruneMateProfileMetadata"){
|
|
|
|
jobFunction := func()error{
|
|
|
|
err := databaseJobs.PruneProfileMetadata("Mate")
|
|
|
|
return err
|
|
}
|
|
|
|
return jobFunction, nil
|
|
}
|
|
if (jobName == "PruneHostProfileMetadata"){
|
|
|
|
jobFunction := func()error{
|
|
|
|
err := databaseJobs.PruneProfileMetadata("Host")
|
|
|
|
return err
|
|
}
|
|
|
|
return jobFunction, nil
|
|
}
|
|
if (jobName == "PruneModeratorProfileMetadata"){
|
|
|
|
jobFunction := func()error{
|
|
|
|
err := databaseJobs.PruneProfileMetadata("Moderator")
|
|
|
|
return err
|
|
}
|
|
|
|
return jobFunction, nil
|
|
}
|
|
if (jobName == "PruneMessageMetadata"){
|
|
|
|
jobFunction := databaseJobs.PruneMessageMetadata
|
|
|
|
return jobFunction, nil
|
|
}
|
|
|
|
if (jobName == "DownloadParameters"){
|
|
|
|
jobFunction := func()error{
|
|
|
|
err := networkJobs.DownloadParameters(appNetworkType)
|
|
|
|
return err
|
|
}
|
|
|
|
return jobFunction, nil
|
|
}
|
|
if (jobName == "DownloadMateProfilesToHost"){
|
|
|
|
jobFunction := func()error{
|
|
|
|
err := networkJobs.DownloadProfilesToHost("Mate", appNetworkType)
|
|
|
|
return err
|
|
}
|
|
|
|
return jobFunction, nil
|
|
}
|
|
if (jobName == "DownloadHostProfilesToHost"){
|
|
|
|
jobFunction := func()error{
|
|
|
|
err := networkJobs.DownloadProfilesToHost("Host", appNetworkType)
|
|
|
|
return err
|
|
}
|
|
|
|
return jobFunction, nil
|
|
}
|
|
if (jobName == "DownloadModeratorProfilesToHost"){
|
|
|
|
jobFunction := func()error{
|
|
|
|
err := networkJobs.DownloadProfilesToHost("Moderator", appNetworkType)
|
|
|
|
return err
|
|
}
|
|
|
|
return jobFunction, nil
|
|
}
|
|
if (jobName == "DownloadMessagesToHost"){
|
|
|
|
jobFunction := func()error{
|
|
|
|
err := networkJobs.DownloadMessagesToHost(appNetworkType)
|
|
|
|
return err
|
|
}
|
|
|
|
return jobFunction, nil
|
|
}
|
|
if (jobName == "StartPeerServer"){
|
|
|
|
jobFunction := peerServer.StartPeerServer
|
|
|
|
return jobFunction, nil
|
|
}
|
|
if (jobName == "StopPeerServer"){
|
|
|
|
jobFunction := peerServer.StopPeerServer
|
|
|
|
return jobFunction, nil
|
|
}
|
|
if (jobName == "RecordIdentityVerdictHistories"){
|
|
|
|
jobFunction := func()error{
|
|
|
|
err := verdictHistory.RecordIdentityVerdictsToHistoryMap(appNetworkType)
|
|
|
|
return err
|
|
}
|
|
|
|
return jobFunction, nil
|
|
}
|
|
if (jobName == "RecordProfileVerdictHistories"){
|
|
|
|
jobFunction := func()error{
|
|
|
|
err := verdictHistory.RecordProfileVerdictsToHistoryMap(appNetworkType)
|
|
|
|
return err
|
|
}
|
|
return jobFunction, nil
|
|
}
|
|
if (jobName == "RecordMessageVerdictHistories"){
|
|
|
|
jobFunction := func()error{
|
|
|
|
err := verdictHistory.RecordMessageVerdictsToHistoryMap(appNetworkType)
|
|
|
|
return err
|
|
}
|
|
|
|
return jobFunction, nil
|
|
}
|
|
|
|
if (jobName == "DownloadMyMateMessages"){
|
|
|
|
jobFunction := func()error{
|
|
|
|
err := networkJobs.DownloadMyInboxMessages("Mate", appNetworkType)
|
|
|
|
return err
|
|
}
|
|
|
|
return jobFunction, nil
|
|
}
|
|
|
|
if (jobName == "DownloadMyModeratorMessages"){
|
|
|
|
jobFunction := func()error{
|
|
|
|
err := networkJobs.DownloadMyInboxMessages("Moderator", appNetworkType)
|
|
|
|
return err
|
|
}
|
|
|
|
return jobFunction, nil
|
|
}
|
|
|
|
return nil, errors.New("Tasks map contains unknown jobName: " + jobName)
|
|
}
|
|
|
|
jobFunction, err := getJobFunction()
|
|
if (err != nil){
|
|
logger.AddLogError("BackgroundJobs", err)
|
|
return
|
|
}
|
|
|
|
runningTasksMapMutex.Lock()
|
|
runningTasksMap[taskName] = true
|
|
runningTasksMapMutex.Unlock()
|
|
|
|
startTaskFunction := func(){
|
|
|
|
err := jobFunction()
|
|
if (err != nil){
|
|
logger.AddLogError("BackgroundJobs", err)
|
|
}
|
|
|
|
runningTasksMapMutex.Lock()
|
|
runningTasksMap[taskName] = false
|
|
runningTasksMapMutex.Unlock()
|
|
|
|
currentTime := time.Now().Unix()
|
|
|
|
taskCompletionTimesMapMutex.Lock()
|
|
taskCompletionTimesMap[taskName] = currentTime
|
|
taskCompletionTimesMapMutex.Unlock()
|
|
}
|
|
|
|
go startTaskFunction()
|
|
}
|
|
|
|
loopIsRunningMutex.RLock()
|
|
currentLoopIsRunningStatus := loopIsRunning
|
|
loopIsRunningMutex.RUnlock()
|
|
|
|
if (currentLoopIsRunningStatus == false){
|
|
// Background jobs loop has been stopped.
|
|
return
|
|
}
|
|
|
|
time.Sleep(time.Second)
|
|
}
|
|
}
|
|
|
|
go runBackgroundJobsLoop()
|
|
|
|
return nil
|
|
}
|
|
|
|
|
|
|