seekia/internal/network/manualDownloads/manualDownloads.go

684 lines
23 KiB
Go

// manualDownloads provides functions to start and monitor manual downloads
// These are downloads that the user initiates through the GUI, such as downloading a profile/review/report
// The user is able to monitor the status of these downloads through the GUI
// These are different from the downloads that are called automatically in the background (see networkJobs.go)
package manualDownloads
// Types of downloads:
// -Download a user's profile
// -Download a specific message
// -Download a review by reviewHash or reviewedHash
// -Download a report by reportedHash
// -Download all reviews by a reviewer
// -Download all reviews for a reviewedHash
// -Download all reports for a reportedHash
// -Update a user's identity score/balance
import "seekia/internal/byteRange"
import "seekia/internal/encoding"
import "seekia/internal/helpers"
import "seekia/internal/identity"
import "seekia/internal/logger"
import "seekia/internal/moderation/trustedViewableStatus"
import "seekia/internal/network/eligibleHosts"
import "seekia/internal/network/hostRanges"
import "seekia/internal/network/peerClient"
import "seekia/internal/network/sendRequests"
import "seekia/internal/profiles/profileStorage"
import "errors"
import "time"
import "sync"
import "slices"
// This object stores data about active processes
type processObject struct{
// Set to true once process is complete. Will be true if we encounter an error
isComplete bool
// Set to true if we encounter an error
encounteredError bool
// The error we encountered
errorEncountered error
// Stores the number of hosts that have been successfully downloaded from
numberOfSuccessfulDownloads int
// Stores the number of hosts we tried to download from whom did not have the content
numberOfHostsMissingContent int
// Stores details about progress status, which are shown to user in GUI
progressDetails string
}
var processObjectsMapMutex sync.RWMutex
var processObjectsMap map[[23]byte]processObject = make(map[[23]byte]processObject)
//Outputs:
// -[23]byte: New process identifier
// -error
func initializeNewProcessObject()([23]byte, error){
processIdentifierBytes, err := helpers.GetNewRandomBytes(23)
if (err != nil) { return [23]byte{}, err }
processIdentifier := [23]byte(processIdentifierBytes)
newProcessObject := processObject{
isComplete: false,
encounteredError: false,
errorEncountered: nil,
numberOfSuccessfulDownloads: 0,
numberOfHostsMissingContent: 0,
progressDetails: "",
}
processObjectsMapMutex.Lock()
processObjectsMap[processIdentifier] = newProcessObject
processObjectsMapMutex.Unlock()
return processIdentifier, nil
}
func increaseProcessSuccessfulDownloadsCount(processIdentifier [23]byte)error{
processObjectsMapMutex.Lock()
defer processObjectsMapMutex.Unlock()
processObject, exists := processObjectsMap[processIdentifier]
if (exists == false){
processIdentifierHex := encoding.EncodeBytesToHexString(processIdentifier[:])
return errors.New("increaseProcessSuccessfulDownloadsCount called with uninitialized process: " + processIdentifierHex)
}
processObject.numberOfSuccessfulDownloads += 1
processObjectsMap[processIdentifier] = processObject
return nil
}
func increaseProcessHostsMissingContentCount(processIdentifier [23]byte)error{
processObjectsMapMutex.Lock()
defer processObjectsMapMutex.Unlock()
processObject, exists := processObjectsMap[processIdentifier]
if (exists == false){
processIdentifierHex := encoding.EncodeBytesToHexString(processIdentifier[:])
return errors.New("increaseProcessHostsMissingContentCount called with uninitialized process: " + processIdentifierHex)
}
processObject.numberOfHostsMissingContent += 1
processObjectsMap[processIdentifier] = processObject
return nil
}
func setProcessProgressDetails(processIdentifier [23]byte, newProgressDetails string)error{
processObjectsMapMutex.Lock()
defer processObjectsMapMutex.Unlock()
processObject, exists := processObjectsMap[processIdentifier]
if (exists == false){
processIdentifierHex := encoding.EncodeBytesToHexString(processIdentifier[:])
return errors.New("setProcessProgressDetails called with uninitialized process: " + processIdentifierHex)
}
processObject.progressDetails = newProgressDetails
processObjectsMap[processIdentifier] = processObject
return nil
}
func setProcessEncounteredError(processIdentifier [23]byte, errorEncountered error)error{
processObjectsMapMutex.Lock()
defer processObjectsMapMutex.Unlock()
processObject, exists := processObjectsMap[processIdentifier]
if (exists == false){
processIdentifierHex := encoding.EncodeBytesToHexString(processIdentifier[:])
return errors.New("setProcessEncounteredError called with uninitialized process: " + processIdentifierHex)
}
processObject.isComplete = true
processObject.encounteredError = true
processObject.errorEncountered = errorEncountered
processObjectsMap[processIdentifier] = processObject
return nil
}
func setProcessIsComplete(processIdentifier [23]byte)error{
processObjectsMapMutex.Lock()
defer processObjectsMapMutex.Unlock()
processObject, exists := processObjectsMap[processIdentifier]
if (exists == false){
processIdentifierHex := encoding.EncodeBytesToHexString(processIdentifier[:])
return errors.New("setProcessIsComplete called with uninitialized process: " + processIdentifierHex)
}
processObject.isComplete = true
processObjectsMap[processIdentifier] = processObject
return nil
}
//Outputs:
// -bool: Process found
// -bool: Process is complete status
// -bool: Process encountered error
// -error: Error encountered by process
// -int: Number of hosts successfully downloaded from (These are hosts who gave us the content we are trying to download)
// -int: Number of hosts missing content (These are hosts that have said they do not have the content)
// -string: Process progress details
func GetProcessInfo(processIdentifier [23]byte)(bool, bool, bool, error, int, int, string){
processObjectsMapMutex.RLock()
defer processObjectsMapMutex.RUnlock()
processObject, exists := processObjectsMap[processIdentifier]
if (exists == false) {
return false, false, false, nil, 0, 0, ""
}
processIsComplete := processObject.isComplete
processEncounteredError := processObject.encounteredError
processError := processObject.errorEncountered
processNumberOfSuccessfulDownloads := processObject.numberOfSuccessfulDownloads
processNumberOfHostsMissingContent := processObject.numberOfHostsMissingContent
processProgressDetails := processObject.progressDetails
return true, processIsComplete, processEncounteredError, processError, processNumberOfSuccessfulDownloads, processNumberOfHostsMissingContent, processProgressDetails
}
func EndProcess(processIdentifier [23]byte)error{
err := setProcessIsComplete(processIdentifier)
if (err != nil){ return err }
err = setProcessProgressDetails(processIdentifier, "Download manually stopped.")
if (err != nil) { return err }
return nil
}
//Inputs:
// -[16]byte: User identity hash whose profile we are downloading
// -byte: Network type to retrieve from
// -bool: Get viewable profiles only
// -int: Number of hosts to query (This is the number of hosts to request a successful download from)
// -A successful download is one where the host either offered a profile we already have, or we downloaded a new profile
// -If they say they have no profile, the download is not successful
// -int: Maximum number of hosts to query
//Outputs:
// -bool: Any eligible hosts found to download from
// -[23]byte: Process identifier
// -error
func StartNewestUserProfileDownload(userIdentityHash [16]byte, networkType byte, getViewableOnly bool, numberOfHostsToDownloadFrom int, maximumHostsToContact int)(bool, [23]byte, error){
//TODO: Add to temporary downloads range so the profile doesn't get deleted automatically by backgroundJobs
userIdentityType, err := identity.GetIdentityTypeFromIdentityHash(userIdentityHash)
if (err != nil) {
userIdentityHashHex := encoding.EncodeBytesToHexString(userIdentityHash[:])
return false, [23]byte{}, errors.New("StartNewestUserProfileDownload called with invalid user identity hash: " + userIdentityHashHex)
}
isValid := helpers.VerifyNetworkType(networkType)
if (isValid == false){
networkTypeString := helpers.ConvertByteToString(networkType)
return false, [23]byte{}, errors.New("StartNewestUserProfileDownload called with invalid networkType: " + networkTypeString)
}
allEligibleHostsList, err := eligibleHosts.GetEligibleHostsList(networkType)
if (err != nil) { return false, [23]byte{}, err }
hostsToContactList := make([][16]byte, 0)
for _, hostIdentityHash := range allEligibleHostsList{
exists, _, _, _, _, hostRawProfileMap, err := profileStorage.GetNewestUserProfile(hostIdentityHash, networkType)
if (err != nil) { return false, [23]byte{}, err }
if (exists == false){
// Host profile was deleted, skip host
continue
}
hostIsHostingIdentityType, theirHostRangeStart, theirHostRangeEnd, err := hostRanges.GetHostedIdentityHashRangeFromHostRawProfileMap(hostRawProfileMap, userIdentityType)
if (err != nil) { return false, [23]byte{}, err }
if (hostIsHostingIdentityType == false){
// Host is not hosting any profiles of this profileType. Skip to next host
continue
}
isWithinRange, err := byteRange.CheckIfIdentityHashIsWithinRange(theirHostRangeStart, theirHostRangeEnd, userIdentityHash)
if (err != nil) { return false, [23]byte{}, err }
if (isWithinRange == true){
hostsToContactList = append(hostsToContactList, hostIdentityHash)
}
}
if (len(hostsToContactList) == 0){
// No hosts exist who we can download this user's profile from
// User needs to wait for Seekia to download more hosts
return false, [23]byte{}, nil
}
processIdentifier, err := initializeNewProcessObject()
if (err != nil) { return false, [23]byte{}, err }
performDownloads := func(){
var contactedHostsListMutex sync.RWMutex
//We use this list to prevent downloading from the same host twice
contactedHostIdentityHashesList := make([][16]byte, 0)
checkIfHostHasBeenContacted := func(hostIdentityHash [16]byte)bool{
contactedHostsListMutex.RLock()
isContacted := slices.Contains(contactedHostIdentityHashesList, hostIdentityHash)
contactedHostsListMutex.RUnlock()
return isContacted
}
addContactedHostIdentityHashToList := func(contactedHostIdentityHash [16]byte){
contactedHostsListMutex.Lock()
contactedHostIdentityHashesList = append(contactedHostIdentityHashesList, contactedHostIdentityHash)
contactedHostsListMutex.Unlock()
}
var activeDownloadsMutex sync.RWMutex
numberOfActiveDownloads := 0
increaseActiveDownloads := func(){
activeDownloadsMutex.Lock()
numberOfActiveDownloads += 1
activeDownloadsMutex.Unlock()
}
decreaseActiveDownloads := func(){
activeDownloadsMutex.Lock()
numberOfActiveDownloads -= 1
activeDownloadsMutex.Unlock()
}
getNumberOfActiveDownloads := func()int{
activeDownloadsMutex.RLock()
result := numberOfActiveDownloads
activeDownloadsMutex.RUnlock()
return result
}
executeDownloadFromHost := func(hostIdentityHash [16]byte){
executeDownloadFromHostFunction := func()error{
processFound, processIsComplete, errorEncountered, _, _, _, _ := GetProcessInfo(processIdentifier)
if (processFound == false){
return errors.New("executeDownloadFromHostFunction called with uninitialized process.")
}
if (processIsComplete == true || errorEncountered == true){
// Error may have been encountered from a different broadcast goroutine
return nil
}
hostIdentityHashString, identityType, err := identity.EncodeIdentityHashBytesToString(hostIdentityHash)
if (err != nil) { return err }
if (identityType != "Moderator"){
return errors.New("executeDownloadFromHost called with non-moderator identity hash.")
}
hostIdentityHashTrimmed, _, err := helpers.TrimAndFlattenString(hostIdentityHashString, 6)
if (err != nil) { return err }
hostProfileFound, connectionEstablished, connectionIdentifier, err := peerClient.EstablishNewConnectionToHost(false, hostIdentityHash, networkType)
if (err != nil) { return err }
if (hostProfileFound == false){
return nil
}
if (connectionEstablished == false){
err := setProcessProgressDetails(processIdentifier, "Failed to connect to host " + hostIdentityHashTrimmed)
if (err != nil) { return err }
return nil
}
err = setProcessProgressDetails(processIdentifier, "Downloading from host " + hostIdentityHashTrimmed)
if (err != nil) { return err }
minimumRange, maximumRange := byteRange.GetMinimumMaximumIdentityHashBounds()
identityList := [][16]byte{userIdentityHash}
successfulDownload, profilesInfoObjectsList, err := sendRequests.GetProfilesInfoFromHost(connectionIdentifier, hostIdentityHash, networkType, userIdentityType, minimumRange, maximumRange, identityList, nil, true, getViewableOnly)
if (err != nil) { return err }
if (successfulDownload == false){
err := setProcessProgressDetails(processIdentifier, "Failed to download profile from host " + hostIdentityHashTrimmed)
if (err != nil) { return err }
return nil
}
if (len(profilesInfoObjectsList) == 0){
// Host does not have profiles for this user.
// We don't consider this a successful download
// User profile we are trying to get may not exist anywhere
// We need to make sure user is aware that they may never retrieve user's profile, and all downloads will fail
err := increaseProcessHostsMissingContentCount(processIdentifier)
if (err != nil) { return err }
err = setProcessProgressDetails(processIdentifier, "Host " + hostIdentityHashTrimmed + " does not have the user's profile.")
if (err != nil) { return err }
return nil
}
if (len(profilesInfoObjectsList) != 1){
return errors.New("GetProfilesInfoFromHost not validating profilesInfoObjectsList properly")
}
profilesInfoObject := profilesInfoObjectsList[0]
profileHash := profilesInfoObject.ProfileHash
profileCreationTime := profilesInfoObject.ProfileCreationTime
// We check if we already have this profile
alreadyExists, _, err := profileStorage.GetStoredProfile(profileHash)
if (err != nil){ return err }
if (alreadyExists == true){
if (getViewableOnly == true){
err := trustedViewableStatus.AddTrustedProfileIsViewableStatus(profileHash, hostIdentityHash, true)
if (err != nil) { return err }
err = trustedViewableStatus.AddTrustedIdentityIsViewableStatus(userIdentityHash, hostIdentityHash, networkType, true)
if (err != nil) { return err }
}
err := increaseProcessSuccessfulDownloadsCount(processIdentifier)
if (err != nil){ return err }
err = setProcessProgressDetails(processIdentifier, "Host offered profile we already have.")
if (err != nil){ return err }
return nil
}
profileHashesToDownloadList := [][28]byte{profileHash}
expectedProfileIdentityHashesMap := make(map[[28]byte][16]byte)
expectedProfileIdentityHashesMap[profileHash] = userIdentityHash
expectedProfileCreationTimesMap := make(map[[28]byte]int64)
expectedProfileCreationTimesMap[profileHash] = profileCreationTime
processFound, processIsComplete, errorEncountered, _, _, _, _ = GetProcessInfo(processIdentifier)
if (processFound == false){
// This should not happen
return errors.New("Process not found after being found already.")
}
if (processIsComplete == true || errorEncountered == true){
// Error may have been encountered from a different download goroutine
return nil
}
downloadSuccessful, listOfProfiles, err := sendRequests.GetProfilesFromHost(connectionIdentifier, hostIdentityHash, networkType, userIdentityType, profileHashesToDownloadList, expectedProfileIdentityHashesMap, expectedProfileCreationTimesMap, nil)
if (err != nil) { return err }
if (downloadSuccessful == false){
err := setProcessProgressDetails(processIdentifier, "Failed to download profile from host " + hostIdentityHashTrimmed)
if (err != nil){ return err }
return nil
}
if (len(listOfProfiles) == 0){
// Host does not have profiles for this user (after saying they did)
// We don't consider this a successful download
// User profile we are trying to get may not exist anywhere
// We need to make sure user is aware that they may never retrieve user profile, and all downloads will fail
err := increaseProcessHostsMissingContentCount(processIdentifier)
if (err != nil) { return err }
err = setProcessProgressDetails(processIdentifier, "Host " + hostIdentityHashTrimmed + " does not have the user's profile.")
if (err != nil) { return err }
return nil
}
if (len(listOfProfiles) != 1){
return errors.New("sendRequests.GetProfilesFromHost not verifying list of profiles matches requested profile hashes list.")
}
profileBytes := listOfProfiles[0]
profileIsWellFormed, _, err := profileStorage.AddUserProfile(profileBytes)
if (err != nil) { return err }
if (profileIsWellFormed == false){
return errors.New("GetProfilesFromHost not verifying profile is well formed")
}
if (getViewableOnly == true){
err := trustedViewableStatus.AddTrustedProfileIsViewableStatus(profileHash, hostIdentityHash, true)
if (err != nil) { return err }
err = trustedViewableStatus.AddTrustedIdentityIsViewableStatus(userIdentityHash, hostIdentityHash, networkType, true)
if (err != nil) { return err }
}
err = increaseProcessSuccessfulDownloadsCount(processIdentifier)
if (err != nil){ return err }
err = setProcessProgressDetails(processIdentifier, "Successfully downloaded profile from " + hostIdentityHashTrimmed)
if (err != nil) { return err }
return nil
}
err := executeDownloadFromHostFunction()
if (err != nil){
logger.AddLogError("General", err)
err := setProcessEncounteredError(processIdentifier, err)
if (err != nil){
logger.AddLogError("General", err)
}
}
decreaseActiveDownloads()
}
startTime := time.Now().Unix()
for {
processFound, processIsComplete, errorEncountered, _, numberOfSuccessfulDownloads, numberOfHostsMissingContent, _ := GetProcessInfo(processIdentifier)
if (processFound == false){
// This should not happen
logger.AddLogError("General", errors.New("Process not found during manualDownloads loop 1"))
return
}
if (processIsComplete == true || errorEncountered == true){
return
}
if (numberOfSuccessfulDownloads >= numberOfHostsToDownloadFrom){
// We have completed the required number of downloads
// Nothing left to do
break
}
totalAttemptedDownloads := numberOfSuccessfulDownloads + numberOfHostsMissingContent
if (totalAttemptedDownloads >= maximumHostsToContact){
// We have contacted enough hosts
// We are done
break
}
activeDownloads := getNumberOfActiveDownloads()
pendingAndCompletedDownloads := numberOfSuccessfulDownloads + activeDownloads
if (pendingAndCompletedDownloads >= numberOfHostsToDownloadFrom || (numberOfHostsMissingContent + pendingAndCompletedDownloads) >= maximumHostsToContact){
// We are actively downloading from the required/maximum number of hosts
// Wait to see if we need to retry to another host
currentTime := time.Now().Unix()
secondsElapsed := currentTime - startTime
if (secondsElapsed > 150){
// Something has gone wrong. Downloads should timeout before this.
err := setProcessEncounteredError(processIdentifier, errors.New("Profile download failed: Reached timeout."))
if (err != nil) {
logger.AddLogError("General", err)
}
return
}
time.Sleep(time.Second)
continue
}
// We need to start another download
// We find a host we have not contacted yet
//Outputs:
// -bool: We found a host
startNewDownload := func()bool{
for _, hostIdentityHash := range hostsToContactList{
isContacted := checkIfHostHasBeenContacted(hostIdentityHash)
if (isContacted == false){
addContactedHostIdentityHashToList(hostIdentityHash)
increaseActiveDownloads()
go executeDownloadFromHost(hostIdentityHash)
return true
}
}
return false
}
foundHost := startNewDownload()
if (foundHost == false){
// There are not enough hosts to contact the requested numberOfHostsToDownloadFrom
break
}
}
// We wait for downloads to complete
secondsElapsed := 0
for {
processFound, processIsComplete, errorEncountered, _, _, _, _ := GetProcessInfo(processIdentifier)
if (processFound == false){
// This should not happen
logger.AddLogError("General", errors.New("Process not found while waiting for manual download to complete"))
return
}
if (processIsComplete == true || errorEncountered == true){
// Error may have been encountered from a different broadcast goroutine
return
}
activeDownloads := getNumberOfActiveDownloads()
if (activeDownloads <= 0){
// We are done waiting for downloads to complete
break
}
time.Sleep(time.Second)
secondsElapsed += 1
if (secondsElapsed > 100){
// Something has gone wrong.
err := setProcessEncounteredError(processIdentifier, errors.New("Download failed: reached timeout."))
if (err != nil){
logger.AddLogError("General", err)
}
return
}
}
err := setProcessIsComplete(processIdentifier)
if (err != nil){
logger.AddLogError("General", err)
return
}
processFound, processIsComplete, errorEncountered, _, numberOfSuccessfulDownloads, numberOfHostsMissingContent, _ := GetProcessInfo(processIdentifier)
if (processFound == false){
// This should not happen
return
}
if (processIsComplete == false){
// This should not happen
err := setProcessEncounteredError(processIdentifier, errors.New("setProcessIsComplete is not working."))
if (err != nil) {
logger.AddLogError("General", err)
}
return
}
if (errorEncountered == true){
return
}
getFinalDownloadStatus := func()string{
if (numberOfSuccessfulDownloads != 0){
numberOfSuccessfulDownloadsString := helpers.ConvertIntToString(numberOfSuccessfulDownloads)
return "Downloaded profile from " + numberOfSuccessfulDownloadsString + " hosts."
}
if (numberOfHostsMissingContent == 0){
// We were unable to download from any hosts
return "Download failed: Hosts unavailable."
}
// We downloaded from hosts, but none of them had the user's profile
return "Download failed: Content unavailable."
}
finalDownloadStatus := getFinalDownloadStatus()
err = setProcessProgressDetails(processIdentifier, finalDownloadStatus)
if (err != nil){
logger.AddLogError("General", err)
return
}
}
go performDownloads()
return true, processIdentifier, nil
}