Idea #3761

Updated by Tom Clegg about 9 years ago

At startup, keepstore sets up a WorkQueue called pullq. At the same time it should also start a pull worker goroutine:

<pre><code class="go">go RunPullWorker(pullq.NextItem)</code></pre>

The resulting goroutine will run forever, processing pull requests on the WorkQueue one at a time.
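
A minimal sketch of that startup wiring, assuming a NewWorkQueue constructor and a ReplaceQueue method for installing a new pull list (both names are assumptions for illustration, not necessarily the real WorkQueue API):

<pre><code class="go">
// Startup wiring sketch. NewWorkQueue and ReplaceQueue are assumed names.
var pullq *WorkQueue

func startPullWorker() {
	// Create the queue and start the worker exactly once, at startup.
	pullq = NewWorkQueue()
	go RunPullWorker(pullq.NextItem)
}

// Called whenever Data Manager sends a new pull list: the old queue
// contents are discarded and replaced with the new requests.
func setPullList(requests []interface{}) {
	pullq.ReplaceQueue(requests)
}
</code></pre>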

 "RunPullWorker" will: 
 The BlockPuller's Run method will perform the following algorithm: 

 * Get the next current pull request. list by calling getList() on the list given during instantiation.[1] 
 * For Walk the pull list. Attempt each server, try Pull(). Stop when {block,server} entry one succeeds. time. Log each attempt and each success. 
 * When a pull succeeds, call removeItem() on the list. (This will delete all pull list entries for that block.)[2] 
 * Repeat. 
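
The worker loop in the code sample below assumes each queue item is shaped roughly like this (the field names match the sample; the exact type definition is an assumption):

<pre><code class="go">
// One pull list entry: a block to fetch, plus the keepstore servers to
// try, in priority order.
type PullRequest struct {
	Locator string   // hash of the block to pull
	Servers []string // host:port of each candidate source server
}
</code></pre>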

 "Pull" will: "Attempt" means: 
 * Generate a random API token[1]. token.[3] 
 * Generate a permission signature using the random token, timestamp ~60 60 seconds in the future, and desired block hash. 
 * Using this token & signature, retrieve the given block from the given keepstore server. 
 * Verify checksum and write checksum. 
 * Write to storage, just as if it had been provided by a client in a PUT transaction. I.e., PutBlock(). 

 RunPullWorker() and Pull() will look something like this: 

 <pre><code class="go"> 
import (
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
)

func RunPullWorker(nextItem <-chan interface{}) {
	// Process pull requests until the channel is closed.
	for item := range nextItem {
		pullReq := item.(PullRequest)
		for _, addr := range pullReq.Servers {
			err := Pull(addr, pullReq.Locator)
			if err == nil {
				// Stop trying servers as soon as one pull succeeds.
				break
			}
		}
	}
}

func Pull(addr string, locator string) (err error) {
	log.Printf("Pull %s/%s starting", addr, locator)
	defer func() {
		if err == nil {
			log.Printf("Pull %s/%s success", addr, locator)
		} else {
			log.Printf("Pull %s/%s error: %s", addr, locator, err)
		}
	}()
	// (will also need to set auth headers and add a signature token to the locator here)
	resp, err := http.Get(fmt.Sprintf("http://%s/%s", addr, locator))
	if err != nil {
		return
	}
	defer resp.Body.Close()
	data, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return
	}
	// Store the block locally, exactly as if a client had PUT it.
	err = PutBlock(data, locator)
	return
}
 </code></pre> 
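
The "set auth headers and add a signature token" comment glosses over the token and signature step. Below is a rough, illustrative sketch; the HMAC message layout and the "+A&lt;signature&gt;@&lt;expiry&gt;" suffix are assumptions for illustration, not a description of Keep's actual permission-signature format:

<pre><code class="go">
import (
	"crypto/hmac"
	"crypto/rand"
	"crypto/sha1"
	"fmt"
	"time"
)

// randomToken returns a throwaway API token (see footnote 1: Keep only
// checks the permission signature, not the token itself).
func randomToken() (string, error) {
	buf := make([]byte, 20)
	if _, err := rand.Read(buf); err != nil {
		return "", err
	}
	return fmt.Sprintf("%x", buf), nil
}

// signLocator appends an illustrative permission hint, valid for ~60
// seconds, computed with a shared permission key. The real signature
// format used by keepstore may differ.
func signLocator(locator, token string, permissionKey []byte) string {
	expiry := fmt.Sprintf("%x", time.Now().Add(60*time.Second).Unix())
	mac := hmac.New(sha1.New, permissionKey)
	fmt.Fprintf(mac, "%s@%s@%s", locator, token, expiry)
	return fmt.Sprintf("%s+A%x@%s", locator, mac.Sum(nil), expiry)
}
</code></pre>

Pull would then GET the signed locator from the source server, sending the random token in its auth header.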

PullWorker doesn't need to worry about:

* Retrying (Data Manager will tell us to do the pull again, if it's still needed)
* Concurrency (we can add concurrency safely & easily by starting multiple PullWorkers; see the sketch after this list)
* Noticing when the pull list changes, or is empty (WorkQueue already does all this: we just read from the channel, and something will arrive when there's something for this thread to do)
* Detecting whether a given pull request is useless, e.g., data already present, before pulling (instead, trust Data Manager to give us useful pull lists, and be OK with an occasional superfluous GET)
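
For example, scaling up to several concurrent pull workers is just a matter of starting more goroutines on the same channel (the worker count here is arbitrary):

<pre><code class="go">
// Each worker blocks on the shared NextItem channel, so pull requests are
// distributed among whichever workers are idle.
for i := 0; i < 4; i++ {
	go RunPullWorker(pullq.NextItem)
}
</code></pre>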

fn1. Currently, Keep doesn't actually verify API tokens, just the permission signature, so a random token is just as effective as a real one.
