Statistics
| Branch: | Tag: | Revision:

arvados / sdk / python / tests / test_keep_client.py @ 76d9365a

History | View | Annotate | Download (52.9 KB)

1
from __future__ import absolute_import
2
from __future__ import division
3
from future import standard_library
4
standard_library.install_aliases()
5
from builtins import str
6
from builtins import range
7
from builtins import object
8
import hashlib
9
import mock
10
import os
11
import pycurl
12
import random
13
import re
14
import socket
15
import sys
16
import threading
17
import time
18
import unittest
19
import urllib.parse
20

    
21
import arvados
22
import arvados.retry
23
from . import arvados_testutil as tutil
24
from . import keepstub
25
from . import run_test_server
26

    
27
class KeepTestCase(run_test_server.TestCaseWithServers):
    """Round-trip put/get/head tests for KeepClient.

    MAIN_SERVER and KEEP_SERVER are consumed by TestCaseWithServers
    (presumably to start an API server and Keep servers with default
    settings -- confirm in run_test_server).  A single admin-authorized
    KeepClient is created once and shared by every test in the class.
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {}

    @classmethod
    def setUpClass(cls):
        """Authorize as admin and build the shared API and Keep clients."""
        super(KeepTestCase, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')
        cls.keep_client = arvados.KeepClient(api_client=cls.api_client,
                                             proxy='', local_store='')

    def test_KeepBasicRWTest(self):
        """Store and fetch 'foo', checking the transfer counters."""
        self.assertEqual(0, self.keep_client.upload_counter.get())
        foo_locator = self.keep_client.put('foo')
        self.assertRegexpMatches(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        # 6 bytes because uploaded 2 copies
        self.assertEqual(6, self.keep_client.upload_counter.get())

        self.assertEqual(0, self.keep_client.download_counter.get())
        self.assertEqual(self.keep_client.get(foo_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')
        self.assertEqual(3, self.keep_client.download_counter.get())

    def test_KeepBinaryRWTest(self):
        """Store and fetch a short blob of non-ASCII bytes."""
        # Must be a bytes literal: on Python 3 a text literal would be
        # encoded before upload, changing both the md5 and the size, and
        # could never compare equal to the bytes returned by get().
        blob_str = b'\xff\xfe\xf7\x00\x01\x02'
        blob_locator = self.keep_client.put(blob_str)
        self.assertRegexpMatches(
            blob_locator,
            r'^7fc7c53b45e53926ba52821140fef396\+6',
            ('wrong locator from Keep.put(<binarydata>):' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_str,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepLongBinaryRWTest(self):
        """Store and fetch a 64 MiB binary blob."""
        # 8 bytes doubled 23 times == 8 * 2**23 == 67108864 bytes.
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03' * (2 ** 23)
        blob_locator = self.keep_client.put(blob_data)
        self.assertRegexpMatches(
            blob_locator,
            r'^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    @unittest.skip("unreliable test - please fix and close #8752")
    def test_KeepSingleCopyRWTest(self):
        """Store a blob with copies=1 and read it back (currently skipped)."""
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
        blob_locator = self.keep_client.put(blob_data, copies=1)
        self.assertRegexpMatches(
            blob_locator,
            r'^c902006bc98a3eb4a3663b65ab4a6fab\+8',
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
        self.assertEqual(self.keep_client.get(blob_locator),
                         blob_data,
                         'wrong content from Keep.get(md5(<binarydata>))')

    def test_KeepEmptyCollectionTest(self):
        """Storing the empty string yields the md5-of-nothing locator."""
        blob_locator = self.keep_client.put('', copies=1)
        self.assertRegexpMatches(
            blob_locator,
            r'^d41d8cd98f00b204e9800998ecf8427e\+0',
            ('wrong locator from Keep.put(""): ' + blob_locator))

    def test_unicode_must_be_ascii(self):
        """put() accepts ASCII text and rejects objects it cannot encode."""
        # If unicode type, must only consist of valid ASCII
        foo_locator = self.keep_client.put(u'foo')
        self.assertRegexpMatches(
            foo_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3',
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)

        if sys.version_info < (3, 0):
            with self.assertRaises(UnicodeEncodeError):
                # Error if it is not ASCII
                self.keep_client.put(u'\xe2')

        with self.assertRaises(AttributeError):
            # Must be bytes or have an encode() method
            self.keep_client.put({})

    def test_KeepHeadTest(self):
        """head() reports True for a stored block that get() can return."""
        locator = self.keep_client.put('test_head')
        self.assertRegexpMatches(
            locator,
            r'^b9a772c7049325feb7130fff1f8333e9\+9',
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
        self.assertEqual(True, self.keep_client.head(locator))
        self.assertEqual(self.keep_client.get(locator),
                         b'test_head',
                         'wrong content from Keep.get for "test_head"')
126

    
127
class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
    """Checks Keep behavior when a signing key is set and enforced.

    KEEP_SERVER asks for a blob signing key with enforce_permissions
    turned on, so GET requests are expected to fail unless they carry
    both a signed locator and the token of the user who stored it.
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing_key': 'abcdefghijk0123456789',
                   'enforce_permissions': True}

    def test_KeepBasicRWTest(self):
        """Signed reads succeed; unsigned, foreign and anonymous reads fail."""
        run_test_server.authorize_with('active')
        client = arvados.KeepClient()

        signed_foo = client.put('foo')
        self.assertRegexpMatches(
            signed_foo,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + signed_foo)
        fetched = client.get(signed_foo)
        self.assertEqual(fetched,
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

        # GET with an unsigned locator => NotFound
        signed_bar = client.put('bar')
        unsigned_bar = "37b51d194a7513e45b56f6524f2d51f2+3"
        self.assertRegexpMatches(
            signed_bar,
            r'^37b51d194a7513e45b56f6524f2d51f2\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("bar"): ' + signed_bar)
        with self.assertRaises(arvados.errors.NotFoundError):
            client.get(unsigned_bar)

        # GET from a different user => NotFound
        run_test_server.authorize_with('spectator')
        with self.assertRaises(arvados.errors.NotFoundError):
            arvados.Keep.get(signed_bar)

        # Unauthenticated GET for a signed locator => NotFound
        # Unauthenticated GET for an unsigned locator => NotFound
        client.api_token = ''
        for locator in (signed_bar, unsigned_bar):
            with self.assertRaises(arvados.errors.NotFoundError):
                client.get(locator)
170

    
171

    
172
# KeepOptionalPermission: starts Keep with --permission-key-file
173
# but not --enforce-permissions (i.e. generate signatures on PUT
174
# requests, but do not require them for GET requests)
175
#
176
# All of these requests should succeed when permissions are optional:
177
# * authenticated request, signed locator
178
# * authenticated request, unsigned locator
179
# * unauthenticated request, signed locator
180
# * unauthenticated request, unsigned locator
181
class KeepOptionalPermission(run_test_server.TestCaseWithServers):
    """Checks Keep behavior when signatures are issued but not enforced.

    KEEP_SERVER sets a blob signing key with enforce_permissions off, so
    every combination of authenticated/unauthenticated request and
    signed/unsigned locator is expected to succeed on GET.
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {'blob_signing_key': 'abcdefghijk0123456789',
                   'enforce_permissions': False}

    @classmethod
    def setUpClass(cls):
        """Authorize as admin and build the API client shared by all tests."""
        super(KeepOptionalPermission, cls).setUpClass()
        run_test_server.authorize_with("admin")
        cls.api_client = arvados.api('v1')

    def setUp(self):
        """Give each test a fresh KeepClient.

        Some tests blank the client's api_token, so the client must not
        be shared between tests.
        """
        super(KeepOptionalPermission, self).setUp()
        self.keep_client = arvados.KeepClient(api_client=self.api_client,
                                              proxy='', local_store='')

    def _put_foo_and_check(self):
        """Store 'foo', assert the locator is signed, and return it."""
        signed_locator = self.keep_client.put('foo')
        self.assertRegexpMatches(
            signed_locator,
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
            'invalid locator from Keep.put("foo"): ' + signed_locator)
        return signed_locator

    def test_KeepAuthenticatedSignedTest(self):
        """Authenticated GET with the signed locator returns the data."""
        signed_locator = self._put_foo_and_check()
        self.assertEqual(self.keep_client.get(signed_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

    def test_KeepAuthenticatedUnsignedTest(self):
        """Authenticated GET with a bare md5 locator returns the data."""
        # Called only to make sure the block exists; the signed locator
        # itself is deliberately not used for the read.
        self._put_foo_and_check()
        self.assertEqual(self.keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8"),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

    def test_KeepUnauthenticatedSignedTest(self):
        """Unauthenticated GET with the signed locator returns the data."""
        # Check that signed GET requests work even when permissions
        # enforcement is off.
        signed_locator = self._put_foo_and_check()
        self.keep_client.api_token = ''
        self.assertEqual(self.keep_client.get(signed_locator),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')

    def test_KeepUnauthenticatedUnsignedTest(self):
        """Unauthenticated GET with a bare md5 locator returns the data."""
        # Since --enforce-permissions is not in effect, GET requests
        # need not be authenticated.
        self._put_foo_and_check()
        self.keep_client.api_token = ''
        # Compare against bytes, like the sibling tests: get() returns
        # bytes, which never equals a text string on Python 3.
        self.assertEqual(self.keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8"),
                         b'foo',
                         'wrong content from Keep.get(md5("foo"))')
234

    
235

    
236
class KeepProxyTestCase(run_test_server.TestCaseWithServers):
    """Tests for KeepClient talking to Keep through a proxy service.

    KEEP_PROXY_SERVER is handed to TestCaseWithServers in addition to
    the API and Keep server settings (presumably starting a keepproxy --
    confirm in run_test_server).
    """
    MAIN_SERVER = {}
    KEEP_SERVER = {}
    KEEP_PROXY_SERVER = {}

    @classmethod
    def setUpClass(cls):
        """Authorize as the 'active' user and build the shared API client."""
        super(KeepProxyTestCase, cls).setUpClass()
        run_test_server.authorize_with('active')
        cls.api_client = arvados.api('v1')

    def tearDown(self):
        """Remove the external-client flag some tests set in the settings."""
        # NOTE(review): two tests below also overwrite
        # ARVADOS_KEEP_SERVICES and nothing restores it here -- confirm
        # that leaking it into later tests is acceptable.
        arvados.config.settings().pop('ARVADOS_EXTERNAL_CLIENT', None)
        super(KeepProxyTestCase, self).tearDown()

    def test_KeepProxyTest1(self):
        """Round-trip a block using the proxy named by the environment."""
        # Will use ARVADOS_KEEP_SERVICES environment variable that
        # is set by setUpClass().
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        baz_locator = keep_client.put('baz')
        self.assertRegexpMatches(
            baz_locator,
            r'^73feffa4b7f6bb68e44cf984c85f6e88\+3',
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz',
                         'wrong content from Keep.get(md5("baz"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTest2(self):
        """Round-trip a block when flagged as an external client."""
        # Don't instantiate the proxy directly, but set the X-External-Client
        # header.  The API server should direct us to the proxy.
        arvados.config.settings()['ARVADOS_EXTERNAL_CLIENT'] = 'true'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         proxy='', local_store='')
        baz_locator = keep_client.put('baz2')
        self.assertRegexpMatches(
            baz_locator,
            r'^91f372a266fe2bf2823cb8ec7fda31ce\+4',
            'wrong md5 hash from Keep.put("baz2"): ' + baz_locator)
        self.assertEqual(keep_client.get(baz_locator),
                         b'baz2',
                         'wrong content from Keep.get(md5("baz2"))')
        self.assertTrue(keep_client.using_proxy)

    def test_KeepProxyTestMultipleURIs(self):
        """A space-separated ARVADOS_KEEP_SERVICES yields one root per URI."""
        # Test using ARVADOS_KEEP_SERVICES env var overriding any
        # existing proxy setting and setting multiple proxies
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'http://10.0.0.1 https://foo.example.org:1234/'
        keep_client = arvados.KeepClient(api_client=self.api_client,
                                         local_store='')
        uris = [x['_service_root'] for x in keep_client._keep_services]
        self.assertEqual(uris, ['http://10.0.0.1/',
                                'https://foo.example.org:1234/'])

    def test_KeepProxyTestInvalidURI(self):
        """A scheme-less ARVADOS_KEEP_SERVICES value is rejected."""
        arvados.config.settings()['ARVADOS_KEEP_SERVICES'] = 'bad.uri.org'
        with self.assertRaises(arvados.errors.ArgumentError):
            # Only construction matters; the instance is never used.
            arvados.KeepClient(api_client=self.api_client,
                               local_store='')
297

    
298

    
299
class KeepClientServiceTestCase(unittest.TestCase, tutil.ApiClientMock):
    """KeepClient tests that run against mocked API and Keep responses."""

    def get_service_roots(self, api_client):
        """Return the parsed, sorted service root URLs KeepClient derives.

        The roots are computed for an all-zero locator, so the probe
        order is irrelevant here; results are sorted before parsing.
        """
        keep_client = arvados.KeepClient(api_client=api_client)
        services = keep_client.weighted_service_roots(arvados.KeepLocator('0'*32))
        return [urllib.parse.urlparse(url) for url in sorted(services)]

    def test_ssl_flag_respected_in_roots(self):
        """The service_ssl_flag selects http vs. https in the root URL."""
        for ssl_flag in [False, True]:
            services = self.get_service_roots(self.mock_keep_services(
                service_ssl_flag=ssl_flag))
            self.assertEqual(
                ('https' if ssl_flag else 'http'), services[0].scheme)

    def test_correct_ports_with_ipv6_addresses(self):
        """An IPv6 service host keeps its port separate from the address."""
        service = self.get_service_roots(self.mock_keep_services(
            service_type='proxy', service_host='100::1', service_port=10, count=1))[0]
        self.assertEqual('100::1', service.hostname)
        self.assertEqual(10, service.port)

    # test_*_timeout verify that KeepClient instructs pycurl to use
    # the appropriate connection and read timeouts. They don't care
    # whether pycurl actually exhibits the expected timeout behavior
    # -- those tests are in the KeepClientTimeout test class.

    def _check_timeout_options(self, verb, arg, exc_class, timeouts,
                               **service_kwargs):
        """Force a timeout on one request and inspect its curl options.

        verb is the KeepClient method name ('get', 'head' or 'put'), arg
        is the locator or data passed to it, exc_class is the error the
        call must raise, and timeouts is the (connect seconds, low-speed
        seconds, low-speed limit) triple the first request must have
        been configured with.  Extra keyword arguments are forwarded to
        mock_keep_services() alongside count=1.
        """
        api_client = self.mock_keep_services(count=1, **service_kwargs)
        force_timeout = socket.timeout("timed out")
        with tutil.mock_keep_responses(force_timeout, 0) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            with self.assertRaises(exc_class):
                getattr(keep_client, verb)(arg)
            first_request = req_mock.responses[0]
            self.assertEqual(
                first_request.getopt(pycurl.CONNECTTIMEOUT_MS),
                int(timeouts[0]*1000))
            self.assertEqual(
                first_request.getopt(pycurl.LOW_SPEED_TIME),
                int(timeouts[1]))
            self.assertEqual(
                first_request.getopt(pycurl.LOW_SPEED_LIMIT),
                int(timeouts[2]))

    def test_get_timeout(self):
        self._check_timeout_options(
            'get', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_put_timeout(self):
        self._check_timeout_options(
            'put', 'foo',
            arvados.errors.KeepWriteError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_head_timeout(self):
        self._check_timeout_options(
            'head', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_TIMEOUT)

    def test_proxy_get_timeout(self):
        self._check_timeout_options(
            'get', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def test_proxy_head_timeout(self):
        self._check_timeout_options(
            'head', 'ffffffffffffffffffffffffffffffff',
            arvados.errors.KeepReadError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def test_proxy_put_timeout(self):
        self._check_timeout_options(
            'put', 'foo',
            arvados.errors.KeepWriteError,
            arvados.KeepClient.DEFAULT_PROXY_TIMEOUT,
            service_type='proxy')

    def check_no_services_error(self, verb, exc_class):
        """When the service lookup fails, verb raises with no request errors."""
        api_client = mock.MagicMock(name='api_client')
        api_client.keep_services().accessible().execute.side_effect = (
            arvados.errors.ApiError)
        keep_client = arvados.KeepClient(api_client=api_client)
        with self.assertRaises(exc_class) as err_check:
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0')
        self.assertEqual(0, len(err_check.exception.request_errors()))

    def test_get_error_with_no_services(self):
        self.check_no_services_error('get', arvados.errors.KeepReadError)

    def test_head_error_with_no_services(self):
        self.check_no_services_error('head', arvados.errors.KeepReadError)

    def test_put_error_with_no_services(self):
        self.check_no_services_error('put', arvados.errors.KeepWriteError)

    def check_errors_from_last_retry(self, verb, exc_class):
        """The raised error reports only the final attempt's status codes.

        Two services answer 500, 500 and then 403, 403; the exception is
        expected to carry just the two 403 results.
        """
        api_client = self.mock_keep_services(count=2)
        req_mock = tutil.mock_keep_responses(
            "retry error reporting test", 500, 500, 403, 403)
        with req_mock, tutil.skip_sleep, \
                self.assertRaises(exc_class) as err_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            getattr(keep_client, verb)('d41d8cd98f00b204e9800998ecf8427e+0',
                                       num_retries=3)
        self.assertEqual([403, 403], [
                getattr(error, 'status_code', None)
                for error in err_check.exception.request_errors().values()])

    def test_get_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('get', arvados.errors.KeepReadError)

    def test_head_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('head', arvados.errors.KeepReadError)

    def test_put_error_reflects_last_retry(self):
        self.check_errors_from_last_retry('put', arvados.errors.KeepWriteError)

    def test_put_error_does_not_include_successful_puts(self):
        """Only the two failed services appear in the write error."""
        data = 'partial failure test'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(count=3)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertEqual(2, len(exc_check.exception.request_errors()))

    def test_proxy_put_with_no_writable_services(self):
        """put() fails without attempting requests if no service is writable."""
        data = 'test with no writable services'
        data_loc = tutil.str_keep_locator(data)
        api_client = self.mock_keep_services(service_type='proxy', read_only=True, count=1)
        with tutil.mock_keep_responses(data_loc, 200, 500, 500), \
                self.assertRaises(arvados.errors.KeepWriteError) as exc_check:
            keep_client = arvados.KeepClient(api_client=api_client)
            keep_client.put(data)
        self.assertIn("no Keep services available", str(exc_check.exception))
        self.assertEqual(0, len(exc_check.exception.request_errors()))

    def test_oddball_service_get(self):
        """get() works against a service of an unrecognized type."""
        # NOTE(review): other tests in this file compare get() results
        # against bytes; this text literal probably needs to become
        # b'...' for Python 3 -- confirm that the tutil helpers accept
        # bytes before changing it.
        body = 'oddball service get'
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(body, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.get(tutil.str_keep_locator(body))
        self.assertEqual(body, actual)

    def test_oddball_service_put(self):
        """put() works against a service of an unrecognized type."""
        body = 'oddball service put'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore')
        with tutil.mock_keep_responses(pdh, 200):
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=1)
        self.assertEqual(pdh, actual)

    def test_oddball_service_writer_count(self):
        """One response reporting 3 replicas satisfies copies=2 in one call."""
        body = 'oddball service writer count'
        pdh = tutil.str_keep_locator(body)
        api_client = self.mock_keep_services(service_type='fancynewblobstore',
                                             count=4)
        headers = {'x-keep-replicas-stored': 3}
        with tutil.mock_keep_responses(pdh, 200, 418, 418, 418,
                                       **headers) as req_mock:
            keep_client = arvados.KeepClient(api_client=api_client)
            actual = keep_client.put(body, copies=2)
        self.assertEqual(pdh, actual)
        self.assertEqual(1, req_mock.call_count)
515

    
516

    
517
@tutil.skip_sleep
class KeepClientRendezvousTestCase(unittest.TestCase, tutil.ApiClientMock):
    """Checks the order in which KeepClient probes services for a block."""

    def setUp(self):
        """Build reference probe orders, blocks, hashes and a mocked client."""
        # expected_order[i] is the probe order for
        # hash=md5(sprintf("%064x",i)) where there are 16 services
        # with uuid sprintf("anything-%015x",j) with j in 0..15. E.g.,
        # the first probe for the block consisting of 64 "0"
        # characters is the service whose uuid is
        # "zzzzz-bi6l4-000000000000003", so expected_order[0][0]=='3'.
        self.services = 16
        self.expected_order = [
            list('3eab2d5fc9681074'),
            list('097dba52e648f1c3'),
            list('c5b4e023f8a7d691'),
            list('9d81c02e76a3bf54'),
            ]
        self.blocks = [
            "{:064x}".format(x).encode('utf-8')
            for x in range(len(self.expected_order))]
        self.hashes = [
            hashlib.md5(block).hexdigest()
            for block in self.blocks]
        self.api_client = self.mock_keep_services(count=self.services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)

    def test_weighted_service_roots_against_reference_set(self):
        # Confirm weighted_service_roots() returns the correct order
        for i, block_hash in enumerate(self.hashes):
            roots = self.keep_client.weighted_service_roots(
                arvados.KeepLocator(block_hash))
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', root).group(1)
                for root in roots]
            self.assertEqual(self.expected_order[i], got_order)

    def test_get_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.get(self.hashes[i], num_retries=1))

    def test_head_probe_order_against_reference_set(self):
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.head(self.hashes[i], num_retries=1))

    def test_put_probe_order_against_reference_set(self):
        # copies=1 prevents the test from being sensitive to races
        # between writer threads.
        self._test_probe_order_against_reference_set(
            lambda i: self.keep_client.put(self.blocks[i], num_retries=1, copies=1))

    def _test_probe_order_against_reference_set(self, op):
        """Run op(i) for every block and compare the request order.

        Every request is answered with 500, and op is expected to use
        num_retries=1, so each service should be probed twice, in the
        reference order both times.
        """
        for i in range(len(self.blocks)):
            with tutil.mock_keep_responses('', *[500 for _ in range(self.services*2)]) as req_mock, \
                 self.assertRaises(arvados.errors.KeepRequestError):
                op(i)
            got_order = [
                re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL)).group(1)
                for resp in req_mock.responses]
            self.assertEqual(self.expected_order[i]*2, got_order)

    def test_put_probe_order_multiple_copies(self):
        """With several writer threads, probes stay close to reference order."""
        for copies in range(2, 4):
            for i in range(len(self.blocks)):
                with tutil.mock_keep_responses('', *[500 for _ in range(self.services*3)]) as req_mock, \
                     self.assertRaises(arvados.errors.KeepWriteError):
                    self.keep_client.put(self.blocks[i], num_retries=2, copies=copies)
                got_order = [
                    re.search(r'//\[?keep0x([0-9a-f]+)', resp.getopt(pycurl.URL)).group(1)
                    for resp in req_mock.responses]
                # With T threads racing to make requests, the position
                # of a given server in the sequence of HTTP requests
                # (got_order) cannot be more than T-1 positions
                # earlier than that server's position in the reference
                # probe sequence (expected_order).
                #
                # Loop invariant: we have accounted for +pos+ expected
                # probes, either by seeing them in +got_order+ or by
                # putting them in +pending+ in the hope of seeing them
                # later. As long as +len(pending)<T+, we haven't
                # started a request too early.
                pending = []
                for pos, expected in enumerate(self.expected_order[i]*3):
                    got = got_order[pos-len(pending)]
                    while got in pending:
                        del pending[pending.index(got)]
                        got = got_order[pos-len(pending)]
                    if got != expected:
                        pending.append(expected)
                        self.assertLess(
                            len(pending), copies,
                            "pending={}, with copies={}, got {}, expected {}".format(
                                pending, copies, repr(got_order), repr(self.expected_order[i]*3)))

    def test_probe_waste_adding_one_server(self):
        """Adding services moves each block's first probe only slightly."""
        # Encode before hashing, as setUp() does: hashlib.md5() rejects
        # text strings on Python 3.
        hashes = [
            hashlib.md5("{:064x}".format(x).encode('utf-8')).hexdigest()
            for x in range(100)]
        initial_services = 12
        self.api_client = self.mock_keep_services(count=initial_services)
        self.keep_client = arvados.KeepClient(api_client=self.api_client)
        probes_before = [
            self.keep_client.weighted_service_roots(arvados.KeepLocator(block_hash))
            for block_hash in hashes]
        for added_services in range(1, 12):
            api_client = self.mock_keep_services(count=initial_services+added_services)
            keep_client = arvados.KeepClient(api_client=api_client)
            total_penalty = 0
            for block_hash, probe_before in zip(hashes, probes_before):
                probe_after = keep_client.weighted_service_roots(
                    arvados.KeepLocator(block_hash))
                # Penalty: how many new services are now probed before
                # the service that used to be first for this block.
                penalty = probe_after.index(probe_before[0])
                self.assertLessEqual(penalty, added_services)
                total_penalty += penalty
            # Average penalty per block should not exceed
            # N(added)/N(orig) by more than 20%, and should get closer
            # to the ideal as we add data points.
            expect_penalty = (
                added_services *
                len(hashes) / initial_services)
            max_penalty = (
                expect_penalty *
                (120 - added_services)/100)
            min_penalty = (
                expect_penalty * 8/10)
            self.assertTrue(
                min_penalty <= total_penalty <= max_penalty,
                "With {}+{} services, {} blocks, penalty {} but expected {}..{}".format(
                    initial_services,
                    added_services,
                    len(hashes),
                    total_penalty,
                    min_penalty,
                    max_penalty))

    def check_64_zeros_error_order(self, verb, exc_class):
        """The error for a 64-zero block lists services in probe order.

        Every pycurl request is made to time out; the URLs recorded in
        the exception's request_errors() must follow the reference
        order for block 0 and carry the mocked port.
        """
        data = '0' * 64
        if verb == 'get':
            data = tutil.str_keep_locator(data)
        # Arbitrary port number:
        aport = random.randint(1024,65535)
        api_client = self.mock_keep_services(service_port=aport, count=self.services)
        keep_client = arvados.KeepClient(api_client=api_client)
        with mock.patch('pycurl.Curl') as curl_mock, \
             self.assertRaises(exc_class) as err_check:
            curl_mock.return_value.side_effect = socket.timeout
            getattr(keep_client, verb)(data)
        urls = [urllib.parse.urlparse(url)
                for url in err_check.exception.request_errors()]
        self.assertEqual([('keep0x' + c, aport) for c in '3eab2d5fc9681074'],
                         [(url.hostname, url.port) for url in urls])

    def test_get_error_shows_probe_order(self):
        self.check_64_zeros_error_order('get', arvados.errors.KeepReadError)

    def test_put_error_shows_probe_order(self):
        self.check_64_zeros_error_order('put', arvados.errors.KeepWriteError)
670

    
671

    
672
class KeepClientTimeout(unittest.TestCase, tutil.ApiClientMock):
    # Timing tests for the `timeout` argument of arvados.KeepClient.
    # Most tests talk to a keepstub server started in setUp(); the
    # stub's setbandwidth()/setdelays() methods are used to slow it down.
    #
    # BANDWIDTH_LOW_LIM must be less than len(DATA) so we can transfer
    # 1s worth of data and then trigger bandwidth errors before running
    # out of data.
    DATA = 'x'*2**11
    BANDWIDTH_LOW_LIM = 1024
    TIMEOUT_TIME = 1.0

    class assertTakesBetween(unittest.TestCase):
        # Context manager asserting that the with-block takes between
        # tmin and tmax seconds (inclusive) of wall-clock time.
        def __init__(self, tmin, tmax):
            self.tmin = tmin
            self.tmax = tmax

        def __enter__(self):
            self.t0 = time.time()

        def __exit__(self, exc_type=None, exc_value=None, traceback=None):
            if exc_type is not None:
                # The body already failed. Let that exception propagate
                # instead of replacing it with a timing failure that
                # would hide the real cause.
                return False
            # Round times to milliseconds, like CURL. Otherwise, we
            # fail when CURL reaches a 1s timeout at 0.9998s.
            delta = round(time.time() - self.t0, 3)
            self.assertGreaterEqual(delta, self.tmin)
            self.assertLessEqual(delta, self.tmax)

    class assertTakesGreater(assertTakesBetween):
        # Context manager asserting that the with-block takes at least
        # tmin seconds. Implemented as assertTakesBetween with no
        # effective upper bound.
        def __init__(self, tmin):
            self.tmin = tmin
            self.tmax = float('inf')

    def setUp(self):
        # Find a free port by binding a throwaway socket to port 0, then
        # start the stub server on that port in a background thread.
        sock = socket.socket()
        try:
            sock.bind(('0.0.0.0', 0))
            self.port = sock.getsockname()[1]
        finally:
            # Always release the probe socket, even if bind() fails.
            sock.close()
        self.server = keepstub.Server(('0.0.0.0', self.port), keepstub.Handler)
        self.thread = threading.Thread(target=self.server.serve_forever)
        self.thread.daemon = True # Exit thread if main proc exits
        self.thread.start()
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='localhost',
            service_port=self.port,
        )

    def tearDown(self):
        self.server.shutdown()

    def keepClient(self, timeouts=(0.1, TIMEOUT_TIME, BANDWIDTH_LOW_LIM)):
        # Build a KeepClient on the mocked API client, passing `timeouts`
        # through as its `timeout` argument.
        return arvados.KeepClient(
            api_client=self.api_client,
            timeout=timeouts)

    def test_timeout_slow_connect(self):
        # Can't simulate TCP delays with our own socket. Leave our
        # stub server running uselessly, and try to connect to an
        # unroutable IP address instead.
        self.api_client = self.mock_keep_services(
            count=1,
            service_host='240.0.0.0',
        )
        with self.assertTakesBetween(0.1, 0.5):
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keepClient().put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_no_delays_success(self):
        # With the stub at twice BANDWIDTH_LOW_LIM, put and get succeed.
        self.server.setbandwidth(2*self.BANDWIDTH_LOW_LIM)
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def test_too_low_bandwidth_no_delays_failure(self):
        # Check that lessening bandwidth corresponds to failing
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(0.5*self.BANDWIDTH_LOW_LIM)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_low_bandwidth_with_server_response_delay_failure(self):
        # Stub at exactly BANDWIDTH_LOW_LIM plus a response delay of
        # TIMEOUT_TIME: get, put and head are all expected to fail, each
        # after at least TIMEOUT_TIME.
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        self.server.setdelays(response=self.TIMEOUT_TIME)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.head(loc, num_retries=0)

    def test_low_bandwidth_with_server_mid_delay_failure(self):
        # Same as the response-delay test, but using the stub's
        # mid_write/mid_read delays; get and put are expected to fail.
        kc = self.keepClient()
        loc = kc.put(self.DATA, copies=1, num_retries=0)
        self.server.setbandwidth(self.BANDWIDTH_LOW_LIM)
        self.server.setdelays(mid_write=self.TIMEOUT_TIME, mid_read=self.TIMEOUT_TIME)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesGreater(self.TIMEOUT_TIME):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)

    def test_timeout_slow_request(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(request=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(request=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response=2)
        self._test_response_timeout_under_2s(loc)

    def test_timeout_slow_response_body(self):
        loc = self.keepClient().put(self.DATA, copies=1, num_retries=0)
        self.server.setdelays(response_body=.2)
        self._test_connect_timeout_under_200ms(loc)
        self.server.setdelays(response_body=2)
        self._test_response_timeout_under_2s(loc)

    def _test_connect_timeout_under_200ms(self, loc):
        # Allow 100ms to connect, then 1s for response. Everything
        # should work, and everything should take at least 200ms to
        # return.
        kc = self.keepClient(timeouts=(.1, 1))
        with self.assertTakesBetween(.2, .3):
            kc.put(self.DATA, copies=1, num_retries=0)
        with self.assertTakesBetween(.2, .3):
            self.assertEqual(self.DATA, kc.get(loc, num_retries=0))

    def _test_response_timeout_under_2s(self, loc):
        # Allow 10s to connect, then 1s for response. Nothing should
        # work, and everything should take at least 1s to return.
        kc = self.keepClient(timeouts=(10, 1))
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepReadError):
                kc.get(loc, num_retries=0)
        with self.assertTakesBetween(1, 9):
            with self.assertRaises(arvados.errors.KeepWriteError):
                kc.put(self.DATA, copies=1, num_retries=0)
827

    
828

    
829
class KeepClientGatewayTestCase(unittest.TestCase, tutil.ApiClientMock):
    # Tests how "+K@..." hints in a locator affect which URLs
    # KeepClient.get() and KeepClient.head() request, with pycurl.Curl
    # mocked out.

    def mock_disks_and_gateways(self, disks=3, gateways=1):
        # Populate self.gateways (service records), self.gateway_roots
        # (their base URLs), self.api_client and self.keepClient.
        self.gateways = []
        for i in range(gateways):
            self.gateways.append({
                'uuid': 'zzzzz-bi6l4-gateway{:08d}'.format(i),
                'owner_uuid': 'zzzzz-tpzed-000000000000000',
                'service_host': 'gatewayhost{}'.format(i),
                'service_port': 12345,
                'service_ssl_flag': True,
                'service_type': 'gateway:test',
            })
        root_template = "https://{service_host}:{service_port}/"
        self.gateway_roots = [
            root_template.format(**gw) for gw in self.gateways]
        self.api_client = self.mock_keep_services(
            count=disks, additional_services=self.gateways)
        self.keepClient = arvados.KeepClient(api_client=self.api_client)

    def _check_hints_tried_in_order(self, MockCurl, verb):
        # Shared body of the get/head "hints in order" tests: every
        # mocked response is a 404, so the call named by `verb` must
        # raise NotFoundError after trying each service.
        gateways = 4
        disks = 3
        mocks = [
            tutil.FakeCurl.make(code=404, body='')
            for _ in range(gateways+disks)
        ]
        MockCurl.side_effect = tutil.queue_with(mocks)
        self.mock_disks_and_gateways(gateways=gateways, disks=disks)
        hints = ['K@'+gw['uuid'] for gw in self.gateways]
        locator = '+'.join(['acbd18db4cc2f85cedef654fccc4a4d8+3'] + hints)
        with self.assertRaises(arvados.errors.NotFoundError):
            getattr(self.keepClient, verb)(locator)
        # Gateways are tried first, in the order given.
        for root, gateway_mock in zip(self.gateway_roots, mocks):
            self.assertEqual(root+locator,
                             gateway_mock.getopt(pycurl.URL))
        # Disk services are tried next.
        for disk_mock in mocks[gateways:gateways+disks]:
            self.assertRegexpMatches(
                disk_mock.getopt(pycurl.URL),
                r'keep0x')

    def _check_remote_proxy_hint(self, MockCurl, verb, expected):
        # Shared body of the get/head remote-proxy tests: a "+K@xyzzy"
        # hint should make the request go to keep.xyzzy.arvadosapi.com.
        MockCurl.return_value = tutil.FakeCurl.make(
            code=200, body='foo', headers={'Content-Length': 3})
        self.mock_disks_and_gateways()
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@xyzzy'
        self.assertEqual(expected, getattr(self.keepClient, verb)(locator))
        self.assertEqual('https://keep.xyzzy.arvadosapi.com/'+locator,
                         MockCurl.return_value.getopt(pycurl.URL))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hint_first(self, MockCurl):
        fake_curl = tutil.FakeCurl.make(
            code=200, body='foo', headers={'Content-Length': 3})
        MockCurl.return_value = fake_curl
        self.mock_disks_and_gateways()
        gateway_uuid = self.gateways[0]['uuid']
        locator = 'acbd18db4cc2f85cedef654fccc4a4d8+3+K@' + gateway_uuid
        self.assertEqual('foo', self.keepClient.get(locator))
        self.assertEqual(self.gateway_roots[0]+locator,
                         MockCurl.return_value.getopt(pycurl.URL))
        self.assertEqual(True, self.keepClient.head(locator))

    @mock.patch('pycurl.Curl')
    def test_get_with_gateway_hints_in_order(self, MockCurl):
        self._check_hints_tried_in_order(MockCurl, 'get')

    @mock.patch('pycurl.Curl')
    def test_head_with_gateway_hints_in_order(self, MockCurl):
        self._check_hints_tried_in_order(MockCurl, 'head')

    @mock.patch('pycurl.Curl')
    def test_get_with_remote_proxy_hint(self, MockCurl):
        self._check_remote_proxy_hint(MockCurl, 'get', 'foo')

    @mock.patch('pycurl.Curl')
    def test_head_with_remote_proxy_hint(self, MockCurl):
        self._check_remote_proxy_hint(MockCurl, 'head', True)
924

    
925

    
926
class KeepClientRetryTestMixin(object):
    # Testing with a local Keep store won't exercise the retry behavior.
    # Instead, our strategy is:
    # * Create a client with one proxy specified (pointed at a black
    #   hole), so there's no need to instantiate an API client, and
    #   all HTTP requests come from one place.
    # * Use TEST_PATCHER to mock out the client's requests and provide
    #   simulated responses.
    # This lets us test the retry logic extensively without relying on any
    # supporting servers, and prevents side effects in case something hiccups.
    #
    # To use this mixin, a test class must define:
    # * DEFAULT_EXPECT: the value run_method() returns on success.
    # * DEFAULT_EXCEPTION: the exception class run_method() raises on
    #   failure.
    # * run_method(): performs the client call under test.
    # * TEST_PATCHER: a method that mocks out appropriate methods in
    #   the client.

    PROXY_ADDR = 'http://[%s]:65535/' % (tutil.TEST_HOST,)
    TEST_DATA = 'testdata'
    TEST_LOCATOR = 'ef654c40ab4f1747fc699915d4f70902+8'

    def setUp(self):
        # Base keyword arguments for every client made by new_client().
        self.client_kwargs = dict(proxy=self.PROXY_ADDR, local_store='')

    def new_client(self, **caller_kwargs):
        # Caller-supplied keyword arguments override the base ones.
        merged = dict(self.client_kwargs)
        merged.update(caller_kwargs)
        return arvados.KeepClient(**merged)

    def run_method(self, *args, **kwargs):
        raise NotImplementedError("test subclasses must define run_method")

    def check_success(self, expected=None, *args, **kwargs):
        # Assert that run_method() returns `expected`
        # (DEFAULT_EXPECT when not given).
        wanted = self.DEFAULT_EXPECT if expected is None else expected
        self.assertEqual(wanted, self.run_method(*args, **kwargs))

    def check_exception(self, error_class=None, *args, **kwargs):
        # Assert that run_method() raises `error_class`
        # (DEFAULT_EXCEPTION when not given).
        wanted = self.DEFAULT_EXCEPTION if error_class is None else error_class
        with self.assertRaises(wanted):
            self.run_method(*args, **kwargs)

    def test_immediate_success(self):
        # A single 200 response succeeds without retries.
        patcher = self.TEST_PATCHER(self.DEFAULT_EXPECT, 200)
        with patcher:
            self.check_success()

    def test_retry_then_success(self):
        # 500 then 200: succeeds when retries are allowed.
        patcher = self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200)
        with patcher:
            self.check_success(num_retries=3)

    def test_exception_then_success(self):
        # An exception then 200: succeeds when retries are allowed.
        patcher = self.TEST_PATCHER(
            self.DEFAULT_EXPECT, Exception('mock err'), 200)
        with patcher:
            self.check_success(num_retries=3)

    def test_no_default_retry(self):
        # 500 then 200 without num_retries: the call must fail.
        patcher = self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200)
        with patcher:
            self.check_exception()

    def test_no_retry_after_permanent_error(self):
        # 403 then 200: the call must fail even with retries allowed.
        patcher = self.TEST_PATCHER(self.DEFAULT_EXPECT, 403, 200)
        with patcher:
            self.check_exception(num_retries=3)

    def test_error_after_retries_exhausted(self):
        # Two 500s with only one retry: the call must fail.
        patcher = self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 500, 200)
        with patcher:
            self.check_exception(num_retries=1)

    def test_num_retries_instance_fallback(self):
        # num_retries given to the client constructor is used when the
        # method call does not pass one.
        self.client_kwargs['num_retries'] = 3
        patcher = self.TEST_PATCHER(self.DEFAULT_EXPECT, 500, 200)
        with patcher:
            self.check_success()
