Story #7658

[SDK] Python websockets reconnect on unexpected close

Added by Peter Amstutz almost 6 years ago. Updated over 5 years ago.

Status:
Resolved
Priority:
Normal
Assigned To:
Radhika Chippada
Category:
SDKs
Target version:
Start date:
04/04/2016
Due date:
% Done:

100%

Estimated time:
(Total: 0.00 h)
Story points:
2.0

Description

In order to start having long-running tasks that rely on websockets events to trigger state changes, we need to ensure that the Python websockets client will automatically reconnect when there is an unexpected disconnect.

Acceptance criteria

  • The public interface of EventClient does not change.
  • Clients that use the public interface of EventClient are automatically get reconnected to websockets. In other words, the on_event callback is called with an uninterrupted stream of events, even if the underlying client has to connect to the websockets server multiple times to provide it.
  • Existing tests should continue to pass without modifying the public methods called and the results they return.
  • Add a test that forcibly disconnects the client and asserts that it automatically reconnects and passes in the correct last_log_id.

Suggested implementation

  1. The current EventClient class becomes an internal class (_EventClient)
  2. Add a on_closed() callback to _EventClient.
  3. Create a new EventClient class that inherits from object.
  4. EventClient takes on_event in the constructor and has the following public API: subscribe(), unsubscribe(), close()
  5. EventClient creates an instance of _EventClient and sets callbacks on_event() and on_closed() to itself
  6. When subscribe() and unsubscribe() are called EventClient, record in an array and then forward to _EventClient
  7. When events are called from _EventClient, record the sequence number "id" in last_log_id and forward them to the real on_event() handler.
  8. If close() is called by user, set a flag
  9. If on_closed() event is called on EventClient, but close flag is false, disconnect from old _EventClient object, create a new _EventClient object, and replay the subscriptions with last_log_id set

Subtasks

Task #8853: Review branch: 7658-websockets-reconnect-on-closeResolvedPeter Amstutz


Related issues

Related to Arvados - Bug #9135: [SDKs] `arv ws -j` exits immediately with an errorResolved05/06/2016

Associated revisions

Revision 4ab126d1
Added by Radhika Chippada over 5 years ago

closes #7658
Merge branch '7658-websockets-reconnect-on-close'

History

#1 Updated by Peter Amstutz almost 6 years ago

  • Description updated (diff)

#2 Updated by Brett Smith almost 6 years ago

  • Target version set to Arvados Future Sprints

#3 Updated by Peter Amstutz over 5 years ago

  • Category set to SDKs

#4 Updated by Brett Smith over 5 years ago

  • Target version changed from Arvados Future Sprints to 2016-04-13 sprint

#5 Updated by Brett Smith over 5 years ago

  • Description updated (diff)
  • Story points changed from 1.0 to 2.0

#6 Updated by Brett Smith over 5 years ago

  • Assigned To set to Radhika Chippada

#7 Updated by Radhika Chippada over 5 years ago

  • Status changed from New to In Progress

#8 Updated by Peter Amstutz over 5 years ago

  • Description updated (diff)

#9 Updated by Peter Amstutz over 5 years ago

Comments @ 7b2cd50

Could you refactor the code for managing the list of "filters" so that it is the same between EventClient and PollClient? Maybe add a small helper class.

_EventClient.__init__() the on_closed parameter should not be optional.

In EventClient.on_closed() it should catch exceptions around self.ec.connect() and keep retrying self.ec.connect() with a 5-10 second wait in between tries until it succeeds or self.is_closed == True. I didn't think of this detail when I wrote the original description but it is important for robustness (for example, when the websockets service reboots and is unavailable for a minute or two). The retry logic will need a test.

The test isn't quite correct. It creates human2 before calling close_connection(), so it is possible for it to receive the event before disconnecting. It should create human2 after it calls close_connection() to demonstrate that it reconnected after close_connection().

#10 Updated by Radhika Chippada over 5 years ago

  • Could you refactor the code for managing the list of "filters" so that it is the same between EventClient and PollClient? Maybe add a small helper class.

The PollClient is using [filters] where as EventClient is using filters as is which as an array of array, so I did not do this. However, I noticed an issue with the way I was managing "subscriptions" and re-subscribing on unexpected closes. I updated the logic to first connect (with the original filters) and then issue submit calls for each entry found in the unsubscribed subscriptions.

  • EventClient._init__() the on_closed parameter should not be optional.

Done

  • In EventClient.on_closed() it should catch exceptions around self.ec.connect() and keep retrying self.ec.connect() with a 5-10 second wait in between tries until it succeeds or self.is_closed == True.

Added this code update

  • The retry logic will need a test.

In order to write a test that checks the retry logic, I would need to mock the EventClient.connect with side_effects array [pass, some_error, pass], where the first and third side_effects should pass and do the actual connect work. I could not figure out how to define side_effects such that they pass the first and third time onwards but fail the second time. Please provide a working code sample for this so that I can write this test. Thanks.

  • The test isn't quite correct. It creates human2 before calling close_connection(), so it is possible for it to receive the event before disconnecting. It should create human2 after it calls close_connection() to demonstrate that it reconnected after close_connection().

Done

#11 Updated by Radhika Chippada over 5 years ago

Added test for retry during reconnect.

#12 Updated by Peter Amstutz over 5 years ago

Suggest using StreamIO.StreamIO instead of logging.FileHandler. This buffers in memory instead of creating a temporary file:

        logstream = StreamIO.StreamIO()
        rootLogger = logging.getLogger()
        streamHandler = logging.StreamHandler(logstream)
        rootLogger.addHandler(streamHandler)

        ...
        log_messages = logstream.getvalue()

In EventClient.on_closed, this should catch Exception and log the exception:

              except Exception as e:
                  _logger.warn("Error '%s' during websocket reconnect. Will retry after 5s.", e)
                  time.sleep(5)

#13 Updated by Radhika Chippada over 5 years ago

  • Status changed from In Progress to Resolved
  • % Done changed from 0 to 100

Applied in changeset arvados|commit:4ab126d1574b7db2fdb5b0cea253b2df28d7b130.

Also available in: Atom PDF