Idea #7658
closed[SDK] Python websockets reconnect on unexpected close
Description
In order to start having long-running tasks that rely on websockets events to trigger state changes, we need to ensure that the Python websockets client will automatically reconnect when there is an unexpected disconnect.
Acceptance criteria¶
- The public interface of EventClient does not change.
- Clients that use the public interface of EventClient are automatically get reconnected to websockets. In other words, the
on_event
callback is called with an uninterrupted stream of events, even if the underlying client has to connect to the websockets server multiple times to provide it. - Existing tests should continue to pass without modifying the public methods called and the results they return.
- Add a test that forcibly disconnects the client and asserts that it automatically reconnects and passes in the correct last_log_id.
Suggested implementation¶
- The current EventClient class becomes an internal class (_EventClient)
- Add a
on_closed()
callback to _EventClient. - Create a new EventClient class that inherits from object.
- EventClient takes
on_event
in the constructor and has the following public API:subscribe()
,unsubscribe()
,close()
- EventClient creates an instance of _EventClient and sets callbacks on_event() and on_closed() to itself
- When subscribe() and unsubscribe() are called EventClient, record in an array and then forward to _EventClient
- When events are called from _EventClient, record the sequence number "id" in last_log_id and forward them to the real on_event() handler.
- If close() is called by user, set a flag
- If on_closed() event is called on EventClient, but close flag is false, disconnect from old _EventClient object, create a new _EventClient object, and replay the subscriptions with last_log_id set
Related issues
Updated by Brett Smith almost 9 years ago
- Target version set to Arvados Future Sprints
Updated by Brett Smith over 8 years ago
- Target version changed from Arvados Future Sprints to 2016-04-13 sprint
Updated by Brett Smith over 8 years ago
- Description updated (diff)
- Story points changed from 1.0 to 2.0
Updated by Radhika Chippada over 8 years ago
- Status changed from New to In Progress
Updated by Peter Amstutz over 8 years ago
Comments @ 7b2cd50
Could you refactor the code for managing the list of "filters" so that it is the same between EventClient
and PollClient
? Maybe add a small helper class.
_EventClient.__init__()
the on_closed
parameter should not be optional.
In EventClient.on_closed()
it should catch exceptions around self.ec.connect()
and keep retrying self.ec.connect()
with a 5-10 second wait in between tries until it succeeds or self.is_closed == True
. I didn't think of this detail when I wrote the original description but it is important for robustness (for example, when the websockets service reboots and is unavailable for a minute or two). The retry logic will need a test.
The test isn't quite correct. It creates human2
before calling close_connection(), so it is possible for it to receive the event before disconnecting. It should create human2 after it calls close_connection()
to demonstrate that it reconnected after close_connection()
.
Updated by Radhika Chippada over 8 years ago
- Could you refactor the code for managing the list of "filters" so that it is the same between EventClient and PollClient? Maybe add a small helper class.
The PollClient is using [filters] where as EventClient is using filters as is which as an array of array, so I did not do this. However, I noticed an issue with the way I was managing "subscriptions" and re-subscribing on unexpected closes. I updated the logic to first connect (with the original filters) and then issue submit calls for each entry found in the unsubscribed subscriptions.
- EventClient._init__() the on_closed parameter should not be optional.
Done
- In EventClient.on_closed() it should catch exceptions around self.ec.connect() and keep retrying self.ec.connect() with a 5-10 second wait in between tries until it succeeds or self.is_closed == True.
Added this code update
- The retry logic will need a test.
In order to write a test that checks the retry logic, I would need to mock the EventClient.connect with side_effects array [pass, some_error, pass], where the first and third side_effects should pass and do the actual connect work. I could not figure out how to define side_effects such that they pass the first and third time onwards but fail the second time. Please provide a working code sample for this so that I can write this test. Thanks.
- The test isn't quite correct. It creates human2 before calling close_connection(), so it is possible for it to receive the event before disconnecting. It should create human2 after it calls close_connection() to demonstrate that it reconnected after close_connection().
Done
Updated by Radhika Chippada over 8 years ago
Added test for retry during reconnect.
Updated by Peter Amstutz over 8 years ago
Suggest using StreamIO.StreamIO instead of logging.FileHandler. This buffers in memory instead of creating a temporary file:
logstream = StreamIO.StreamIO() rootLogger = logging.getLogger() streamHandler = logging.StreamHandler(logstream) rootLogger.addHandler(streamHandler) ... log_messages = logstream.getvalue()
In EventClient.on_closed, this should catch Exception and log the exception:
except Exception as e: _logger.warn("Error '%s' during websocket reconnect. Will retry after 5s.", e) time.sleep(5)
Updated by Radhika Chippada over 8 years ago
- Status changed from In Progress to Resolved
- % Done changed from 0 to 100
Applied in changeset arvados|commit:4ab126d1574b7db2fdb5b0cea253b2df28d7b130.