994

    
995

    
996
@tutil.skip_sleep
class KeepClientRetryGetTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    # Runs the retry mixin's tests, plus get-specific ones, against
    # KeepClient.get().
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_DATA
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        client = self.new_client()
        return client.get(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        # A 404 response must surface as NotFoundError.
        responses = (404, 200)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, *responses):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # get should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        client = self.new_client()
        responses = (404, 500)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, *responses):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.get(self.HINTED_LOCATOR)
            raised = exc_check.exception
            self.assertNotIsInstance(
                raised, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        # 404 followed by 200, with no num_retries passed.
        responses = (404, 200, 500)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, *responses):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # First response is a socket timeout, the second returns the data.
        responses = [
            (socket.timeout("timed out"), 200),
            (self.DEFAULT_EXPECT, 200),
        ]
        with tutil.mock_keep_responses(*responses):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_retry_data_with_wrong_checksum(self):
        # First response carries the wrong body, the second the right one.
        responses = [
            ('baddata', 200),
            (self.DEFAULT_EXPECT, 200),
        ]
        with tutil.mock_keep_responses(*responses):
            self.check_success(locator=self.HINTED_LOCATOR)
1039

    
1040
@tutil.skip_sleep
class KeepClientRetryHeadTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    # Runs the retry mixin's tests, plus head-specific ones, against
    # KeepClient.head().
    DEFAULT_EXPECT = True
    DEFAULT_EXCEPTION = arvados.errors.KeepReadError
    HINTED_LOCATOR = KeepClientRetryTestMixin.TEST_LOCATOR + '+K@xyzzy'
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, locator=KeepClientRetryTestMixin.TEST_LOCATOR,
                   *args, **kwargs):
        client = self.new_client()
        return client.head(locator, *args, **kwargs)

    def test_specific_exception_when_not_found(self):
        # A 404 response must surface as NotFoundError.
        responses = (404, 200)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, *responses):
            self.check_exception(arvados.errors.NotFoundError, num_retries=3)

    def test_general_exception_with_mixed_errors(self):
        # head should raise a NotFoundError if no server returns the block,
        # and a high threshold of servers report that it's not found.
        # This test rigs up 50/50 disagreement between two servers, and
        # checks that it does not become a NotFoundError.
        client = self.new_client()
        responses = (404, 500)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, *responses):
            with self.assertRaises(arvados.errors.KeepReadError) as exc_check:
                client.head(self.HINTED_LOCATOR)
            raised = exc_check.exception
            self.assertNotIsInstance(
                raised, arvados.errors.NotFoundError,
                "mixed errors raised NotFoundError")

    def test_hint_server_can_succeed_without_retries(self):
        # 404 followed by 200, with no num_retries passed.
        responses = (404, 200, 500)
        with tutil.mock_keep_responses(self.DEFAULT_EXPECT, *responses):
            self.check_success(locator=self.HINTED_LOCATOR)

    def test_try_next_server_after_timeout(self):
        # First response is a socket timeout, the second succeeds.
        responses = [
            (socket.timeout("timed out"), 200),
            (self.DEFAULT_EXPECT, 200),
        ]
        with tutil.mock_keep_responses(*responses):
            self.check_success(locator=self.HINTED_LOCATOR)
