Project

General

Profile

Websocket server » History » Revision 8

Revision 7 (Peter Amstutz, 08/19/2016 02:40 AM) → Revision 8/12 (Tom Clegg, 10/25/2016 06:03 PM)

h1. Websocket server 

 (draft) (early draft) 

 {{toc}} 

 See also: [[Events API]] 

 h1. Messages 

 Each message is JSON-encoded as an object with exactly one key. The key indicates the message type, and the value contains the message content. 

 This allows clients and servers to decode messages efficiently: decode the first token to determine the message type, then (if the message content is relevant) decode the message payload into an appropriate data structure. 

 <pre><code class="javascript"> 
 good: {"error":{"code":418,"text":"I'm a teapot"}} 

 bad:    {"errorCode":418,"errorText":"I'm a teapot"} 
 </code></pre> 

 Clients must ignore any unrecognized keys they encounter in the payload. This allows the server to add features without breaking existing clients. 

 h2. setAuth Background 

 After establishing a connection, and before subscribing to any streams, the client must supply an authorization token. 

 Successful authorization is acknowledged. 

 <pre><code class="javascript"> 
 client: { 
           "setAuth":{"token":"3kg6k6lzmp9kj5cpkcoxie963cmvjahbt2fod9zru30k1jqdmi"} 
         } 

 server: { 
           "auth":{"uuid":"zzzzz-gj3su-077z32aux8dg2s1"} 
         } 
 </code></pre> 

 Unsuccessful authorization results in an error. 

 <pre><code class="javascript"> 
 client: { 
           "setAuth":{ 
             "token":"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"}} 

 server: { 
           "authError":{ 
             "errorText":"invalid or expired token"}} 
 </code></pre> 

 h2. subscribe 

 Subscribe to an event stream. 

 If the given ETag does not match the current ETag, the server should send an update event right away: this means the client has already missed one or more updates since the version it has cached. 

 <pre><code class="javascript"> 
 client: { 
           "subscribe":{ 
             "uuid":"zzzzz-4zz18-1g4g0vhpjn9wq7i", 
             "etag":"9u32836jpz7i046sd84gu190h"}} 

 server: { 
           "event":{ 
             "msgID":12345, 
             "type":"update", 
             "uuid":"zzzzz-4zz18-1g4g0vhpjn9wq7i", 
             "etag":"1wfdizt65l5w597jf5lojf8jm"}} 
 </code></pre> 

 When a client subscribes to a stream X, but is not authorized to read the object with UUID X (or there is no such object), the server sends an error message. This does not terminate the connection, nor does it affect any other streams. 

 <pre><code class="javascript"> 
 client: { 
           "subscribe":{ 
             "uuid":"zzzzz-tpzed-000000000000000", 
             "etag":"x"}} 

 server: { 
           "subscribeError":{ 
             "uuid":"zzzzz-tpzed-000000000000000", 
             "errorText":"forbidden"}} 
 </code></pre> 

 h2. Container and job logging events 

 [[Events API]] &rarr; "Non-state-changing events" 

 <pre><code class="javascript"> 
 client: { 
           "subscribe":{ 
             "uuid":"zzzzz-dz642-logscontainer03", 
             "etag":"2qtm62j6zb3nx5zud8b5v0ayl", 
             "select":["logs.event_type","logs.properties.text"]}} 

 server: { 
           "event":{ 
             "msgID":12346, 
             "type":"log", 
             "uuid":"zzzzz-dz642-logscontainer03", 
             "etag":"2qtm62j6zb3nx5zud8b5v0ayl", 
             "log":{ 
               "event_type":"stderr", 
               "properties":{ 
                 "text":"foo\n"}}}} 
 </code></pre> 

 h2. Update events 

 h2. Create events 

 h2. Delete events 

 h2. Missed events 

 Zero or more events for a single stream have been skipped: 

 <pre><code class="javascript"> 
 server: { 
           "eventsMissed":{ 
             "msgID":12347, 
             "uuid":"zzzzz-dz642-logscontainer03"}} 
 </code></pre> 

 Zero or more events on one or more of the subscribed streams have been skipped: 

 <pre><code class="javascript"> 
 server: { 
           "eventsMissed":{ 
             "msgID":12348}} 
 </code></pre> 

 h1. Server implementation 

 h2. Architecture 

 Go server with a goroutine serving each connection. 

 One goroutine receives incoming events and assigns msgID numbers. 

 Each connection has an outgoing event queue. Leave room for ability to resize a connection's outgoing queue dynamically, provided no subscriptions are active: this way privileged clients can request bigger queues. 

 Common events should be serialized once and distributed to all connections. This avoids serializing each event N times, and allows outgoing queues to share a single message buffer for a given event. 

 If practical, when a connection's outgoing queue fills up, send a "missed events" signal and discard all buffered events (and, of course, any incoming events that arrive while the buffer is full). After a "missed events" signal the client needs to assume its cache is out of date anyway. Expect a faster recovery from a temporary backlog if, when skipping events, we skip as many as we can. 

 h2. Logging 

 Print JSON-formatted log entries on stderr. 

 Print a log entry when a client connects. 

 Print a log entry when a client disconnects. Show counters for: 
 * Number of streams (UUIDs) added while connection was up 
 * Number of streams removed 
 * Number of events sent 
 * Number of bytes sent 
 * Total time spent waiting for Write() to return (or a better way to measure congestion?) 

 h2. Libraries 

 Websocket: 
 * https://godoc.org/golang.org/x/net/websocket 

 PostgreSQL: 
 * https://godoc.org/github.com/lib/pq via https://godoc.org/database/sql 
 * https://godoc.org/github.com/lib/pq#hdr-Notifications and https://godoc.org/github.com/lib/pq/listen_example 

 h1. Problems with old/current implementation 

 (Lessons to avoid re-learning next time...) 

 The Rails API server can function as a websocket server. Clients (notably Workbench, arv-mount, arv-ws) use it to listen for events without polling. 

 Problems with current implementation: 
 * Unreliable. See #9427, #8277 
 * Resource-heavy (one postgres connection per connected client, uses lots of memory) 
 * Logging is not very good 
 * Updates look like database records instead of API responses (e.g., computed fields are missing, collection manifest_text has no signatures) 
 * Offers an API for catching up on missed events after disconnecting/reconnecting, but this API (let alone the code) isn't enough to offer a "don't miss any events, don't send any events twice" guarantee. See #9388 

 #8460 

 h2. Desired features 

 Monotonically increasing event IDs, so clients can (meaningfully) request "all matching events since X" 

 h2. Design sketch (TC) 

 New server, written in Go. 

 One goroutine per connected client. 

 One database connection receiving notifications about new logs. (Possibly still N database connections serving "catch-up" messages to N clients.) 

 h2. Design sketch (PA) 

 When client connects, it can request a new session (with event filter), or asks to resume an existing session from a given event id. 

 Each session has a session id and is associated with a user, an event channel, an event queue, event filters, and exactly one websocket connection. 

 Clients can create multiple sessions on the same websocket.    When a session is created, the client can request a replay of events from database, either "last N" or "since time T". 

 Resuming a session associates the session to a new websocket connection (must be same user).    Resuming a session replays any queued events occurring after the given event id. 

 Orderly websocket disconnects tear down any associated sessions.    Abrupt disconnects maintain the sessions for N minutes.    Clients can also end individual sessions without disconnecting the websocket. 

 NOTIFY sends full json-encoded record. 

 Websocket server database LISTEN receives NOTIFY, deserializes record, assigns a sequence number to the event, gets @object_uuid@ and @object_owner_uuid@ and determines the set of users who have permission to view the log record using in-memory cache of permissions graph.   

 Intersect the set of users who can view the record with the users associated with active sessions and adds the record to the queue for each session with the associated user. 

 The goroutine for session gets a new event on the event channel.    It first applies the session filters to determine if the event should be propagated.    (This should be a Go-based implementation of Arvados query filters and not touch the database.)    If it passes filters it is added to a queue, and can be sent to the websocket client. 

 The websocket client receives the event and sends an acknowledgement with the sequence number.    The server receives the acknowledgement and removes messages from the queue with sequence number less than or equal to the acknowledgement. 


 h2. Libraries 

 Websocket: 
 * https://godoc.org/golang.org/x/net/websocket 

 PostgreSQL: 
 * https://godoc.org/github.com/lib/pq via https://godoc.org/database/sql 
 * https://godoc.org/github.com/lib/pq#hdr-Notifications and https://godoc.org/github.com/lib/pq/listen_example 

 h2. Obstacles 

 #8565, #8566 

 h2. Related 

 It might be expedient to offload synchronization to some existing software that does this well. 
 * "Apache Zookeeper":https://zookeeper.apache.org/doc/trunk/zookeeperOver.html -- "Coordination services are notoriously hard to get right. They are especially prone to errors such as race conditions and deadlock. The motivation behind ZooKeeper is to relieve distributed applications the responsibility of implementing coordination services from scratch." 
 * Google "Chubby":http://static.googleusercontent.com/media/research.google.com/en//archive/chubby-osdi06.pdf paper 
 * NSQ http://nsq.io/