Feature #11015

Improve throughput of crunch-run output-uploading stage using multi-threaded transfers

Added by Tom Clegg 5 months ago. Updated 4 months ago.

Status: Resolved
Start date: 01/31/2017
Priority: Normal
Due date: -
Assignee: Radhika Chippada
% Done: 100%
Category: -
Target version: 2017-03-01 sprint
Story points: 2.0
Remaining (hours): 0.00 hour
Velocity based estimate: -

Description

To improve throughput of crunch-run job output uploading, add support for multi-threaded asynchronous transfers to hide the latency inherent in cloud environments.

Refactoring to support public APIs in the Go SDK is a separate task.


Subtasks

Task #11108: Review 11015-crunch-run-output-upload (Resolved, Radhika Chippada)

Associated revisions

Revision 0b125c52
Added by Radhika Chippada 4 months ago

closes #11015
Merge branch '11015-crunch-run-output-upload'

History

#1 Updated by Tom Morris 5 months ago

  • Description updated (diff)

#2 Updated by Tom Morris 4 months ago

  • Target version set to Arvados Future Sprints

#3 Updated by Tom Morris 4 months ago

  • Target version changed from Arvados Future Sprints to 2017-03-01 sprint

#4 Updated by Tom Clegg 4 months ago

  • Description updated (diff)

#5 Updated by Tom Morris 4 months ago

  • Subject changed from Improve throughput of crunch-run output-uploading stage using multithreaded transfers to Improve throughput of crunch-run output-uploading stage using multi-threaded transfers
  • Description updated (diff)
  • Assignee set to Radhika Chippada
  • Story points set to 2.0

#6 Updated by Tom Clegg 4 months ago

Possible implementation

In (*CollectionFileWriter).goUpload(), the current logic is:

func (m *CollectionFileWriter) goUpload() {
  for block := range uploader {
    // compute md5
    // write to keep
    // append signed locator to m.ManifestStream.Blocks
    // (or append err to errors)
  }
}

We can add a buffered channel to CollectionWriter, pass it to (*CollectionFileWriter).goUpload(), and use it as a semaphore to limit the total number of blocks being written concurrently by all the CollectionFileWriters of a given CollectionWriter.

func (m *CollectionWriter) Open(path string) io.WriteCloser {
  ...
  m.mtx.Lock()
  defer m.mtx.Unlock()
  if m.workers == nil {
    if m.MaxWriters < 1 {
      m.MaxWriters = 1
    }
    m.workers = make(chan struct{}, m.MaxWriters)
  }
  go fw.goUpload(m.workers)
  m.Streams = append(m.Streams, fw)
  ...
}

func (m *CollectionFileWriter) goUpload(workers chan struct{}) {
  var wg sync.WaitGroup
  var mtx sync.Mutex
  for block := range uploader {
    mtx.Lock()
    blockIndex := len(m.ManifestStream.Blocks)
    m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, "") // reserve a slot
    mtx.Unlock()

    workers <- struct{}{}    // wait for an available worker slot
    wg.Add(1)

    go func(blockIndex int) {
      defer wg.Done()
      // compute md5
      // write to keep
      <-workers              // release worker slot

      mtx.Lock()
      m.ManifestStream.Blocks[blockIndex] = signedLocator
      // (or append err to errors)
      mtx.Unlock()
    }(blockIndex)
  }
  wg.Wait()
  finish <- errors
}
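The sketch above can be condensed into a small self-contained program. This is a generic illustration of the pattern (buffered-channel semaphore, result slots reserved in input order), not the actual Arvados code; the "write to keep" step is stubbed out, and the "locator" here is just the md5 hash plus block size in Keep's hash+size form.

```go
package main

import (
	"crypto/md5"
	"fmt"
	"sync"
)

// uploadAll "uploads" each block concurrently, with at most maxWorkers
// in flight at once, and returns the locators in original block order.
func uploadAll(blocks [][]byte, maxWorkers int) []string {
	workers := make(chan struct{}, maxWorkers) // semaphore
	locators := make([]string, len(blocks))    // slots reserved up front
	var wg sync.WaitGroup
	for i, data := range blocks {
		workers <- struct{}{} // wait for an available worker slot
		wg.Add(1)
		go func(i int, data []byte) {
			defer wg.Done()
			// compute md5 (the real code would also write to keep here)
			locators[i] = fmt.Sprintf("%x+%d", md5.Sum(data), len(data))
			<-workers // release worker slot
		}(i, data)
	}
	wg.Wait()
	return locators
}

func main() {
	fmt.Println(uploadAll([][]byte{[]byte("foo"), []byte("bar")}, 2))
}
```

Because the result slice is sized up front and each goroutine writes only its own index, no mutex is needed in this sketch; the goUpload sketch above appends under a mutex instead because its blocks arrive over a channel of unknown length.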

#7 Updated by Radhika Chippada 4 months ago

  • Status changed from New to In Progress

#8 Updated by Lucas Di Pentima 4 months ago

Sorry for taking so long to send comments; I was trying to understand the code as best I could.
One question: is the number of upload workers configurable? I can't see where it is set; it seems to be hardcoded to 2.

I've run services/crunch-run tests locally without issues.

#9 Updated by Radhika Chippada 4 months ago

Is the number of upload workers configurable?

A number of writers >1 helps because it lets us use multiple threads to upload.

Thanks.

#10 Updated by Radhika Chippada 4 months ago

  • Status changed from In Progress to Resolved
  • % Done changed from 0 to 100

Applied in changeset arvados|commit:0b125c52cd816e6a4120c414d3817f354cad1055.
