Revision 76d9365a sdk/python/tests/test_keep_client.py

View differences:

sdk/python/tests/test_keep_client.py
12 12
import random
13 13
import re
14 14
import socket
15
import sys
15 16
import threading
16 17
import time
17 18
import unittest
......
48 49

  
49 50
        self.assertEqual(0, self.keep_client.download_counter.get())
50 51
        self.assertEqual(self.keep_client.get(foo_locator),
51
                         'foo',
52
                         b'foo',
52 53
                         'wrong content from Keep.get(md5("foo"))')
53 54
        self.assertEqual(3, self.keep_client.download_counter.get())
54 55

  
......
64 65
                         'wrong content from Keep.get(md5(<binarydata>))')
65 66

  
66 67
    def test_KeepLongBinaryRWTest(self):
67
        blob_str = '\xff\xfe\xfd\xfc\x00\x01\x02\x03'
68
        blob_data = '\xff\xfe\xfd\xfc\x00\x01\x02\x03'
68 69
        for i in range(0,23):
69
            blob_str = blob_str + blob_str
70
        blob_locator = self.keep_client.put(blob_str)
70
            blob_data = blob_data + blob_data
71
        blob_locator = self.keep_client.put(blob_data)
71 72
        self.assertRegexpMatches(
72 73
            blob_locator,
73 74
            '^84d90fc0d8175dd5dcfab04b999bc956\+67108864',
74 75
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
75 76
        self.assertEqual(self.keep_client.get(blob_locator),
76
                         blob_str,
77
                         blob_data,
77 78
                         'wrong content from Keep.get(md5(<binarydata>))')
78 79

  
79 80
    @unittest.skip("unreliable test - please fix and close #8752")
80 81
    def test_KeepSingleCopyRWTest(self):
81
        blob_str = '\xff\xfe\xfd\xfc\x00\x01\x02\x03'
82
        blob_locator = self.keep_client.put(blob_str, copies=1)
82
        blob_data = b'\xff\xfe\xfd\xfc\x00\x01\x02\x03'
83
        blob_locator = self.keep_client.put(blob_data, copies=1)
83 84
        self.assertRegexpMatches(
84 85
            blob_locator,
85 86
            '^c902006bc98a3eb4a3663b65ab4a6fab\+8',
86 87
            ('wrong locator from Keep.put(<binarydata>): ' + blob_locator))
87 88
        self.assertEqual(self.keep_client.get(blob_locator),
88
                         blob_str,
89
                         blob_data,
89 90
                         'wrong content from Keep.get(md5(<binarydata>))')
90 91

  
91 92
    def test_KeepEmptyCollectionTest(self):
......
103 104
            '^acbd18db4cc2f85cedef654fccc4a4d8\+3',
104 105
            'wrong md5 hash from Keep.put("foo"): ' + foo_locator)
105 106

  
106
        with self.assertRaises(UnicodeEncodeError):
107
            # Error if it is not ASCII
108
            self.keep_client.put(u'\xe2')
107
        if sys.version_info < (3, 0):
108
            with self.assertRaises(UnicodeEncodeError):
109
                # Error if it is not ASCII
110
                self.keep_client.put(u'\xe2')
109 111

  
110 112
        with self.assertRaises(AttributeError):
111 113
            # Must be bytes or have an encode() method
......
119 121
            'wrong md5 hash from Keep.put for "test_head": ' + locator)
120 122
        self.assertEqual(True, self.keep_client.head(locator))
121 123
        self.assertEqual(self.keep_client.get(locator),
122
                         'test_head',
124
                         b'test_head',
123 125
                         'wrong content from Keep.get for "test_head"')
124 126

  
125 127
class KeepPermissionTestCase(run_test_server.TestCaseWithServers):
......
136 138
            r'^acbd18db4cc2f85cedef654fccc4a4d8\+3\+A[a-f0-9]+@[a-f0-9]+$',
137 139
            'invalid locator from Keep.put("foo"): ' + foo_locator)
138 140
        self.assertEqual(keep_client.get(foo_locator),
139
                         'foo',
141
                         b'foo',
140 142
                         'wrong content from Keep.get(md5("foo"))')
141 143

  
142 144
        # GET with an unsigned locator => NotFound
......
203 205
    def test_KeepAuthenticatedSignedTest(self):
204 206
        signed_locator = self._put_foo_and_check()
205 207
        self.assertEqual(self.keep_client.get(signed_locator),
206
                         'foo',
208
                         b'foo',
207 209
                         'wrong content from Keep.get(md5("foo"))')
208 210

  
209 211
    def test_KeepAuthenticatedUnsignedTest(self):
210 212
        signed_locator = self._put_foo_and_check()
211 213
        self.assertEqual(self.keep_client.get("acbd18db4cc2f85cedef654fccc4a4d8"),
212
                         'foo',
214
                         b'foo',
213 215
                         'wrong content from Keep.get(md5("foo"))')
214 216

  
215 217
    def test_KeepUnauthenticatedSignedTest(self):
......
218 220
        signed_locator = self._put_foo_and_check()
219 221
        self.keep_client.api_token = ''
220 222
        self.assertEqual(self.keep_client.get(signed_locator),
221
                         'foo',
223
                         b'foo',
222 224
                         'wrong content from Keep.get(md5("foo"))')
223 225

  
224 226
    def test_KeepUnauthenticatedUnsignedTest(self):
......
257 259
            '^73feffa4b7f6bb68e44cf984c85f6e88\+3',
258 260
            'wrong md5 hash from Keep.put("baz"): ' + baz_locator)
259 261
        self.assertEqual(keep_client.get(baz_locator),
260
                         'baz',
262
                         b'baz',
261 263
                         'wrong content from Keep.get(md5("baz"))')
262 264
        self.assertTrue(keep_client.using_proxy)
263 265

  
......
273 275
            '^91f372a266fe2bf2823cb8ec7fda31ce\+4',
274 276
            'wrong md5 hash from Keep.put("baz2"): ' + baz_locator)
275 277
        self.assertEqual(keep_client.get(baz_locator),
276
                         'baz2',
278
                         b'baz2',
277 279
                         'wrong content from Keep.get(md5("baz2"))')
278 280
        self.assertTrue(keep_client.using_proxy)
279 281

  
......
530 532
            list('9d81c02e76a3bf54'),
531 533
            ]
532 534
        self.blocks = [
533
            "{:064x}".format(x)
535
            "{:064x}".format(x).encode('utf-8')
534 536
            for x in range(len(self.expected_order))]
535 537
        self.hashes = [
536 538
            hashlib.md5(self.blocks[x]).hexdigest()

Also available in: Unified diff