#!/usr/bin/env python
# -*- coding: utf-8 -*-

import apiclient
import io
import mock
import os
import pwd
import re
import shutil
import subprocess
import sys
import tempfile
import time
import unittest
import yaml
import threading
import hashlib
import random

from cStringIO import StringIO

import arvados
import arvados.commands.put as arv_put
import arvados_testutil as tutil

from arvados_testutil import ArvadosBaseTestCase, fake_httplib2_response
import run_test_server

class ArvadosPutResumeCacheTest(ArvadosBaseTestCase):
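    # Exercises arv_put.ResumeCache: stable and unique cache-file naming,
    # plus saving, locking, restoring, and destroying partial-upload state.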
    CACHE_ARGSET = [
        [],
        ['/dev/null'],
        ['/dev/null', '--filename', 'empty'],
        ['/tmp']
        ]

    def tearDown(self):
        super(ArvadosPutResumeCacheTest, self).tearDown()
        try:
            self.last_cache.destroy()
        except AttributeError:
            pass

    def cache_path_from_arglist(self, arglist):
        return arv_put.ResumeCache.make_path(arv_put.parse_arguments(arglist))

    def test_cache_names_stable(self):
        for argset in self.CACHE_ARGSET:
            self.assertEqual(self.cache_path_from_arglist(argset),
                              self.cache_path_from_arglist(argset),
                              "cache name changed for {}".format(argset))

    def test_cache_names_unique(self):
        results = []
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertNotIn(path, results)
            results.append(path)

    def test_cache_names_simple(self):
        # The goal here is to make sure the filename doesn't use characters
        # reserved by the filesystem.  Feel free to adjust this regexp as
        # long as it still does that.
        bad_chars = re.compile(r'[^-\.\w]')
        for argset in self.CACHE_ARGSET:
            path = self.cache_path_from_arglist(argset)
            self.assertFalse(bad_chars.search(os.path.basename(path)),
                             "path too exotic: {}".format(path))

    def test_cache_names_ignore_argument_order(self):
        self.assertEqual(
            self.cache_path_from_arglist(['a', 'b', 'c']),
            self.cache_path_from_arglist(['c', 'a', 'b']))
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'stdin']),
            self.cache_path_from_arglist(['--filename', 'stdin', '-']))

    def test_cache_names_differ_for_similar_paths(self):
        # This test needs names at / that don't exist on the real filesystem.
        self.assertNotEqual(
            self.cache_path_from_arglist(['/_arvputtest1', '/_arvputtest2']),
            self.cache_path_from_arglist(['/_arvputtest1/_arvputtest2']))

    def test_cache_names_ignore_irrelevant_arguments(self):
        # Workaround: parse_arguments bails on --filename with a directory.
        path1 = self.cache_path_from_arglist(['/tmp'])
        args = arv_put.parse_arguments(['/tmp'])
        args.filename = 'tmp'
        path2 = arv_put.ResumeCache.make_path(args)
        self.assertEqual(path1, path2,
                         "cache path considered --filename for directory")
        self.assertEqual(
            self.cache_path_from_arglist(['-']),
            self.cache_path_from_arglist(['-', '--max-manifest-depth', '1']),
            "cache path considered --max-manifest-depth for file")

    def test_cache_names_treat_negative_manifest_depths_identically(self):
        base_args = ['/tmp', '--max-manifest-depth']
        self.assertEqual(
            self.cache_path_from_arglist(base_args + ['-1']),
            self.cache_path_from_arglist(base_args + ['-2']))

    def test_cache_names_treat_stdin_consistently(self):
        self.assertEqual(
            self.cache_path_from_arglist(['-', '--filename', 'test']),
            self.cache_path_from_arglist(['/dev/stdin', '--filename', 'test']))

    def test_cache_names_identical_for_synonymous_names(self):
        self.assertEqual(
            self.cache_path_from_arglist(['.']),
            self.cache_path_from_arglist([os.path.realpath('.')]))
        testdir = self.make_tmpdir()
        looplink = os.path.join(testdir, 'loop')
        os.symlink(testdir, looplink)
        self.assertEqual(
            self.cache_path_from_arglist([testdir]),
            self.cache_path_from_arglist([looplink]))

    def test_cache_names_different_by_api_host(self):
        config = arvados.config.settings()
        orig_host = config.get('ARVADOS_API_HOST')
        try:
            name1 = self.cache_path_from_arglist(['.'])
            config['ARVADOS_API_HOST'] = 'x' + (orig_host or 'localhost')
            self.assertNotEqual(name1, self.cache_path_from_arglist(['.']))
        finally:
            if orig_host is None:
                del config['ARVADOS_API_HOST']
            else:
                config['ARVADOS_API_HOST'] = orig_host

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_current_stream_locators(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_current_stream_locators'] = ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams(self, keep_client_head):
        keep_client_head.side_effect = [True]
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)

    @mock.patch('arvados.keep.KeepClient.head')
    def test_resume_cache_with_finished_streams_error_on_head(self, keep_client_head):
        keep_client_head.side_effect = Exception('Locator not found')
        thing = {}
        thing['_finished_streams'] = [['.', ['098f6bcd4621d373cade4e832627b4f6+4', '1f253c60a2306e0ee12fb6ce0c587904+6']]]
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.last_cache.close()
        resume_cache = arv_put.ResumeCache(self.last_cache.filename)
        self.assertNotEqual(None, resume_cache)
        # check_cache() must not raise even though the mocked HEAD request
        # fails; a stale cache entry should be handled gracefully.
        resume_cache.check_cache()

    def test_basic_cache_storage(self):
        thing = ['test', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_empty_cache(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
        self.assertRaises(ValueError, cache.load)

    def test_cache_persistent(self):
        thing = ['test', 'list']
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save(thing)
        cache.close()
        self.last_cache = arv_put.ResumeCache(path)
        self.assertEqual(thing, self.last_cache.load())

    def test_multiple_cache_writes(self):
        thing = ['short', 'list']
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
        # Start writing an object longer than the one we test, to make
        # sure the cache file gets truncated.
        self.last_cache.save(['long', 'long', 'list'])
        self.last_cache.save(thing)
        self.assertEqual(thing, self.last_cache.load())

    def test_cache_is_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            cache = arv_put.ResumeCache(cachefile.name)
            self.assertRaises(arv_put.ResumeCacheConflict,
                              arv_put.ResumeCache, cachefile.name)

    def test_cache_stays_locked(self):
        with tempfile.NamedTemporaryFile() as cachefile:
            self.last_cache = arv_put.ResumeCache(cachefile.name)
            path = cachefile.name
        self.last_cache.save('test')
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)

    def test_destroy_cache(self):
        cachefile = tempfile.NamedTemporaryFile(delete=False)
        try:
            cache = arv_put.ResumeCache(cachefile.name)
            cache.save('test')
            cache.destroy()
            try:
                arv_put.ResumeCache(cachefile.name)
            except arv_put.ResumeCacheConflict:
                self.fail("could not load cache after destroying it")
            self.assertRaises(ValueError, cache.load)
        finally:
            if os.path.exists(cachefile.name):
                os.unlink(cachefile.name)

    def test_restart_cache(self):
        path = os.path.join(self.make_tmpdir(), 'cache')
        cache = arv_put.ResumeCache(path)
        cache.save('test')
        cache.restart()
        self.assertRaises(ValueError, cache.load)
        self.assertRaises(arv_put.ResumeCacheConflict,
                          arv_put.ResumeCache, path)


class ArvPutUploadJobTest(run_test_server.TestCaseWithServers,
                          ArvadosBaseTestCase):
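    # Exercises arv_put.ArvPutUploadJob against a test server: cache reuse,
    # resumable large-file uploads, progress reporting, graceful
    # interruption, and the dry-run flag.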

    def setUp(self):
        super(ArvPutUploadJobTest, self).setUp()
        run_test_server.authorize_with('active')
        # Create test files and directories
        self.tempdir = tempfile.mkdtemp()
        subdir = os.path.join(self.tempdir, 'subdir')
        os.mkdir(subdir)
        data = "x" * 1024 # 1 KB
        for i in range(1, 5):
            with open(os.path.join(self.tempdir, str(i)), 'w') as f:
                f.write(data * i)
        with open(os.path.join(subdir, 'otherfile'), 'w') as f:
            f.write(data * 5)
        # Large temp file for resume test
        _, self.large_file_name = tempfile.mkstemp()
        fileobj = open(self.large_file_name, 'w')
        # Make sure to write just a little more than one block
        for _ in range((arvados.config.KEEP_BLOCK_SIZE/(1024*1024))+1):
            data = random.choice(['x', 'y', 'z']) * 1024 * 1024 # 1 MB
            fileobj.write(data)
        fileobj.close()
        # Temp dir containing small files to be repacked
        self.small_files_dir = tempfile.mkdtemp()
        data = 'y' * 1024 * 1024 # 1 MB
        for i in range(1, 70):
            with open(os.path.join(self.small_files_dir, str(i)), 'w') as f:
                f.write(data + str(i))
        self.arvfile_write = getattr(arvados.arvfile.ArvadosFileWriter, 'write')

    def tearDown(self):
        super(ArvPutUploadJobTest, self).tearDown()
        shutil.rmtree(self.tempdir)
        os.unlink(self.large_file_name)
        shutil.rmtree(self.small_files_dir)

    def test_writer_works_without_cache(self):
        cwriter = arv_put.ArvPutUploadJob(['/dev/null'], resume=False)
        cwriter.start(save_collection=False)
        self.assertEqual(". d41d8cd98f00b204e9800998ecf8427e+0 0:0:null\n", cwriter.manifest_text())

    def test_writer_works_with_cache(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write('foo')
            f.flush()
            cwriter = arv_put.ArvPutUploadJob([f.name])
            cwriter.start(save_collection=False)
            self.assertEqual(3, cwriter.bytes_written - cwriter.bytes_skipped)
            # Don't destroy the cache, and start another upload
            cwriter_new = arv_put.ArvPutUploadJob([f.name])
            cwriter_new.start(save_collection=False)
            cwriter_new.destroy_cache()
            self.assertEqual(0, cwriter_new.bytes_written - cwriter_new.bytes_skipped)

    def make_progress_tester(self):
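        # Return a (progression, record_func) pair: record_func appends
        # each (bytes_written, bytes_expected) report to progression.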
        progression = []
        def record_func(written, expected):
            progression.append((written, expected))
        return progression, record_func

    def test_progress_reporting(self):
        with tempfile.NamedTemporaryFile() as f:
            f.write('foo')
            f.flush()
            for expect_count in (None, 8):
                progression, reporter = self.make_progress_tester()
                cwriter = arv_put.ArvPutUploadJob([f.name],
                    reporter=reporter, bytes_expected=expect_count)
                cwriter.start(save_collection=False)
                cwriter.destroy_cache()
                self.assertIn((3, expect_count), progression)

    def test_writer_upload_directory(self):
        cwriter = arv_put.ArvPutUploadJob([self.tempdir])
        cwriter.start(save_collection=False)
        cwriter.destroy_cache()
        self.assertEqual(1024*(1+2+3+4+5), cwriter.bytes_written)

    def test_resume_large_file_upload(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Fail only on the last (partial) block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer2.start(save_collection=False)
        self.assertEqual(writer.bytes_written + writer2.bytes_written - writer2.bytes_skipped,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()

    # Test for bug #11002
    def test_graceful_exit_while_repacking_small_blocks(self):
        def wrapped_commit(*args, **kwargs):
            raise KeyboardInterrupt("Simulated error")

        with mock.patch('arvados.arvfile._BlockManager.commit_bufferblock',
                        autospec=True) as mocked_commit:
            mocked_commit.side_effect = wrapped_commit
            # Upload a little more than one block; wrapped_commit makes the
            # first block commit fail. arv-put should not then crash by
            # trying to commit the collection while it's in an inconsistent
            # state.
            writer = arv_put.ArvPutUploadJob([self.small_files_dir],
                                             replication_desired=1)
            try:
                with self.assertRaises(KeyboardInterrupt):
                    writer.start(save_collection=False)
            except AttributeError:
                self.fail("arv-put command is trying to use a corrupted BlockManager. See https://dev.arvados.org/issues/11002")
        writer.destroy_cache()

    def test_no_resume_when_asked(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Fail only on the last (partial) block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without resume
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()

    def test_no_resume_when_no_cache(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Fail only on the last (partial) block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload, this time without cache usage
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          resume=False,
                                          use_cache=False)
        writer2.start(save_collection=False)
        self.assertEqual(writer2.bytes_skipped, 0)
        self.assertEqual(writer2.bytes_written,
                         os.path.getsize(self.large_file_name))
        writer2.destroy_cache()

    def test_dry_run_feature(self):
        def wrapped_write(*args, **kwargs):
            data = args[1]
            # Fail only on the last (partial) block
            if len(data) < arvados.config.KEEP_BLOCK_SIZE:
                raise SystemExit("Simulated error")
            return self.arvfile_write(*args, **kwargs)

        with mock.patch('arvados.arvfile.ArvadosFileWriter.write',
                        autospec=True) as mocked_write:
            mocked_write.side_effect = wrapped_write
            writer = arv_put.ArvPutUploadJob([self.large_file_name],
                                             replication_desired=1)
            with self.assertRaises(SystemExit):
                writer.start(save_collection=False)
            # Confirm that the file was partially uploaded
            self.assertGreater(writer.bytes_written, 0)
            self.assertLess(writer.bytes_written,
                            os.path.getsize(self.large_file_name))
        # Retry the upload using dry_run to check if there is a pending upload
        writer2 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          dry_run=True)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            writer2.start(save_collection=False)
        # Complete the pending upload
        writer3 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1)
        writer3.start(save_collection=False)
        # Confirm there's no pending upload with dry_run=True
        writer4 = arv_put.ArvPutUploadJob([self.large_file_name],
                                          replication_desired=1,
                                          dry_run=True)
        with self.assertRaises(arv_put.ArvPutUploadNotPending):
            writer4.start(save_collection=False)
        writer4.destroy_cache()
        # Test obvious cases
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False,
                                    use_cache=False)
        with self.assertRaises(arv_put.ArvPutUploadIsPending):
            arv_put.ArvPutUploadJob([self.large_file_name],
                                    replication_desired=1,
                                    dry_run=True,
                                    resume=False)


class ArvadosExpectedBytesTest(ArvadosBaseTestCase):
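    # Checks arv_put.expected_bytes_for() size estimates for single files,
    # directory trees, and special files whose size can't be known upfront.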
    TEST_SIZE = os.path.getsize(__file__)

    def test_expected_bytes_for_file(self):
        self.assertEqual(self.TEST_SIZE,
                          arv_put.expected_bytes_for([__file__]))

    def test_expected_bytes_for_tree(self):
        tree = self.make_tmpdir()
        shutil.copyfile(__file__, os.path.join(tree, 'one'))
        shutil.copyfile(__file__, os.path.join(tree, 'two'))
        self.assertEqual(self.TEST_SIZE * 2,
                          arv_put.expected_bytes_for([tree]))
        self.assertEqual(self.TEST_SIZE * 3,
                          arv_put.expected_bytes_for([tree, __file__]))

    def test_expected_bytes_for_device(self):
        self.assertIsNone(arv_put.expected_bytes_for(['/dev/null']))
        self.assertIsNone(arv_put.expected_bytes_for([__file__, '/dev/null']))


class ArvadosPutReportTest(ArvadosBaseTestCase):
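    # Checks the progress-report formatting helpers arv_put.machine_progress()
    # and arv_put.human_progress().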
    def test_machine_progress(self):
        for count, total in [(0, 1), (0, None), (1, None), (235, 9283)]:
            expect = ": {} written {} total\n".format(
                count, -1 if (total is None) else total)
            self.assertTrue(
                arv_put.machine_progress(count, total).endswith(expect))

    def test_known_human_progress(self):
        for count, total in [(0, 1), (2, 4), (45, 60)]:
            expect = '{:.1%}'.format(float(count) / total)
            actual = arv_put.human_progress(count, total)
            self.assertTrue(actual.startswith('\r'))
            self.assertIn(expect, actual)

    def test_unknown_human_progress(self):
        for count in [1, 20, 300, 4000, 50000]:
            self.assertTrue(re.search(r'\b{}\b'.format(count),
                                      arv_put.human_progress(count, None)))


class ArvadosPutTest(run_test_server.TestCaseWithServers, ArvadosBaseTestCase):
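    # Exercises the arv-put command line (arv_put.main) in-process:
    # argument handling, cache-directory fallbacks, and error reporting.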
    MAIN_SERVER = {}
    Z_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'

    def call_main_with_args(self, args):
        self.main_stdout = StringIO()
        self.main_stderr = StringIO()
        return arv_put.main(args, self.main_stdout, self.main_stderr)

    def call_main_on_test_file(self, args=[]):
        with self.make_test_file() as testfile:
            path = testfile.name
            self.call_main_with_args(['--stream', '--no-progress'] + args + [path])
        self.assertTrue(
            os.path.exists(os.path.join(os.environ['KEEP_LOCAL_STORE'],
                                        '098f6bcd4621d373cade4e832627b4f6')),
            "did not find file stream in Keep store")

    def setUp(self):
        super(ArvadosPutTest, self).setUp()
        run_test_server.authorize_with('active')
        arv_put.api_client = None

    def tearDown(self):
        for outbuf in ['main_stdout', 'main_stderr']:
            if hasattr(self, outbuf):
                getattr(self, outbuf).close()
                delattr(self, outbuf)
        super(ArvadosPutTest, self).tearDown()

    def test_version_argument(self):
        err = io.BytesIO()
        out = io.BytesIO()
        with tutil.redirected_streams(stdout=out, stderr=err):
            with self.assertRaises(SystemExit):
                self.call_main_with_args(['--version'])
        self.assertEqual(out.getvalue(), '')
        self.assertRegexpMatches(err.getvalue(), r"[0-9]+\.[0-9]+\.[0-9]+")

    def test_simple_file_put(self):
        self.call_main_on_test_file()

    def test_put_with_unwriteable_cache_dir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = cachedir
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_with_unwritable_cache_subdir(self):
        orig_cachedir = arv_put.ResumeCache.CACHE_DIR
        cachedir = self.make_tmpdir()
        os.chmod(cachedir, 0o0)
        arv_put.ResumeCache.CACHE_DIR = os.path.join(cachedir, 'cachedir')
        try:
            self.call_main_on_test_file()
        finally:
            arv_put.ResumeCache.CACHE_DIR = orig_cachedir
            os.chmod(cachedir, 0o700)

    def test_put_block_replication(self):
        self.call_main_on_test_file()
        with mock.patch('arvados.collection.KeepClient.local_store_put') as put_mock:
            put_mock.return_value = 'acbd18db4cc2f85cedef654fccc4a4d8+3'
            self.call_main_on_test_file(['--replication', '1'])
            self.call_main_on_test_file(['--replication', '4'])
            self.call_main_on_test_file(['--replication', '5'])
            self.assertEqual(
                [x[-1].get('copies') for x in put_mock.call_args_list],
                [1, 4, 5])

    def test_normalize(self):
        testfile1 = self.make_test_file()
        testfile2 = self.make_test_file()
        test_paths = [testfile1.name, testfile2.name]
        # Reverse-sort the paths, so normalization must change their order.
        test_paths.sort(reverse=True)
        self.call_main_with_args(['--stream', '--no-progress', '--normalize'] +
                                 test_paths)
        manifest = self.main_stdout.getvalue()
        # Assert the second file we specified appears first in the manifest.
        file_indices = [manifest.find(':' + os.path.basename(path))
                        for path in test_paths]
        self.assertGreater(*file_indices)

    def test_error_name_without_collection(self):
        self.assertRaises(SystemExit, self.call_main_with_args,
                          ['--name', 'test without Collection',
                           '--stream', '/dev/null'])

    def test_error_when_project_not_found(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID])

    def test_error_bad_project_uuid(self):
        self.assertRaises(SystemExit,
                          self.call_main_with_args,
                          ['--project-uuid', self.Z_UUID, '--stream'])

    def test_api_error_handling(self):
        coll_save_mock = mock.Mock(name='arv.collection.Collection().save_new()')
        coll_save_mock.side_effect = arvados.errors.ApiError(
            fake_httplib2_response(403), '{}')
        with mock.patch('arvados.collection.Collection.save_new',
                        new=coll_save_mock):
            with self.assertRaises(SystemExit) as exc_test:
                self.call_main_with_args(['/dev/null'])
            self.assertLess(0, exc_test.exception.args[0])
            self.assertLess(0, coll_save_mock.call_count)
            self.assertEqual("", self.main_stdout.getvalue())


class ArvPutIntegrationTest(run_test_server.TestCaseWithServers,
                            ArvadosBaseTestCase):
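    # End-to-end tests that run arv-put as a subprocess against the test
    # API and Keep servers, then verify the resulting collections.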
    def _getKeepServerConfig():
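        # Read blob_signing_key from the API server's Rails config, so the
        # test Keep server can be started with permissions enforced.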
        for config_file, mandatory in [
                ['application.yml', False], ['application.default.yml', True]]:
            path = os.path.join(run_test_server.SERVICES_SRC_DIR,
                                "api", "config", config_file)
            if not mandatory and not os.path.exists(path):
                continue
            with open(path) as f:
                rails_config = yaml.load(f.read())
                for config_section in ['test', 'common']:
                    try:
                        key = rails_config[config_section]["blob_signing_key"]
                    except (KeyError, TypeError):
                        pass
                    else:
                        return {'blob_signing_key': key,
                                'enforce_permissions': True}
        return {'blob_signing_key': None, 'enforce_permissions': False}

    MAIN_SERVER = {}
    KEEP_SERVER = _getKeepServerConfig()
    PROJECT_UUID = run_test_server.fixture('groups')['aproject']['uuid']

    @classmethod
    def setUpClass(cls):
        super(ArvPutIntegrationTest, cls).setUpClass()
        cls.ENVIRON = os.environ.copy()
        cls.ENVIRON['PYTHONPATH'] = ':'.join(sys.path)

    def setUp(self):
        super(ArvPutIntegrationTest, self).setUp()
        arv_put.api_client = None

    def authorize_with(self, token_name):
        run_test_server.authorize_with(token_name)
        for v in ["ARVADOS_API_HOST",
                  "ARVADOS_API_HOST_INSECURE",
                  "ARVADOS_API_TOKEN"]:
            self.ENVIRON[v] = arvados.config.settings()[v]
        arv_put.api_client = arvados.api('v1')

    def current_user(self):
        return arv_put.api_client.users().current().execute()

    def test_check_real_project_found(self):
        self.authorize_with('active')
        self.assertTrue(arv_put.desired_project_uuid(arv_put.api_client, self.PROJECT_UUID, 0),
                        "did not correctly find test fixture project")

    def test_check_error_finding_nonexistent_uuid(self):
        BAD_UUID = 'zzzzz-zzzzz-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        try:
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)
        except ValueError as error:
            self.assertIn(BAD_UUID, error.message)
        else:
            self.assertFalse(result, "incorrectly found nonexistent project")

    def test_check_error_finding_nonexistent_project(self):
        BAD_UUID = 'zzzzz-tpzed-zzzzzzzzzzzzzzz'
        self.authorize_with('active')
        with self.assertRaises(apiclient.errors.HttpError):
            result = arv_put.desired_project_uuid(arv_put.api_client, BAD_UUID,
                                                  0)

    def test_short_put_from_stdin(self):
        # Have to run this as an integration test since arv-put can't
        # read from the tests' stdin.
        # arv-put usually can't stat(os.path.realpath('/dev/stdin')) in this
        # case, because the /proc entry is already gone by the time it tries.
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__, '--stream'],
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, env=self.ENVIRON)
        pipe.stdin.write('stdin test\n')
        pipe.stdin.close()
        deadline = time.time() + 5
        while (pipe.poll() is None) and (time.time() < deadline):
            time.sleep(.1)
        returncode = pipe.poll()
        if returncode is None:
            pipe.terminate()
            self.fail("arv-put did not PUT from stdin within 5 seconds")
        elif returncode != 0:
            sys.stdout.write(pipe.stdout.read())
            self.fail("arv-put returned exit code {}".format(returncode))
        self.assertIn('4a9c8b735dce4b5fa3acf221a0b13628+11', pipe.stdout.read())

    def test_ArvPutSignedManifest(self):
        # ArvPutSignedManifest runs "arv-put foo" and then attempts to get
        # the newly created manifest from the API server, testing to confirm
        # that the block locators in the returned manifest are signed.
        self.authorize_with('active')

        # Before doing anything, demonstrate that the collection
        # we're about to create is not present in our test fixture.
        manifest_uuid = "00b4e9f40ac4dd432ef89749f1c01e74+47"
        with self.assertRaises(apiclient.errors.HttpError):
            notfound = arv_put.api_client.collections().get(
                uuid=manifest_uuid).execute()

        datadir = self.make_tmpdir()
        with open(os.path.join(datadir, "foo"), "w") as f:
            f.write("The quick brown fox jumped over the lazy dog")
        p = subprocess.Popen([sys.executable, arv_put.__file__, datadir],
                             stdout=subprocess.PIPE, env=self.ENVIRON)
        (arvout, arverr) = p.communicate()
        self.assertEqual(arverr, None)
        self.assertEqual(p.returncode, 0)

        # The manifest text stored in the API server under the same
        # manifest UUID must use signed locators.
        c = arv_put.api_client.collections().get(uuid=manifest_uuid).execute()
        self.assertRegexpMatches(
            c['manifest_text'],
            r'^\. 08a008a01d498c404b0c30852b39d3b8\+44\+A[0-9a-f]+@[0-9a-f]+ 0:44:foo\n')

        os.remove(os.path.join(datadir, "foo"))
        os.rmdir(datadir)

    def run_and_find_collection(self, text, extra_args=[]):
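        # Run arv-put in a subprocess, feeding it `text` on stdin, then fetch
        # and return the resulting collection record from the API server
        # (looked up by portable data hash when --portable-data-hash is used,
        # otherwise by UUID).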
        self.authorize_with('active')
        pipe = subprocess.Popen(
            [sys.executable, arv_put.__file__] + extra_args,
            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE, env=self.ENVIRON)
        stdout, stderr = pipe.communicate(text)
        search_key = ('portable_data_hash'
                      if '--portable-data-hash' in extra_args else 'uuid')
        collection_list = arvados.api('v1').collections().list(
            filters=[[search_key, '=', stdout.strip()]]).execute().get('items', [])
        self.assertEqual(1, len(collection_list))
        return collection_list[0]

    def test_put_collection_with_later_update(self):
        tmpdir = self.make_tmpdir()
        with open(os.path.join(tmpdir, 'file1'), 'w') as f:
            f.write('Relaxing in basins at the end of inlets terminates the endless tests from the box')
        col = self.run_and_find_collection("", ['--no-progress', tmpdir])
        self.assertNotEqual(None, col['uuid'])
        # Add a new file to the directory
        with open(os.path.join(tmpdir, 'file2'), 'w') as f:
            f.write('The quick brown fox jumped over the lazy dog')
        updated_col = self.run_and_find_collection("", ['--no-progress', '--update-collection', col['uuid'], tmpdir])
        self.assertEqual(col['uuid'], updated_col['uuid'])
        # Get the manifest and check that the new file is being included
        c = arv_put.api_client.collections().get(uuid=updated_col['uuid']).execute()
        self.assertRegexpMatches(c['manifest_text'], r'^\. .*:44:file2\n')

    def test_put_collection_with_high_redundancy(self):
        # Write empty data: we're not testing CollectionWriter, just
        # making sure collections.create tells the API server what our
        # desired replication level is.
        collection = self.run_and_find_collection("", ['--replication', '4'])
        self.assertEqual(4, collection['replication_desired'])

    def test_put_collection_with_default_redundancy(self):
        collection = self.run_and_find_collection("")
        self.assertEqual(None, collection['replication_desired'])

    def test_put_collection_with_unnamed_project_link(self):
        link = self.run_and_find_collection(
            "Test unnamed collection",
            ['--portable-data-hash', '--project-uuid', self.PROJECT_UUID])
        username = pwd.getpwuid(os.getuid()).pw_name
        self.assertRegexpMatches(
            link['name'],
            r'^Saved at .* by {}@'.format(re.escape(username)))

    def test_put_collection_with_name_and_no_project(self):
        link_name = 'Test Collection Link in home project'
        collection = self.run_and_find_collection(
            "Test named collection in home project",
            ['--portable-data-hash', '--name', link_name])
        self.assertEqual(link_name, collection['name'])
        my_user_uuid = self.current_user()['uuid']
        self.assertEqual(my_user_uuid, collection['owner_uuid'])

    def test_put_collection_with_named_project_link(self):
        link_name = 'Test auto Collection Link'
        collection = self.run_and_find_collection("Test named collection",
                                      ['--portable-data-hash',
                                       '--name', link_name,
                                       '--project-uuid', self.PROJECT_UUID])
        self.assertEqual(link_name, collection['name'])


if __name__ == '__main__':
    unittest.main()