Feature #11015
closed
Improve throughput of crunch-run output-uploading stage using multi-threaded transfers
Added by Tom Clegg almost 8 years ago.
Updated almost 8 years ago.
Assigned To: Radhika Chippada
Description
To improve throughput of crunch-run job output uploading, add support for multi-threaded asynchronous transfers to hide the latency inherent in cloud environments.
Refactoring to support public APIs in the Go SDK is a separate task.
- Description updated (diff)
- Target version set to Arvados Future Sprints
- Target version changed from Arvados Future Sprints to 2017-03-01 sprint
- Description updated (diff)
- Subject changed from Improve throughput of crunch-run output-uploading stage using multithreaded transfers to Improve throughput of crunch-run output-uploading stage using multi-threaded transfers
- Description updated (diff)
- Assigned To set to Radhika Chippada
- Story points set to 2.0
Possible implementation
In (*CollectionFileWriter).goUpload(), we currently have:
func (m *CollectionFileWriter) goUpload() {
	for block := range uploader {
		compute md5
		write to keep
		append signed locator to m.ManifestStream.Blocks
		(or append err to errors)
	}
	finish <- errors
}
We can add a buffered channel to CollectionWriter, pass it to each CollectionFileWriter's goUpload(), and use it as a semaphore to limit the total number of blocks being written concurrently by all CollectionFileWriters belonging to a given CollectionWriter.
func (m *CollectionWriter) Open(path string) io.WriteCloser {
	...
	m.mtx.Lock()
	defer m.mtx.Unlock()
	// Lazily create the semaphore shared by every CollectionFileWriter
	// opened from this CollectionWriter; default to a single writer.
	if m.workers == nil {
		if m.MaxWriters < 1 {
			m.MaxWriters = 1
		}
		m.workers = make(chan struct{}, m.MaxWriters)
	}
	go fw.goUpload(m.workers)
	m.Streams = append(m.Streams, fw)
	...
}
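A minimal self-contained sketch of the shared-semaphore idea, under stated assumptions: collectionWriter is a hypothetical, trimmed-down type (not the SDK's CollectionWriter), workerSlots and simulateUploads are hypothetical helpers, and a short sleep stands in for a Keep write. Every simulated file writer obtains the same lazily created channel, so the number of writes in flight across all of them should never exceed MaxWriters.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// collectionWriter is a hypothetical stand-in for CollectionWriter with
// only the fields needed for the shared semaphore.
type collectionWriter struct {
	MaxWriters int
	mtx        sync.Mutex
	workers    chan struct{}
}

// workerSlots lazily creates the buffered channel that every file writer
// opened from this collection writer shares, defaulting to one slot.
func (m *collectionWriter) workerSlots() chan struct{} {
	m.mtx.Lock()
	defer m.mtx.Unlock()
	if m.workers == nil {
		if m.MaxWriters < 1 {
			m.MaxWriters = 1
		}
		m.workers = make(chan struct{}, m.MaxWriters)
	}
	return m.workers
}

// simulateUploads runs nWriters file writers concurrently, each issuing
// nBlocks simulated block writes, and returns the peak number of writes
// that were in flight at the same time.
func simulateUploads(m *collectionWriter, nWriters, nBlocks int) int64 {
	var inFlight, peak int64
	var writers sync.WaitGroup
	for w := 0; w < nWriters; w++ {
		writers.Add(1)
		go func() {
			defer writers.Done()
			workers := m.workerSlots() // same channel for every writer
			var blocks sync.WaitGroup
			for i := 0; i < nBlocks; i++ {
				workers <- struct{}{} // acquire a slot
				blocks.Add(1)
				go func() {
					defer blocks.Done()
					n := atomic.AddInt64(&inFlight, 1)
					for { // raise peak to n if n is a new maximum
						p := atomic.LoadInt64(&peak)
						if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
							break
						}
					}
					time.Sleep(5 * time.Millisecond) // stand-in for Keep latency
					atomic.AddInt64(&inFlight, -1)
					<-workers // release the slot
				}()
			}
			blocks.Wait()
		}()
	}
	writers.Wait()
	return atomic.LoadInt64(&peak)
}

func main() {
	m := &collectionWriter{MaxWriters: 2}
	peak := simulateUploads(m, 3, 4)
	fmt.Println("peak concurrent writes:", peak, "limit:", cap(m.workerSlots()))
}
```

Because the limit lives on the collection writer rather than on each file writer, opening more files does not raise the total number of concurrent Keep writes.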
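func (m *CollectionFileWriter) goUpload(workers chan struct{}) {
	var errors []error
	var blocks []string   // signed locators, in upload order
	var mtx sync.Mutex    // guards blocks and errors
	var wg sync.WaitGroup // tracks in-flight block writes
	for block := range uploader {
		mtx.Lock()
		blockIndex := len(blocks)
		blocks = append(blocks, "") // reserve this block's slot
		mtx.Unlock()
		workers <- struct{}{} // wait for an available worker slot
		wg.Add(1)
		block := block // per-goroutine copy (needed before Go 1.22)
		go func(blockIndex int) {
			defer wg.Done()
			compute md5 of block
			write block to keep
			<-workers // release worker slot
			mtx.Lock()
			blocks[blockIndex] = signedLocator
			(or append err to errors)
			mtx.Unlock()
		}(blockIndex)
	}
	wg.Wait()
	if len(errors) == 0 {
		m.ManifestStream.Blocks = append(m.ManifestStream.Blocks, blocks...)
	}
	finish <- errors
}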
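The proposal can be exercised as a standalone sketch. Here goUpload is a hypothetical free function rather than the CollectionFileWriter method, and putBlock is a hypothetical stand-in for the Keep write: it returns an unsigned "md5+size" locator and fails on empty blocks so the error path is visible. Each block reserves its index before its goroutine starts, so locators come back in input order even when writes finish out of order.

```go
package main

import (
	"crypto/md5"
	"errors"
	"fmt"
	"sync"
)

// putBlock is a hypothetical stand-in for a Keep write. It fails on empty
// blocks so that the error path can be exercised.
func putBlock(data []byte) (string, error) {
	if len(data) == 0 {
		return "", errors.New("empty block")
	}
	return fmt.Sprintf("%x+%d", md5.Sum(data), len(data)), nil
}

// goUpload writes blocks concurrently, bounded by the shared workers
// channel, and returns locators in the order the blocks arrived.
func goUpload(uploader <-chan []byte, workers chan struct{}) ([]string, []error) {
	var (
		mtx    sync.Mutex // guards blocks and errs
		wg     sync.WaitGroup
		blocks []string
		errs   []error
	)
	for data := range uploader {
		mtx.Lock()
		blockIndex := len(blocks)
		blocks = append(blocks, "") // reserve a slot to preserve order
		mtx.Unlock()
		workers <- struct{}{} // wait for a free worker slot
		wg.Add(1)
		go func(blockIndex int, data []byte) {
			defer wg.Done()
			loc, err := putBlock(data)
			<-workers // release the slot before taking the lock
			mtx.Lock()
			defer mtx.Unlock()
			if err != nil {
				errs = append(errs, fmt.Errorf("block %d: %w", blockIndex, err))
				return
			}
			blocks[blockIndex] = loc
		}(blockIndex, data)
	}
	wg.Wait()
	return blocks, errs
}

func main() {
	uploader := make(chan []byte, 3)
	uploader <- []byte("foo")
	uploader <- []byte("")
	uploader <- []byte("bar")
	close(uploader)
	locs, errs := goUpload(uploader, make(chan struct{}, 2))
	fmt.Printf("%q %v\n", locs, errs)
}
```

Releasing the worker slot before taking mtx, as the proposal does, means a goroutine waiting for the lock does not prevent another block write from starting.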
- Status changed from New to In Progress
Sorry for taking so long to send comments; I was trying to understand the code as best I could.
One question: is the number of upload workers configurable? I can't see where it is set; it seems to be hardcoded to 2.
I've run the services/crunch-run tests locally without issues.
> Is the number of upload workers configurable?
Setting the number of writers above 1 helps us, because it lets us upload with multiple threads.
Thanks.
- Status changed from In Progress to Resolved
- % Done changed from 0 to 100
Applied in changeset arvados|commit:0b125c52cd816e6a4120c414d3817f354cad1055.