// manualDownloads provides functions to start and monitor manual downloads // These are downloads that the user initiates through the GUI, such as downloading a profile/review/report // The user is able to monitor the status of these downloads through the GUI // These are different from the downloads that are called automatically in the background (see networkJobs.go) package manualDownloads // Types of downloads: // -Download a user's profile // -Download a specific message // -Download a review by reviewHash or reviewedHash // -Download a report by reportedHash // -Download all reviews by a reviewer // -Download all reviews for a reviewedHash // -Download all reports for a reportedHash // -Update a user's identity score/balance import "seekia/internal/byteRange" import "seekia/internal/encoding" import "seekia/internal/helpers" import "seekia/internal/identity" import "seekia/internal/logger" import "seekia/internal/moderation/trustedViewableStatus" import "seekia/internal/network/eligibleHosts" import "seekia/internal/network/hostRanges" import "seekia/internal/network/peerClient" import "seekia/internal/network/sendRequests" import "seekia/internal/profiles/profileStorage" import "errors" import "time" import "sync" import "slices" // This object stores data about active processes type processObject struct{ // Set to true once process is complete. Will be true if we encounter an error isComplete bool // Set to true if we encounter an error encounteredError bool // The error we encountered errorEncountered error // Stores the number of hosts that have been successfully downloaded from numberOfSuccessfulDownloads int // Stores the number of hosts we tried to download from whom did not have the content numberOfHostsMissingContent int // Stores details about progress status, which are shown to user in GUI progressDetails string } var processObjectsMapMutex sync.RWMutex var processObjectsMap map[[23]byte]processObject = make(map[[23]byte]processObject) //Outputs: // -[23]byte: New process identifier // -error func initializeNewProcessObject()([23]byte, error){ processIdentifierBytes, err := helpers.GetNewRandomBytes(23) if (err != nil) { return [23]byte{}, err } processIdentifier := [23]byte(processIdentifierBytes) newProcessObject := processObject{ isComplete: false, encounteredError: false, errorEncountered: nil, numberOfSuccessfulDownloads: 0, numberOfHostsMissingContent: 0, progressDetails: "", } processObjectsMapMutex.Lock() processObjectsMap[processIdentifier] = newProcessObject processObjectsMapMutex.Unlock() return processIdentifier, nil } func increaseProcessSuccessfulDownloadsCount(processIdentifier [23]byte)error{ processObjectsMapMutex.Lock() defer processObjectsMapMutex.Unlock() processObject, exists := processObjectsMap[processIdentifier] if (exists == false){ processIdentifierHex := encoding.EncodeBytesToHexString(processIdentifier[:]) return errors.New("increaseProcessSuccessfulDownloadsCount called with uninitialized process: " + processIdentifierHex) } processObject.numberOfSuccessfulDownloads += 1 processObjectsMap[processIdentifier] = processObject return nil } func increaseProcessHostsMissingContentCount(processIdentifier [23]byte)error{ processObjectsMapMutex.Lock() defer processObjectsMapMutex.Unlock() processObject, exists := processObjectsMap[processIdentifier] if (exists == false){ processIdentifierHex := encoding.EncodeBytesToHexString(processIdentifier[:]) return errors.New("increaseProcessHostsMissingContentCount called with uninitialized process: " + processIdentifierHex) } processObject.numberOfHostsMissingContent += 1 processObjectsMap[processIdentifier] = processObject return nil } func setProcessProgressDetails(processIdentifier [23]byte, newProgressDetails string)error{ processObjectsMapMutex.Lock() defer processObjectsMapMutex.Unlock() processObject, exists := processObjectsMap[processIdentifier] if (exists == false){ processIdentifierHex := encoding.EncodeBytesToHexString(processIdentifier[:]) return errors.New("setProcessProgressDetails called with uninitialized process: " + processIdentifierHex) } processObject.progressDetails = newProgressDetails processObjectsMap[processIdentifier] = processObject return nil } func setProcessEncounteredError(processIdentifier [23]byte, errorEncountered error)error{ processObjectsMapMutex.Lock() defer processObjectsMapMutex.Unlock() processObject, exists := processObjectsMap[processIdentifier] if (exists == false){ processIdentifierHex := encoding.EncodeBytesToHexString(processIdentifier[:]) return errors.New("setProcessEncounteredError called with uninitialized process: " + processIdentifierHex) } processObject.isComplete = true processObject.encounteredError = true processObject.errorEncountered = errorEncountered processObjectsMap[processIdentifier] = processObject return nil } func setProcessIsComplete(processIdentifier [23]byte)error{ processObjectsMapMutex.Lock() defer processObjectsMapMutex.Unlock() processObject, exists := processObjectsMap[processIdentifier] if (exists == false){ processIdentifierHex := encoding.EncodeBytesToHexString(processIdentifier[:]) return errors.New("setProcessIsComplete called with uninitialized process: " + processIdentifierHex) } processObject.isComplete = true processObjectsMap[processIdentifier] = processObject return nil } //Outputs: // -bool: Process found // -bool: Process is complete status // -bool: Process encountered error // -error: Error encountered by process // -int: Number of hosts successfully downloaded from (These are hosts who gave us the content we are trying to download) // -int: Number of hosts missing content (These are hosts that have said they do not have the content) // -string: Process progress details func GetProcessInfo(processIdentifier [23]byte)(bool, bool, bool, error, int, int, string){ processObjectsMapMutex.RLock() defer processObjectsMapMutex.RUnlock() processObject, exists := processObjectsMap[processIdentifier] if (exists == false) { return false, false, false, nil, 0, 0, "" } processIsComplete := processObject.isComplete processEncounteredError := processObject.encounteredError processError := processObject.errorEncountered processNumberOfSuccessfulDownloads := processObject.numberOfSuccessfulDownloads processNumberOfHostsMissingContent := processObject.numberOfHostsMissingContent processProgressDetails := processObject.progressDetails return true, processIsComplete, processEncounteredError, processError, processNumberOfSuccessfulDownloads, processNumberOfHostsMissingContent, processProgressDetails } func EndProcess(processIdentifier [23]byte)error{ err := setProcessIsComplete(processIdentifier) if (err != nil){ return err } err = setProcessProgressDetails(processIdentifier, "Download manually stopped.") if (err != nil) { return err } return nil } //Inputs: // -[16]byte: User identity hash whose profile we are downloading // -byte: Network type to retrieve from // -bool: Get viewable profiles only // -int: Number of hosts to query (This is the number of hosts to request a successful download from) // -A successful download is one where the host either offered a profile we already have, or we downloaded a new profile // -If they say they have no profile, the download is not successful // -int: Maximum number of hosts to query //Outputs: // -bool: Any eligible hosts found to download from // -[23]byte: Process identifier // -error func StartNewestUserProfileDownload(userIdentityHash [16]byte, networkType byte, getViewableOnly bool, numberOfHostsToDownloadFrom int, maximumHostsToContact int)(bool, [23]byte, error){ //TODO: Add to temporary downloads range so the profile doesn't get deleted automatically by backgroundJobs userIdentityType, err := identity.GetIdentityTypeFromIdentityHash(userIdentityHash) if (err != nil) { userIdentityHashHex := encoding.EncodeBytesToHexString(userIdentityHash[:]) return false, [23]byte{}, errors.New("StartNewestUserProfileDownload called with invalid user identity hash: " + userIdentityHashHex) } isValid := helpers.VerifyNetworkType(networkType) if (isValid == false){ networkTypeString := helpers.ConvertByteToString(networkType) return false, [23]byte{}, errors.New("StartNewestUserProfileDownload called with invalid networkType: " + networkTypeString) } allEligibleHostsList, err := eligibleHosts.GetEligibleHostsList(networkType) if (err != nil) { return false, [23]byte{}, err } hostsToContactList := make([][16]byte, 0) for _, hostIdentityHash := range allEligibleHostsList{ exists, _, _, _, _, hostRawProfileMap, err := profileStorage.GetNewestUserProfile(hostIdentityHash, networkType) if (err != nil) { return false, [23]byte{}, err } if (exists == false){ // Host profile was deleted, skip host continue } hostIsHostingIdentityType, theirHostRangeStart, theirHostRangeEnd, err := hostRanges.GetHostedIdentityHashRangeFromHostRawProfileMap(hostRawProfileMap, userIdentityType) if (err != nil) { return false, [23]byte{}, err } if (hostIsHostingIdentityType == false){ // Host is not hosting any profiles of this profileType. Skip to next host continue } isWithinRange, err := byteRange.CheckIfIdentityHashIsWithinRange(theirHostRangeStart, theirHostRangeEnd, userIdentityHash) if (err != nil) { return false, [23]byte{}, err } if (isWithinRange == true){ hostsToContactList = append(hostsToContactList, hostIdentityHash) } } if (len(hostsToContactList) == 0){ // No hosts exist who we can download this user's profile from // User needs to wait for Seekia to download more hosts return false, [23]byte{}, nil } processIdentifier, err := initializeNewProcessObject() if (err != nil) { return false, [23]byte{}, err } performDownloads := func(){ var contactedHostsListMutex sync.RWMutex //We use this list to prevent downloading from the same host twice contactedHostIdentityHashesList := make([][16]byte, 0) checkIfHostHasBeenContacted := func(hostIdentityHash [16]byte)bool{ contactedHostsListMutex.RLock() isContacted := slices.Contains(contactedHostIdentityHashesList, hostIdentityHash) contactedHostsListMutex.RUnlock() return isContacted } addContactedHostIdentityHashToList := func(contactedHostIdentityHash [16]byte){ contactedHostsListMutex.Lock() contactedHostIdentityHashesList = append(contactedHostIdentityHashesList, contactedHostIdentityHash) contactedHostsListMutex.Unlock() } var activeDownloadsMutex sync.RWMutex numberOfActiveDownloads := 0 increaseActiveDownloads := func(){ activeDownloadsMutex.Lock() numberOfActiveDownloads += 1 activeDownloadsMutex.Unlock() } decreaseActiveDownloads := func(){ activeDownloadsMutex.Lock() numberOfActiveDownloads -= 1 activeDownloadsMutex.Unlock() } getNumberOfActiveDownloads := func()int{ activeDownloadsMutex.RLock() result := numberOfActiveDownloads activeDownloadsMutex.RUnlock() return result } executeDownloadFromHost := func(hostIdentityHash [16]byte){ executeDownloadFromHostFunction := func()error{ processFound, processIsComplete, errorEncountered, _, _, _, _ := GetProcessInfo(processIdentifier) if (processFound == false){ return errors.New("executeDownloadFromHostFunction called with uninitialized process.") } if (processIsComplete == true || errorEncountered == true){ // Error may have been encountered from a different broadcast goroutine return nil } hostIdentityHashString, identityType, err := identity.EncodeIdentityHashBytesToString(hostIdentityHash) if (err != nil) { return err } if (identityType != "Moderator"){ return errors.New("executeDownloadFromHost called with non-moderator identity hash.") } hostIdentityHashTrimmed, _, err := helpers.TrimAndFlattenString(hostIdentityHashString, 6) if (err != nil) { return err } hostProfileFound, connectionEstablished, connectionIdentifier, err := peerClient.EstablishNewConnectionToHost(false, hostIdentityHash, networkType) if (err != nil) { return err } if (hostProfileFound == false){ return nil } if (connectionEstablished == false){ err := setProcessProgressDetails(processIdentifier, "Failed to connect to host " + hostIdentityHashTrimmed) if (err != nil) { return err } return nil } err = setProcessProgressDetails(processIdentifier, "Downloading from host " + hostIdentityHashTrimmed) if (err != nil) { return err } minimumRange, maximumRange := byteRange.GetMinimumMaximumIdentityHashBounds() identityList := [][16]byte{userIdentityHash} successfulDownload, profilesInfoObjectsList, err := sendRequests.GetProfilesInfoFromHost(connectionIdentifier, hostIdentityHash, networkType, userIdentityType, minimumRange, maximumRange, identityList, nil, true, getViewableOnly) if (err != nil) { return err } if (successfulDownload == false){ err := setProcessProgressDetails(processIdentifier, "Failed to download profile from host " + hostIdentityHashTrimmed) if (err != nil) { return err } return nil } if (len(profilesInfoObjectsList) == 0){ // Host does not have profiles for this user. // We don't consider this a successful download // User profile we are trying to get may not exist anywhere // We need to make sure user is aware that they may never retrieve user's profile, and all downloads will fail err := increaseProcessHostsMissingContentCount(processIdentifier) if (err != nil) { return err } err = setProcessProgressDetails(processIdentifier, "Host " + hostIdentityHashTrimmed + " does not have the user's profile.") if (err != nil) { return err } return nil } if (len(profilesInfoObjectsList) != 1){ return errors.New("GetProfilesInfoFromHost not validating profilesInfoObjectsList properly") } profilesInfoObject := profilesInfoObjectsList[0] profileHash := profilesInfoObject.ProfileHash profileBroadcastTime := profilesInfoObject.ProfileBroadcastTime // We check if we already have this profile alreadyExists, _, err := profileStorage.GetStoredProfile(profileHash) if (err != nil){ return err } if (alreadyExists == true){ if (getViewableOnly == true){ err := trustedViewableStatus.AddTrustedProfileIsViewableStatus(profileHash, hostIdentityHash, true) if (err != nil) { return err } err = trustedViewableStatus.AddTrustedIdentityIsViewableStatus(userIdentityHash, hostIdentityHash, networkType, true) if (err != nil) { return err } } err := increaseProcessSuccessfulDownloadsCount(processIdentifier) if (err != nil){ return err } err = setProcessProgressDetails(processIdentifier, "Host offered profile we already have.") if (err != nil){ return err } return nil } profileHashesToDownloadList := [][28]byte{profileHash} expectedProfileIdentityHashesMap := make(map[[28]byte][16]byte) expectedProfileIdentityHashesMap[profileHash] = userIdentityHash expectedProfileBroadcastTimesMap := make(map[[28]byte]int64) expectedProfileBroadcastTimesMap[profileHash] = profileBroadcastTime processFound, processIsComplete, errorEncountered, _, _, _, _ = GetProcessInfo(processIdentifier) if (processFound == false){ // This should not happen return errors.New("Process not found after being found already.") } if (processIsComplete == true || errorEncountered == true){ // Error may have been encountered from a different download goroutine return nil } downloadSuccessful, listOfProfiles, err := sendRequests.GetProfilesFromHost(connectionIdentifier, hostIdentityHash, networkType, userIdentityType, profileHashesToDownloadList, expectedProfileIdentityHashesMap, expectedProfileBroadcastTimesMap, nil) if (err != nil) { return err } if (downloadSuccessful == false){ err := setProcessProgressDetails(processIdentifier, "Failed to download profile from host " + hostIdentityHashTrimmed) if (err != nil){ return err } return nil } if (len(listOfProfiles) == 0){ // Host does not have profiles for this user (after saying they did) // We don't consider this a successful download // User profile we are trying to get may not exist anywhere // We need to make sure user is aware that they may never retrieve user profile, and all downloads will fail err := increaseProcessHostsMissingContentCount(processIdentifier) if (err != nil) { return err } err = setProcessProgressDetails(processIdentifier, "Host " + hostIdentityHashTrimmed + " does not have the user's profile.") if (err != nil) { return err } return nil } if (len(listOfProfiles) != 1){ return errors.New("sendRequests.GetProfilesFromHost not verifying list of profiles matches requested profile hashes list.") } profileBytes := listOfProfiles[0] profileIsWellFormed, _, err := profileStorage.AddUserProfile(profileBytes) if (err != nil) { return err } if (profileIsWellFormed == false){ return errors.New("GetProfilesFromHost not verifying profile is well formed") } if (getViewableOnly == true){ err := trustedViewableStatus.AddTrustedProfileIsViewableStatus(profileHash, hostIdentityHash, true) if (err != nil) { return err } err = trustedViewableStatus.AddTrustedIdentityIsViewableStatus(userIdentityHash, hostIdentityHash, networkType, true) if (err != nil) { return err } } err = increaseProcessSuccessfulDownloadsCount(processIdentifier) if (err != nil){ return err } err = setProcessProgressDetails(processIdentifier, "Successfully downloaded profile from " + hostIdentityHashTrimmed) if (err != nil) { return err } return nil } err := executeDownloadFromHostFunction() if (err != nil){ logger.AddLogError("General", err) err := setProcessEncounteredError(processIdentifier, err) if (err != nil){ logger.AddLogError("General", err) } } decreaseActiveDownloads() } startTime := time.Now().Unix() for { processFound, processIsComplete, errorEncountered, _, numberOfSuccessfulDownloads, numberOfHostsMissingContent, _ := GetProcessInfo(processIdentifier) if (processFound == false){ // This should not happen logger.AddLogError("General", errors.New("Process not found during manualDownloads loop 1")) return } if (processIsComplete == true || errorEncountered == true){ return } if (numberOfSuccessfulDownloads >= numberOfHostsToDownloadFrom){ // We have completed the required number of downloads // Nothing left to do break } totalAttemptedDownloads := numberOfSuccessfulDownloads + numberOfHostsMissingContent if (totalAttemptedDownloads >= maximumHostsToContact){ // We have contacted enough hosts // We are done break } activeDownloads := getNumberOfActiveDownloads() pendingAndCompletedDownloads := numberOfSuccessfulDownloads + activeDownloads if (pendingAndCompletedDownloads >= numberOfHostsToDownloadFrom || (numberOfHostsMissingContent + pendingAndCompletedDownloads) >= maximumHostsToContact){ // We are actively downloading from the required/maximum number of hosts // Wait to see if we need to retry to another host currentTime := time.Now().Unix() secondsElapsed := currentTime - startTime if (secondsElapsed > 150){ // Something has gone wrong. Downloads should timeout before this. err := setProcessEncounteredError(processIdentifier, errors.New("Profile download failed: Reached timeout.")) if (err != nil) { logger.AddLogError("General", err) } return } time.Sleep(time.Second) continue } // We need to start another download // We find a host we have not contacted yet //Outputs: // -bool: We found a host startNewDownload := func()bool{ for _, hostIdentityHash := range hostsToContactList{ isContacted := checkIfHostHasBeenContacted(hostIdentityHash) if (isContacted == false){ addContactedHostIdentityHashToList(hostIdentityHash) increaseActiveDownloads() go executeDownloadFromHost(hostIdentityHash) return true } } return false } foundHost := startNewDownload() if (foundHost == false){ // There are not enough hosts to contact the requested numberOfHostsToDownloadFrom break } } // We wait for downloads to complete secondsElapsed := 0 for { processFound, processIsComplete, errorEncountered, _, _, _, _ := GetProcessInfo(processIdentifier) if (processFound == false){ // This should not happen logger.AddLogError("General", errors.New("Process not found while waiting for manual download to complete")) return } if (processIsComplete == true || errorEncountered == true){ // Error may have been encountered from a different broadcast goroutine return } activeDownloads := getNumberOfActiveDownloads() if (activeDownloads <= 0){ // We are done waiting for downloads to complete break } time.Sleep(time.Second) secondsElapsed += 1 if (secondsElapsed > 100){ // Something has gone wrong. err := setProcessEncounteredError(processIdentifier, errors.New("Download failed: reached timeout.")) if (err != nil){ logger.AddLogError("General", err) } return } } err := setProcessIsComplete(processIdentifier) if (err != nil){ logger.AddLogError("General", err) return } processFound, processIsComplete, errorEncountered, _, numberOfSuccessfulDownloads, numberOfHostsMissingContent, _ := GetProcessInfo(processIdentifier) if (processFound == false){ // This should not happen return } if (processIsComplete == false){ // This should not happen err := setProcessEncounteredError(processIdentifier, errors.New("setProcessIsComplete is not working.")) if (err != nil) { logger.AddLogError("General", err) } return } if (errorEncountered == true){ return } getFinalDownloadStatus := func()string{ if (numberOfSuccessfulDownloads != 0){ numberOfSuccessfulDownloadsString := helpers.ConvertIntToString(numberOfSuccessfulDownloads) return "Downloaded profile from " + numberOfSuccessfulDownloadsString + " hosts." } if (numberOfHostsMissingContent == 0){ // We were unable to download from any hosts return "Download failed: Hosts unavailable." } // We downloaded from hosts, but none of them had the user's profile return "Download failed: Content unavailable." } finalDownloadStatus := getFinalDownloadStatus() err = setProcessProgressDetails(processIdentifier, finalDownloadStatus) if (err != nil){ logger.AddLogError("General", err) return } } go performDownloads() return true, processIdentifier, nil }