Story #2798

Go Keep client library

Added by Peter Amstutz over 7 years ago. Updated over 7 years ago.

Status:
Resolved
Priority:
Normal
Assigned To:
Category:
-
Start date:
05/08/2014
Due date:
% Done:

100%

Estimated time:
(Total: 48.00 h)
Story points:
2.0

Subtasks

Task #2791: Write keep client library in Go (Resolved, Peter Amstutz)

Task #2795: Write tests for Keep client library (Resolved, Peter Amstutz)

Task #2824: Review 2798-go-keep-client (Resolved, Tim Pierce)

Associated revisions

Revision d5823126
Added by Peter Amstutz over 7 years ago

Merge branch '2798-go-keep-client' closes #2798

History

#1 Updated by Peter Amstutz over 7 years ago

  • Project changed from Umbrella Project to Arvados

#2 Updated by Peter Amstutz over 7 years ago

  • Assigned To set to Peter Amstutz

#3 Updated by Tim Pierce over 7 years ago

Notes on 27f5c16:

  • Packages and methods should have doc comments as described in http://golang.org/doc/effective_go.html#commentary, so we can generate SDK documentation with godoc if necessary.
  • KeepDisks:
    • Let's rename to KeepServers. This usage has always confused me.
    • I'd move DiscoverKeepDisks to an apiclient package, to enforce a clean Keep/apiserver boundary, and make it the caller's responsibility to find out what Keep servers to use. The client can be provisioned with Keep servers via a method like AddServers(srv []KeepServer) or something similar.
  • Some KeepClient methods are declared func (this *KeepClient) and some func (this KeepClient). These should all be func (this *KeepClient), both for consistency and so that a method can modify the client's state if necessary.
    • Consequently, KeepClient.client does not need to be declared as a pointer to http.Client.
    • Rename KeepClient.client to something more descriptive, maybe KeepClient.http?
  • Move all of the new error declarations to the top of the file in a var block.
  • Declare 64*1024*1024 as a const, max_block_size or something. (We could even make it a var and let users change it, if we want to be real radical.)
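
A minimal sketch of how the layout suggested in these notes might look. It is not the code from the branch: the error names, the CheckBlock method, the httpClient field name and the server URL are all hypothetical, chosen only to show a var block of errors, a named block-size constant, pointer receivers throughout, and servers supplied by the caller.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// MaxBlockSize replaces the bare 64*1024*1024 literal (name is an assumption).
const MaxBlockSize = 64 * 1024 * 1024

// Error values grouped in one var block at the top of the file.
// Both names are hypothetical.
var (
	ErrBlockTooLarge = errors.New("block exceeds MaxBlockSize")
	ErrNoServers     = errors.New("no Keep servers configured")
)

// KeepServer and KeepClient are illustrative stand-ins for the real types.
type KeepServer struct {
	URL string
}

type KeepClient struct {
	servers    []KeepServer
	httpClient http.Client // a value: pointer receivers make *http.Client unnecessary
}

// AddServers lets the caller provision the client, so discovery can live
// in a separate package. The pointer receiver allows it to modify kc.
func (kc *KeepClient) AddServers(srv []KeepServer) {
	kc.servers = append(kc.servers, srv...)
}

// CheckBlock is a hypothetical pre-flight check that uses the shared errors.
func (kc *KeepClient) CheckBlock(size int) error {
	if len(kc.servers) == 0 {
		return ErrNoServers
	}
	if size > MaxBlockSize {
		return ErrBlockTooLarge
	}
	return nil
}

func main() {
	kc := &KeepClient{}
	fmt.Println(kc.CheckBlock(10))
	kc.AddServers([]KeepServer{{URL: "http://keep0.example:25107"}})
	fmt.Println(kc.CheckBlock(MaxBlockSize + 1))
}
```

Because the errors are package-level values, callers can compare against them directly (err == ErrNoServers) rather than matching on message text.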

BufferReader and Transfer:

As I understand this, the purpose of structuring the code this way is so that we can start pipelining data to Keep servers even before it's all been read. Here's an alternative approach for Transfer() that I think would be simpler:
  • Create an array of buffered byte channels each with max_block_size capacity
  • Launch one uploadToKeepServer goroutine for each channel
  • Read bytes from source_reader and write them directly to each of the outbound channels
  • uploadToKeepServer reads bytes from its inbound channel and writes them directly to the outbound PUT
  • uploadToKeepServer signals that it's finished by writing to a done channel
  • Transfer() counts the number of responses from done and returns when all goroutines have completed.

There is some replication of data across the buffered channels, but unless a goroutine hangs (e.g. network timeout) it is unlikely ever to be more than a relatively small amount of data.

#4 Updated by Peter Amstutz over 7 years ago

  1. I will update the comments to conform to the appropriate format, thanks for the link
  2. DiscoverKeepDisks renamed to DiscoverKeepServers()
    • Actually doing the HTTP GET /arvados/v1/keep_disks will be replaced by the Google API client when that gets written
    • Some version of DiscoverKeepServers() is still required to initialize the Service_roots array, which other KeepClient methods expect to have a specific form
  3. Distinction between func (this *KeepClient) and func (this KeepClient) is intentional, to indicate that those methods are free of side effects on 'this'
    • Actually DiscoverKeepServers() is the only one that takes a pointer and modifies KeepClient, because it sets Service_roots
  4. Error declarations moved to top
  5. Added BLOCKSIZE constant
  6. Per our offline conversation, BufferReader() and Transfer() are specifically designed to work off of a single buffer to minimize copying, memory footprint, and reader/writer stall. The implementation is not Keep specific, and could be moved into its own file or package.

#5 Updated by Tim Pierce over 7 years ago

Peter Amstutz wrote:

  3. Distinction between func (this *KeepClient) and func (this KeepClient) is intentional, to indicate that those methods are free of side effects on 'this'

In my experience, that means that some methods have to be called as keepclient.Foo() and some as &keepclient.Foo() (or possibly *keepclient.Foo() depending on how it's been allocated). Are you finding that not to be the case?
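
For reference, a small sketch of how Go resolves method calls on the two receiver kinds. The Counter type is hypothetical and stands in for KeepClient. For an addressable variable or a pointer, the compiler inserts the & or * automatically, so both kinds of method are called with the same syntax.

```go
package main

import "fmt"

// Counter is a hypothetical type with one method of each receiver kind.
type Counter struct{ n int }

// Value has a value receiver: it works on a copy and cannot modify c.
func (c Counter) Value() int { return c.n }

// Incr has a pointer receiver: it modifies the Counter it is called on.
func (c *Counter) Incr() { c.n++ }

func main() {
	var v Counter   // an addressable value
	p := &Counter{} // a pointer

	v.Incr() // shorthand for (&v).Incr()
	p.Incr()

	// p.Value() is shorthand for (*p).Value()
	fmt.Println(v.Value(), p.Value()) // prints "1 1"
}
```

The call syntax differs only in corner cases: a pointer-receiver method cannot be called on a non-addressable value (for example a map element or a function result), and only *Counter, not Counter, satisfies an interface that requires Incr.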

  6. Per our offline conversation, BufferReader() and Transfer() are specifically designed to work off of a single buffer to minimize copying, memory footprint, and reader/writer stall. The implementation is not Keep specific, and could be moved into its own file or package.

I'm still trying to convince myself that this implementation makes sense here:

One of the core features of the BufferReader code in the Keep client is that a Reader can start pulling data from it immediately, before the buffer has even collected all the data from the source. That's pretty neat!

But the Keep client must compute the MD5 digest for the source data before it attempts to PUT anything to the server. That means that it has to read the entire block into memory anyway. Once the entire source has been read into memory, each goroutine which is writing a PUT request to a server can use a bytes.Buffer to read directly from the original slice without further copies.
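
A minimal sketch of that alternative, assuming the whole block is already in memory. The function name is hypothetical, io.Discard stands in for the PUT request body, and bytes.Reader is used here in place of bytes.Buffer because it is read-only; either wraps the slice without copying it.

```go
package main

import (
	"bytes"
	"crypto/md5"
	"fmt"
	"io"
	"sync"
)

// hashAndFanOut computes the MD5 digest of block, then lets n goroutines
// each stream the same slice through their own bytes.Reader.
func hashAndFanOut(block []byte, n int) (string, []int64) {
	digest := fmt.Sprintf("%x", md5.Sum(block))

	sent := make([]int64, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			// Each reader shares block's backing array but has its own offset.
			r := bytes.NewReader(block)
			sent[i], _ = io.Copy(io.Discard, r) // io.Discard stands in for a PUT body
		}(i)
	}
	wg.Wait()
	return digest, sent
}

func main() {
	digest, sent := hashAndFanOut([]byte("hello"), 2)
	fmt.Println(digest, sent)
}
```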

I'm not sure I can think of any situation where the client can start pipelining data from disk to the remote Keeps. Someone has to compute a hash, and that requires reading all of the data, and if all of the data has been read, why not just use it as the source?

Can we review the scenarios where this would come into play, so I know what it is that I'm missing? :-)

#6 Updated by Tim Pierce over 7 years ago

Tim Pierce wrote:

I'm not sure I can think of any situation where the client can start pipelining data from disk to the remote Keeps. Someone has to compute a hash, and that requires reading all of the data, and if all of the data has been read, why not just use it as the source?

Can we review the scenarios where this would come into play, so I know what it is that I'm missing? :-)

Ward and I walked through this and the lightbulb went on for me a few minutes ago -- you're writing the Keep client so that the proxy can read requests from arv-put, so the proxy will get a hash before any of the data has arrived. Mystery solved!

#7 Updated by Peter Amstutz over 7 years ago

  1. Buffer reader moved out into its own package, added public API and made internal functions package-private
  2. Added checksum checking
  3. Added API and package comments

Ward and I walked through this and the lightbulb went on for me a few minutes ago -- you're writing the Keep client so that the proxy can read requests from arv-put, so the proxy will get a hash before any of the data has arrived. Mystery solved!

That's right, this was written with the proxy use case at the forefront, however I think this could be useful in other situations:
  • A future FUSE mount written in Go can start relaying data to the filesystem reader without having to wait for the entire block.
  • If the Keep server experiences simultaneous requests for the same block (such as during job startup) it could keep a block cache and start servicing requests even before the block has been completely read off disk.

#8 Updated by Tim Pierce over 7 years ago

Review @ 941bcf698

Thanks for refactoring the buffer code into its own package. This helps a lot with comprehension and I think also with partitioning the code more clearly.

I continue to think that attempting to support staggered writers in this package is not a good idea. It's a scenario we don't presently need to support, and if a TransferBuffer does not need to support readers that are created after it begins streaming data, the flow of the package becomes substantially simpler without any loss of efficiency.

Behavior issues:
  • TransferBuffer.Close()
    • What happens if this is called before the client readers are done? It looks like any outstanding BufferReader will panic when it tries to send a ReadRequest.
    • What happens to pending requests? Do the response channels on them need to be closed explicitly?
  • A TransferBuffer should have a limit on the number of readers it will support. Otherwise, a caller who floods the TransferBuffer with new readers could prevent the switch from ever calling the clause that receives new data from the source.
  • We should explicitly test concurrent BufferReaders on a single TransferBuffer, i.e. launch a few go func() { b := tr.MakeBufferReader(); n, err := b.Read(in); c.Check... } and then start filling the TransferBuffer.
Style, documentation etc:
  • Rename the package and the public types to be more descriptive. The fact that a buffer is involved is relatively unimportant to what the package is supposed to do. Open to ideas, but here are some suggestions that are easier for me to absorb:
    • package buffer -> package streamer
    • TransferBuffer -> AsyncStream
    • BufferReader -> StreamReader
    • buffer.StartTransferFromReader -> streamer.MakeFromReader
    • buffer.StartTransferFromSlice -> streamer.MakeFromSlice
  • Almost all of the methods here modify the receiver in some way (adjusting an offset, closing a channel, etc). Using pointer receivers here is much clearer than having individual struct members declared as pointers.
  • An overview explaining the overall flow of a streaming transaction would be helpful here, particularly on these concepts which took me a while to understand (and I may still not have):
    • What different kinds of structs/objects are created: BufferReader, TransferBuffer, others?
    • What role is played by each of these types and, briefly, how they interact with each other, e.g.:
      • BufferReader.Read sends a request for data on its requests channel to the transfer goroutine, which delivers the next available chunk of data on the supplied responses channel.
      • transfer multiplexes data from the source to all active readers, receiving ReadRequests from each reader and returning a ReadResult. When the source is temporarily exhausted, the ReadRequest is added to a list of pending requests (allowing the reader to block until a result is ready). When new data is available from the source, it is added to a buffer and delivered to any pending requests.
  • bufferWriter appears to basically duplicate bytes.Buffer.
  • handleReadRequest:
    • has a leftover log.Printf debug statement.
    • Document the complete argument.
  • pending_requests looks like it would be clearer if implemented as a doubly linked list: http://golang.org/pkg/container/list/
  • buffer_test.go
    • The structure of these tests is odd -- if there isn't a compelling reason to use blocks to group test clauses, let's drop them (I keep looking to see if they wrap some kind of funny anonymous func() { } syntax).
    • Move TestReadIntoBuffer before ReadIntoBufferHelper
    • What is the value in testing ReadIntoBufferHelper with sizes of both 512 and 225? I see the point in using 224 (exactly the number of bytes available for writing) but both 512 and 225 look like they exercise exactly the same code path.
    • Suggest refactoring ReadIntoBufferHelper into helper functions Write128Bytes and Write96Bytes (which can take a bool argument telling it whether or not to launch a goroutine, for TestReadIntoShortBuffer).
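
The request/response flow described in the overview bullets above could be sketched roughly as follows, together with the concurrent-readers scenario the review asks to test. This is a simplified reading of the design, not the branch's code: every type and function name here is hypothetical, and the buffer grows with append rather than being preallocated.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// readRequest asks for whatever data is available at or after offset.
type readRequest struct {
	offset int
	reply  chan readResult // capacity 1, so transfer never blocks on a reply
}

type readResult struct {
	data []byte
	err  error
}

// answer replies if data or EOF is available; false means "keep it pending".
func answer(req readRequest, buf []byte, eof bool) bool {
	switch {
	case req.offset < len(buf):
		req.reply <- readResult{data: buf[req.offset:]}
	case eof:
		req.reply <- readResult{err: io.EOF}
	default:
		return false
	}
	return true
}

// transfer owns the buffer and multiplexes it to all readers.
func transfer(source <-chan []byte, requests <-chan readRequest) {
	var buf []byte
	var pending []readRequest
	eof := false
	for {
		select {
		case chunk, ok := <-source:
			if ok {
				buf = append(buf, chunk...)
			} else {
				eof = true
				source = nil // a nil channel is never selected again
			}
			waiting := pending
			pending = nil
			for _, req := range waiting { // retry requests that were blocked
				if !answer(req, buf, eof) {
					pending = append(pending, req)
				}
			}
		case req, ok := <-requests:
			if !ok {
				return // all readers are done
			}
			if !answer(req, buf, eof) {
				pending = append(pending, req)
			}
		}
	}
}

// streamReader is an io.Reader that tracks its own offset into the stream.
type streamReader struct {
	offset   int
	requests chan<- readRequest
}

func (r *streamReader) Read(p []byte) (int, error) {
	reply := make(chan readResult, 1)
	r.requests <- readRequest{offset: r.offset, reply: reply}
	res := <-reply // blocks while the request is pending
	if res.err != nil {
		return 0, res.err
	}
	n := copy(p, res.data)
	r.offset += n
	return n, nil
}

// readAllConcurrently starts the readers first, then fills the stream.
func readAllConcurrently(chunks []string, readers int) []string {
	source := make(chan []byte)
	requests := make(chan readRequest)
	go transfer(source, requests)

	out := make([]string, readers)
	var wg sync.WaitGroup
	for i := range out {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			b, _ := io.ReadAll(&streamReader{requests: requests})
			out[i] = string(b)
		}(i)
	}
	for _, c := range chunks {
		source <- []byte(c)
	}
	close(source)
	wg.Wait()
	close(requests)
	return out
}

func main() {
	fmt.Println(readAllConcurrently([]string{"abc", "def"}, 3)) // prints "[abcdef abcdef abcdef]"
}
```

Only the transfer goroutine touches buf and pending, so no locking is needed; readers see data only through the slices delivered in a readResult.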

#9 Updated by Peter Amstutz over 7 years ago

"Staggered writers" or "staggered readers"? By "staggered writers" do you mean incrementally filling the stream buffer? "Staggered readers" is simply the ability to have new readers catch up from already buffered data.

If we wait to receive the entire block then the code is indeed much simpler, but at the cost of latency. The viability of Arvados rests not insignificantly on whether we can achieve high throughput with Keep, so minimizing end-to-end latency matters: applications running on Arvados should spend as much time as possible doing useful work and not waiting for blocks. It seems totally reasonable to design for efficiency and concurrency up front.

The design is specifically motivated by the following two cases:

  1. Keep proxy accepts a connection
  2. Proxy starts accepting the PUT body
  3. Proxy connects to 2 (or more) keep servers in parallel
  4. First keep server says ok, ready
  5. Proxy starts relaying data to server #1
  6. Second keep server stalls for a bit and then says oops, my disk is full/I'm slow/I crashed/go away I'm a teapot
  7. Now the proxy needs to connect to an alternate keep server
    • If we already started sending, we're stuffed, because we didn't buffer that data so there's nothing to re-send to the new keep server.
    • If we wait to see what each Keep server will say before we start sending, we stall the entire upload due to one misbehaving keep server.
  8. Using AsyncStream, we just create a new StreamReader for the new connection which is able to start at the beginning of the buffer and send an intact PUT request.
Second case:
  1. Future arv-mount version written in Go
  2. Customer's program wants to read a file, so arv-mount starts downloading blocks from Keep
    • If arv-mount has to download a whole block before the client program can see it, the customer's program sits around with its thumb up its ass even if it could be doing useful work from incremental reads
  3. arv-mount starts downloading the block using AsyncStream
  4. arv-mount can service FUSE reads on the block before the whole block is downloaded, so the customer's program can do useful work concurrently with the ongoing download of the block

#10 Updated by Peter Amstutz over 7 years ago

  1. Renamed package, types and methods as suggested
  2. AsyncStream.Close() should wait for all readers to be done. I will fix that.
  3. Since I need to track readers in order to ensure that they are all completed before closing the stream, I suppose I can put a limit on the number of concurrent readers.
  4. I will add a new test with concurrent StreamReaders on a single AsyncStream
  5. Your understanding is correct. I will add a "theory of operation" comment that outlines the different processes.
  6. So, according to the documentation bytes.Buffer.Write append()s to its underlying buffer, while my bufferWriter.Write() writes into an already allocated slice. Stupid, I know.
  7. I will add a comment about 'complete' and delete the spurious Printf
  8. Blame Go for not providing better library routines for manipulating slices as general purpose containers. I will add a comment to better explain what's going on there.
    • To split hairs (since we're talking about very small numbers of elements), container/list would almost certainly be less efficient than the current approach: it would require more heap allocation, it requires casting to get the values out, and on modern architectures linked lists are almost always slower than arrays due to cache effects.
  9. The use of blocks is stylistic, it is to narrow the scope of the variables so that they are only assigned once. It is easier to reason about and less error prone than re-assigning the same variables over and over again, and more elegant than creating n1, n2, n3, n4 etc...
  10. Also stylistic, I like to put dependent functions before their users. A habit from various programming languages that require that you pre-declare everything before you use it.
  11. That's correct, the size=512 test is redundant with the 225 test. The idea was to do fencepost tests at buffer sizes 225, 224 and 223 (the 223 size ended up in its own test).
  12. My rule of thumb is if you do something twice, copy it, if you do it three times, make a function, but I can refactor the WriteBytes functions if you like.

#11 Updated by Peter Amstutz over 7 years ago

2, 3, 4, 5. Done
7. Hopefully covered by "theory of operation" section now
12. I added HelperWrite128andCheck, but HelperWrite96andCheck still can't be used by TestReadIntoShortBuffer because some of the other behavior is slightly different. Aggressively refactoring any more seems like a poor use of time.

#12 Updated by Tim Pierce over 7 years ago

Peter Amstutz wrote:

"Staggered writers" or "staggered readers"? By "staggered writers" do you mean incrementally filling the stream buffer? "Staggered readers" is simply the ability to have new readers catch up from already buffered data.

If we wait to receive the entire block then the code is indeed much simpler, but at the cost of latency. The viability of Arvados rests not insignificantly on whether we can achieve high throughput with Keep, so minimizing end-to-end latency matters: applications running on Arvados should spend as much time as possible doing useful work and not waiting for blocks. It seems totally reasonable to design for efficiency and concurrency up front.

Sorry, I meant staggered readers. I agree that being able to start writing before the block is read is a reasonable goal. I don't object to the streaming architecture per se. Where I think this is getting into the weeds is allowing the caller to create additional readers after the transfer has begun. That's not a feature that helps us in any measurable way, and I believe that dropping it would allow making the architecture substantially simpler.

I know that we're on a tight timetable, so I'm not going to hold up this branch over this, but it makes me very anxious to move forward with something this complex without seeing a compelling justification for the degree of complexity.

Re bufferWriter and pending_requests: this doesn't seem like enough of an efficiency improvement to warrant reimplementing the standard library. If we find in practice that this has become a bottleneck, we can look into the best way to optimize it.

(Also: although the documentation says that bytes.Buffer.Write() "appends" data to the buffer, it doesn't mean the "append" built-in. The implementation is essentially identical to what you've written here, with the exception that bytes.Buffer.Write() grows the buffer as necessary to accommodate the input -- which is unlikely to happen here, since it's starting with a buffer that's our max blocksize anyway.)

ReadIntoBufferHelper: my issue here is just that I find it confusing to read this function before reading the test that it depends on. If you feel strongly about having dependent functions come earlier in the code, that's fine (should we have a coding guideline for that?) but in that case it helps to have it documented so the new reader doesn't get lost in the weeds.

"twice = copy, three times = refactor" makes sense as a rule of thumb. In this case I think the length and repetitiveness of the code makes it worthwhile to refactor just for comprehension. I get that the nature of the tests makes it hard to refactor deeply, I just wanted to improve readability as much as possible.

#13 Updated by Tim Pierce over 7 years ago

LGTM

#14 Updated by Anonymous over 7 years ago

  • Status changed from New to Resolved

Applied in changeset arvados|commit:d5823126a51b7c31915e01fee100abe9468014e5.
