[SDK] Refactor keep parallel write strategy
arvados.keep.KeepClient.put creates one KeepWriterThread per keep server and then uses a complex (and error-prone) locking strategy implemented in ThreadLimiter to ensure that only certain threads perform uploads in a certain order.
Refactor this code to use the following alternate strategy:
- For N wanted copies create N upload threads
- Start a new upload to the next server in sorted_roots when an upload fails
- The server response may include an x-keep-replicas-stored header; if so, the client's replica count should be incremented by the number in the header
- If the service list consists only of "disk" type services, start N parallel write threads, one for each wanted copy; if the service list has non-"disk" services (such as "proxy"), perform one upload at a time and look for the x-keep-replicas-stored header
- Don't write to servers that are read-only (this should already be handled by …)
You may want to use a Queue (instead of explicit locks) to communicate between the main thread and the upload threads.
Consider setting up a thread pool attached to the keep client object and dispatching work instead of spawning new threads.
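A minimal sketch of the queue-based approach suggested above, assuming Python 3's queue module. The names put_block and upload_to are hypothetical stand-ins, not part of the SDK: upload_to represents the real HTTP PUT and returns the number of replicas stored (0 on failure). One worker thread is started per wanted copy, and a worker that fails moves on to the next server in sorted_roots by pulling it from the queue.

```python
import queue
import threading

def put_block(sorted_roots, wanted_copies, upload_to):
    # Load the sorted service roots into a queue; workers pull the
    # next root whenever their current upload fails.
    roots = queue.Queue()
    for r in sorted_roots:
        roots.put(r)
    done = threading.Event()
    lock = threading.Lock()
    state = {"copies": 0}

    def worker():
        while not done.is_set():
            try:
                root = roots.get_nowait()
            except queue.Empty:
                return  # no servers left to try
            replicas = upload_to(root)  # 0 means failure, so loop to the next root
            with lock:
                state["copies"] += replicas
                if state["copies"] >= wanted_copies:
                    done.set()  # enough replicas; let idle workers exit

    # One upload thread per wanted copy, per the strategy above.
    threads = [threading.Thread(target=worker) for _ in range(wanted_copies)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return state["copies"]
```

Because the workers communicate only through the queue and a single counter lock, there is no per-thread ordering protocol like the one ThreadLimiter implements.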
Two things to keep in mind:
- The order in which it tries each server matters (earlier servers are preferred over later ones)
- If it goes through the entire list without uploading sufficient replicas, it should try again. However, when it does so, it should:
- remember how many replicas have already been uploaded (so if it wanted 3 and got 2 on the first pass, it only needs 1 more)
- not try to upload to services that, in the previous round, either accepted a successful upload or failed with a non-retryable code. Retryable failure codes (from the Go SDK) are 408 (Request Timeout), 429 (Too Many Requests), and any 5xx error except 503 (which means "Server Full").
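The retryable-status rule above can be captured in a small helper; this is a sketch (the function name is not from the SDK), mirroring the Go SDK rule: 408, 429, and any 5xx except 503 are retryable.

```python
def is_retryable(status_code):
    # 408 Request Timeout and 429 Too Many Requests are retryable.
    if status_code in (408, 429):
        return True
    # Any 5xx is retryable except 503, which means "Server Full".
    if 500 <= status_code <= 599 and status_code != 503:
        return True
    return False
```

On a second pass, the client would skip any service whose previous response was a success or failed this check.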
#11 Updated by Peter Amstutz over 4 years ago
KeepWriterQueue.pending_copies() should take successful_copies_lock.
    def pending_copies(self):
        with self.successful_copies_lock:
            return self.wanted_copies - self.successful_copies
This is confusing:
if not self.queue.retries > 0:
It would be clearer as:
if self.queue.retries <= 0:
Don't call self.queue.task_done() unless a task was actually removed from the queue. If there is a race and the Queue.Empty exception is thrown, the task could get double-counted.
Add task_done to the bottom of the if self.queue.pending_copies() > 0: block and inside the exception handler of the else branch:

    if self.queue.pending_copies() > 0:
        ...
        # Mark as done so the queue can be join()ed
        self.queue.task_done()
    else:
        # Remove the task from the queue anyways
        try:
            self.queue.get_nowait()
            # Mark as done so the queue can be join()ed
            self.queue.task_done()
        except Queue.Empty:
            continue
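The fix above relies on the Queue.join()/task_done() contract: join() returns only after task_done() has been called exactly once per item put on the queue, so task_done() must be paired with a successful get(), never called speculatively. A small standalone demonstration, using Python 3's queue module (the snippet above uses the Python 2 spelling Queue.Empty):

```python
import queue

q = queue.Queue()
for i in range(3):
    q.put(i)

drained = []
while True:
    try:
        drained.append(q.get_nowait())
        q.task_done()  # exactly one task_done() per successful get()
    except queue.Empty:
        break  # nothing was removed, so no task_done() here

q.join()  # returns immediately: all three tasks are accounted for
```

Calling task_done() inside the except branch would raise ValueError('task_done() called too many times') or, worse, let join() return while an upload task is still outstanding.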
Using "retry" for "try the next service in the queue" is a little confusing, because the term "retry" is used a lot in the Arvados SDK to mean "try the same service again". Consider using a different term, maybe "pending_tries" or "pending_attempts".
The rest of it looks good! Thanks!