1077

    
1078
@tutil.skip_sleep
class KeepClientRetryPutTestCase(KeepClientRetryTestMixin, unittest.TestCase):
    # Runs the retry mixin's tests, plus one put-specific test, against
    # KeepClient.put().
    DEFAULT_EXPECT = KeepClientRetryTestMixin.TEST_LOCATOR
    DEFAULT_EXCEPTION = arvados.errors.KeepWriteError
    TEST_PATCHER = staticmethod(tutil.mock_keep_responses)

    def run_method(self, data=KeepClientRetryTestMixin.TEST_DATA,
                   copies=1, *args, **kwargs):
        client = self.new_client()
        return client.put(data, copies, *args, **kwargs)

    def test_do_not_send_multiple_copies_to_same_server(self):
        # Asking for two copies with only a single 200 response mocked
        # must fail with the default exception, even with retries.
        patcher = tutil.mock_keep_responses(self.DEFAULT_EXPECT, 200)
        with patcher:
            self.check_exception(copies=2, num_retries=3)
1091

    
1092

    
1093
class AvoidOverreplication(unittest.TestCase, tutil.ApiClientMock):
    # Feeds fake services to a KeepWriterThreadPool and checks that
    # pool.done() reports the expected number of copies.

    class FakeKeepService(object):
        # Stand-in for a Keep service: put() sleeps for `delay` seconds,
        # then either raises `will_raise` or returns `will_succeed`.
        def __init__(self, delay, will_succeed=False, will_raise=None, replicas=1):
            self.delay = delay
            self.will_succeed = will_succeed
            self.will_raise = will_raise
            self._result = {
                'headers': {'x-keep-replicas-stored': str(replicas)},
                'body': 'foobar',
            }

        def put(self, data_hash, data, timeout):
            time.sleep(self.delay)
            if self.will_raise is None:
                return self.will_succeed
            raise self.will_raise

        def last_result(self):
            # Only a successful service has a result to report.
            return self._result if self.will_succeed else None

        def finished(self):
            return False

    def setUp(self):
        self.copies = 3
        self.pool = arvados.KeepClient.KeepWriterThreadPool(
            data='foo',
            data_hash='acbd18db4cc2f85cedef654fccc4a4d8+3',
            max_service_replicas=self.copies,
            copies=self.copies,
        )

    def _enqueue(self, **service_kwargs):
        # Build a FakeKeepService and hand it to the pool as a task.
        service = self.FakeKeepService(**service_kwargs)
        self.pool.add_task(service, None)

    def test_only_write_enough_on_success(self):
        for step in range(10):
            self._enqueue(delay=step/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_only_write_enough_on_partial_success(self):
        # Alternate failing and succeeding services.
        for step in range(5):
            self._enqueue(delay=step/10.0, will_succeed=False)
            self._enqueue(delay=step/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_only_write_enough_when_some_crash(self):
        # Alternate raising and succeeding services.
        for step in range(5):
            self._enqueue(delay=step/10.0, will_raise=Exception())
            self._enqueue(delay=step/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies)

    def test_fail_when_too_many_crash(self):
        # Only copies-1 services succeed, so done() must fall short
        # of the requested number of copies.
        for step in range(self.copies+1):
            self._enqueue(delay=step/10.0, will_raise=Exception())
        for step in range(self.copies-1):
            self._enqueue(delay=step/10.0, will_succeed=True)
        self.pool.join()
        self.assertEqual(self.pool.done(), self.copies-1)
1161
    
1162

    
1163
@tutil.skip_sleep
1164
class RetryNeedsMultipleServices(unittest.TestCase, tutil.ApiClientMock):
1165
    # Test put()s that need two distinct servers to succeed, possibly
1166
    # requiring multiple passes through the retry loop.
1167

    
1168
    def setUp(self):
        # Two mocked Keep services, and a client bound to them.
        mocked_api = self.mock_keep_services(count=2)
        self.api_client = mocked_api
        self.keep_client = arvados.KeepClient(api_client=mocked_api)
1171

    
1172
    def test_success_after_exception(self):
        # One exception followed by two 200s: the put needs all three
        # mocked requests.
        responses = (Exception('mock err'), 200, 200)
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                *responses) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)
1178

    
1179
    def test_success_after_retryable_error(self):
        # One 500 followed by two 200s: the put needs all three
        # mocked requests.
        responses = (500, 200, 200)
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                *responses) as req_mock:
            self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(3, req_mock.call_count)
1185

    
1186
    def test_fail_after_final_error(self):
        # First retry loop gets a 200 (can't achieve replication by
        # storing again on that server) and a 400 (can't retry that
        # server at all), so we shouldn't try a third request.
        responses = (200, 400, 200)
        with tutil.mock_keep_responses(
                'acbd18db4cc2f85cedef654fccc4a4d8+3',
                *responses) as req_mock:
            with self.assertRaises(arvados.errors.KeepWriteError):
                self.keep_client.put('foo', num_retries=1, copies=2)
        self.assertEqual(2, req_mock.call_count)