Feature #11015

closed

Improve throughput of crunch-run output-uploading stage using multi-threaded transfers

Added by Tom Clegg about 7 years ago. Updated about 7 years ago.

Status:
Resolved
Priority:
Normal
Assigned To:
Radhika Chippada
Category:
-
Target version:
2017-03-01 sprint
Story points:
2.0

Description

To improve throughput of crunch-run job output uploading, add support for multi-threaded asynchronous transfers to hide the latency inherent in cloud environments.

Refactoring to support public APIs in the Go SDK is a separate task.


Subtasks 1 (0 open, 1 closed)

Task #11108: Review 11015-crunch-run-output-upload (Resolved, Radhika Chippada, 01/31/2017)
#1

Updated by Tom Morris about 7 years ago

  • Description updated (diff)
#2

Updated by Tom Morris about 7 years ago

  • Target version set to Arvados Future Sprints
#3

Updated by Tom Morris about 7 years ago

  • Target version changed from Arvados Future Sprints to 2017-03-01 sprint
#4

Updated by Tom Clegg about 7 years ago

  • Description updated (diff)
#5

Updated by Tom Morris about 7 years ago

  • Subject changed from Improve throughput of crunch-run output-uploading stage using multithreaded transfers to Improve throughput of crunch-run output-uploading stage using multi-threaded transfers
  • Description updated (diff)
  • Assigned To set to Radhika Chippada
  • Story points set to 2.0
#6

Updated by Tom Clegg about 7 years ago

Possible implementation

In (*CollectionFileWriter).goUpload(), we currently have:

func (m *CollectionFileWriter) goUpload() {
  for block := range uploader {
    // compute md5
    // write to keep
    // append signed locator to m.ManifestStream.Blocks
    // (or append err to errors)
  }
}

We can add a buffered channel to CollectionWriter, pass it to (CFW).goUpload(), and use it to limit the total number of blocks being written concurrently by all the CFWs of a given CollectionWriter.

func (m *CollectionWriter) Open(path string) io.WriteCloser {
  ...
  m.mtx.Lock()
  defer m.mtx.Unlock()
  if m.workers == nil {
    if m.MaxWriters < 1 {
      m.MaxWriters = 1
    }
    m.workers = make(chan struct{}, m.MaxWriters) // semaphore shared by all file writers
  }
  go fw.goUpload(m.workers)
  m.Streams = append(m.Streams, fw)
  ...
}
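
For context, a minimal sketch of the fields the Open() snippet above assumes on CollectionWriter (field names taken from these snippets, not from the committed SDK source):

type CollectionWriter struct {
  MaxWriters int           // limit on concurrent block uploads across all file writers
  workers    chan struct{} // buffered channel used as a semaphore; created lazily in Open()
  mtx        sync.Mutex
  Streams    []*CollectionFileWriter
  // ...existing fields...
}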

func (m *CollectionFileWriter) goUpload(workers chan struct{}) {
  var wg sync.WaitGroup
  var mtx sync.Mutex
  for block := range uploader {
    // reserve the next slot so signed locators end up in block order
    mtx.Lock()
    blockIndex := len(m.ManifestStream.Blocks)
    m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, "")
    mtx.Unlock()

    workers <- struct{}{}    // wait for an available worker slot
    wg.Add(1)

    go func(blockIndex int) {
      defer wg.Done()
      // compute md5
      // write to keep
      <-workers              // release worker slot

      mtx.Lock()
      m.ManifestStream.Blocks[blockIndex] = signedLocator
      // (or append err to errors)
      mtx.Unlock()
    }(blockIndex)
  }
  wg.Wait()
  finish <- errors
}
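
As a standalone illustration of the worker-slot pattern (not Arvados code; the names and timings below are made up), a buffered channel plus a sync.WaitGroup caps how many uploads run at once:

package main

import (
  "fmt"
  "sync"
  "time"
)

func main() {
  const maxWriters = 2
  workers := make(chan struct{}, maxWriters) // semaphore: at most maxWriters uploads in flight
  var wg sync.WaitGroup

  for i := 0; i < 6; i++ {
    workers <- struct{}{} // block until a worker slot is free
    wg.Add(1)
    go func(block int) {
      defer wg.Done()
      defer func() { <-workers }()       // release the slot when done
      time.Sleep(100 * time.Millisecond) // stand-in for "compute md5, write to keep"
      fmt.Println("uploaded block", block)
    }(i)
  }
  wg.Wait()
}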
#7

Updated by Radhika Chippada about 7 years ago

  • Status changed from New to In Progress
#8

Updated by Lucas Di Pentima about 7 years ago

Sorry for taking so long to send comments; I was trying to understand the code as best I could.
One question: is the number of upload workers configurable? I'm not able to see where; it seems to be hardcoded to 2.

I've run services/crunch-run tests locally without issues.

#9

Updated by Radhika Chippada about 7 years ago

Is the number of upload workers configurable?

A number of writers >1 helps us, as it lets us use multiple threads to upload.

Thanks.
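
(For illustration only, using the CollectionWriter sketch from note #6 rather than the committed code, and a hypothetical output path: the caller would pick the limit when constructing the writer.)

cw := &CollectionWriter{MaxWriters: 2} // >1 lets blocks upload in parallel
w := cw.Open("output/data.bin")        // every file opened on cw shares the same worker pool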

#10

Updated by Radhika Chippada about 7 years ago

  • Status changed from In Progress to Resolved
  • % Done changed from 0 to 100

Applied in changeset arvados|commit:0b125c52cd816e6a4120c414d3817f354cad1055.
