arvados / sdk / python / arvados / commands / put.py @ a83e4de2
#!/usr/bin/env python

# TODO:
# --md5sum - display md5 of each file as read from disk
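
# arv-put copies data from the local filesystem to Keep. Depending on the
# options defined below, it saves a new Collection object (the default),
# updates an existing collection (--update-collection), prints the manifest
# text (--stream) or the raw block locators (--raw), and can resume an
# interrupted upload from a cache kept under ~/.cache/arvados/arv-put
# (--resume / --cache, both enabled by default).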

import argparse
import arvados
import arvados.collection
import base64
import copy
import datetime
import errno
import fcntl
import hashlib
import json
import logging
import os
import pwd
import re
import signal
import socket
import sys
import tempfile
import threading
import time
from apiclient import errors as apiclient_errors
from arvados._version import __version__

import arvados.commands._util as arv_cmd

CAUGHT_SIGNALS = [signal.SIGINT, signal.SIGQUIT, signal.SIGTERM]
api_client = None

upload_opts = argparse.ArgumentParser(add_help=False)

upload_opts.add_argument('--version', action='version',
                         version="%s %s" % (sys.argv[0], __version__),
                         help='Print version and exit.')
upload_opts.add_argument('paths', metavar='path', type=str, nargs='*',
                         help="""
Local file or directory. Default: read from standard input.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--max-manifest-depth', type=int, metavar='N',
                    default=-1, help=argparse.SUPPRESS)

_group.add_argument('--normalize', action='store_true',
                    help="""
Normalize the manifest by re-ordering files and streams after writing
data.
""")

_group.add_argument('--dry-run', action='store_true', default=False,
                    help="""
Don't actually upload files, but only check if any file should be
uploaded. Exit with code=2 when files are pending for upload.
""")

_group = upload_opts.add_mutually_exclusive_group()

_group.add_argument('--as-stream', action='store_true', dest='stream',
                    help="""
Synonym for --stream.
""")

_group.add_argument('--stream', action='store_true',
                    help="""
Store the file content and display the resulting manifest on
stdout. Do not write the manifest to Keep or save a Collection object
in Arvados.
""")

_group.add_argument('--as-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--in-manifest', action='store_true', dest='manifest',
                    help="""
Synonym for --manifest.
""")

_group.add_argument('--manifest', action='store_true',
                    help="""
Store the file data and resulting manifest in Keep, save a Collection
object in Arvados, and display the manifest locator (Collection uuid)
on stdout. This is the default behavior.
""")

_group.add_argument('--as-raw', action='store_true', dest='raw',
                    help="""
Synonym for --raw.
""")

_group.add_argument('--raw', action='store_true',
                    help="""
Store the file content and display the data block locators on stdout,
separated by commas, with a trailing newline. Do not store a
manifest.
""")

upload_opts.add_argument('--update-collection', type=str, default=None,
                         dest='update_collection', metavar="UUID", help="""
Update an existing collection identified by the given Arvados collection
UUID. All new local files will be uploaded.
""")

upload_opts.add_argument('--use-filename', type=str, default=None,
                         dest='filename', help="""
Synonym for --filename.
""")

upload_opts.add_argument('--filename', type=str, default=None,
                         help="""
Use the given filename in the manifest, instead of the name of the
local file. This is useful when "-" or "/dev/stdin" is given as an
input file. It can be used only if there is exactly one path given and
it is not a directory. Implies --manifest.
""")

upload_opts.add_argument('--portable-data-hash', action='store_true',
                         help="""
Print the portable data hash instead of the Arvados UUID for the collection
created by the upload.
""")

upload_opts.add_argument('--replication', type=int, metavar='N', default=None,
                         help="""
Set the replication level for the new collection: how many different
physical storage devices (e.g., disks) should have a copy of each data
block. Default is to use the server-provided default (if any) or 2.
""")

upload_opts.add_argument('--threads', type=int, metavar='N', default=None,
                         help="""
Set the number of upload threads to be used. Take into account that
using lots of threads will increase the RAM requirements. Default is
to use 2 threads.
On high latency installations, using a greater number will improve
overall throughput.
""")

run_opts = argparse.ArgumentParser(add_help=False)

run_opts.add_argument('--project-uuid', metavar='UUID', help="""
Store the collection in the specified project, instead of your Home
project.
""")

run_opts.add_argument('--name', help="""
Save the collection with the specified name.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--progress', action='store_true',
                    help="""
Display human-readable progress on stderr (bytes and, if possible,
percentage of total data size). This is the default behavior when
stderr is a tty.
""")

_group.add_argument('--no-progress', action='store_true',
                    help="""
Do not display human-readable progress on stderr, even if stderr is a
tty.
""")

_group.add_argument('--batch-progress', action='store_true',
                    help="""
Display machine-readable progress on stderr (bytes and, if known,
total data size).
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--resume', action='store_true', default=True,
                    help="""
Continue interrupted uploads from cached state (default).
""")
_group.add_argument('--no-resume', action='store_false', dest='resume',
                    help="""
Do not continue interrupted uploads from cached state.
""")

_group = run_opts.add_mutually_exclusive_group()
_group.add_argument('--cache', action='store_true', dest='use_cache', default=True,
                    help="""
Save upload state in a cache file for resuming (default).
""")
_group.add_argument('--no-cache', action='store_false', dest='use_cache',
                    help="""
Do not save upload state in a cache file for resuming.
""")

arg_parser = argparse.ArgumentParser(
    description='Copy data from the local filesystem to Keep.',
    parents=[upload_opts, run_opts, arv_cmd.retry_opt])
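
# Illustrative invocations (file names and UUID below are made-up examples):
#
#   arv-put file1.dat dir1/
#   arv-put --project-uuid PROJECT_UUID --name "my data" dir1/
#   arv-put --no-resume --no-cache --replication 3 big.tar
#   cat data.bin | arv-put --filename data.bin -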

def parse_arguments(arguments):
    args = arg_parser.parse_args(arguments)

    if len(args.paths) == 0:
        args.paths = ['-']

    args.paths = map(lambda x: "-" if x == "/dev/stdin" else x, args.paths)

    if len(args.paths) != 1 or os.path.isdir(args.paths[0]):
        if args.filename:
            arg_parser.error("""
    --filename argument cannot be used when storing a directory or
    multiple files.
    """)

    # Turn on --progress by default if stderr is a tty.
    if (not (args.batch_progress or args.no_progress)
        and os.isatty(sys.stderr.fileno())):
        args.progress = True

    # Turn off --resume (default) if --no-cache is used.
    if not args.use_cache:
        args.resume = False

    if args.paths == ['-']:
        if args.update_collection:
            arg_parser.error("""
    --update-collection cannot be used when reading from stdin.
    """)
        args.resume = False
        args.use_cache = False
        if not args.filename:
            args.filename = 'stdin'

    return args


class CollectionUpdateError(Exception):
    pass


class ResumeCacheConflict(Exception):
    pass


class ArvPutArgumentConflict(Exception):
    pass


class ArvPutUploadIsPending(Exception):
    pass


class ArvPutUploadNotPending(Exception):
    pass


class FileUploadList(list):
    def __init__(self, dry_run=False):
        list.__init__(self)
        self.dry_run = dry_run

    def append(self, other):
        if self.dry_run:
            raise ArvPutUploadIsPending()
        super(FileUploadList, self).append(other)
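
# Note: when constructed with dry_run=True, the first append() call aborts the
# file scan by raising ArvPutUploadIsPending; main() maps that exception to
# exit code 2, matching the --dry-run help text above.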


class ResumeCache(object):
    CACHE_DIR = '.cache/arvados/arv-put'

    def __init__(self, file_spec):
        self.cache_file = open(file_spec, 'a+')
        self._lock_file(self.cache_file)
        self.filename = self.cache_file.name

    @classmethod
    def make_path(cls, args):
        md5 = hashlib.md5()
        md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
        realpaths = sorted(os.path.realpath(path) for path in args.paths)
        md5.update('\0'.join(realpaths))
        if any(os.path.isdir(path) for path in realpaths):
            md5.update("-1")
        elif args.filename:
            md5.update(args.filename)
        return os.path.join(
            arv_cmd.make_home_conf_dir(cls.CACHE_DIR, 0o700, 'raise'),
            md5.hexdigest())
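
    # The cache path above is keyed on an MD5 of the API host and the
    # NUL-joined, sorted real paths of the inputs (plus "-1" when a directory
    # is included, or --filename when given), so repeating the same invocation
    # reuses the same cache file.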

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def load(self):
        self.cache_file.seek(0)
        return json.load(self.cache_file)

    def check_cache(self, api_client=None, num_retries=0):
        try:
            state = self.load()
            locator = None
            try:
                if "_finished_streams" in state and len(state["_finished_streams"]) > 0:
                    locator = state["_finished_streams"][0][1][0]
                elif "_current_stream_locators" in state and len(state["_current_stream_locators"]) > 0:
                    locator = state["_current_stream_locators"][0]
                if locator is not None:
                    kc = arvados.keep.KeepClient(api_client=api_client)
                    kc.head(locator, num_retries=num_retries)
            except Exception as e:
                self.restart()
        except ValueError:
            pass

    def save(self, data):
        try:
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self.filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            json.dump(data, new_cache)
            os.rename(new_cache_name, self.filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self.cache_file.close()
            self.cache_file = new_cache

    def close(self):
        self.cache_file.close()

    def destroy(self):
        try:
            os.unlink(self.filename)
        except OSError as error:
            if error.errno != errno.ENOENT:  # That's what we wanted anyway.
                raise
        self.close()

    def restart(self):
        self.destroy()
        self.__init__(self.filename)


class ArvPutUploadJob(object):
    CACHE_DIR = '.cache/arvados/arv-put'
    EMPTY_STATE = {
        'manifest' : None, # Last saved manifest checkpoint
        'files' : {} # Previous run file list: {path : {size, mtime}}
    }
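
    # A cache file stores a JSON object with the EMPTY_STATE shape, roughly
    # (illustrative values only):
    #   {"manifest": ". d41d8cd98f00b204e9800998ecf8427e+0 0:0:empty.txt\n",
    #    "files": {"/tmp/empty.txt": {"mtime": 1480000000.0, "size": 0}}}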

    def __init__(self, paths, resume=True, use_cache=True, reporter=None,
                 bytes_expected=None, name=None, owner_uuid=None,
                 ensure_unique_name=False, num_retries=None,
                 put_threads=None, replication_desired=None,
                 filename=None, update_time=60.0, update_collection=None,
                 logger=logging.getLogger('arvados.arv_put'), dry_run=False):
        self.paths = paths
        self.resume = resume
        self.use_cache = use_cache
        self.update = False
        self.reporter = reporter
        self.bytes_expected = bytes_expected
        self.bytes_written = 0
        self.bytes_skipped = 0
        self.name = name
        self.owner_uuid = owner_uuid
        self.ensure_unique_name = ensure_unique_name
        self.num_retries = num_retries
        self.replication_desired = replication_desired
        self.put_threads = put_threads
        self.filename = filename
        self._state_lock = threading.Lock()
        self._state = None # Previous run state (file list & manifest)
        self._current_files = [] # Current run file list
        self._cache_file = None
        self._collection_lock = threading.Lock()
        self._remote_collection = None # Collection being updated (if asked)
        self._local_collection = None # Collection from previous run manifest
        self._file_paths = set() # Files to be updated in remote collection
        self._stop_checkpointer = threading.Event()
        self._checkpointer = threading.Thread(target=self._update_task)
        self._checkpointer.daemon = True
        self._update_task_time = update_time  # How many seconds to wait between update runs
        self._files_to_upload = FileUploadList(dry_run=dry_run)
        self._upload_started = False
        self.logger = logger
        self.dry_run = dry_run
        self._checkpoint_before_quit = True

        if not self.use_cache and self.resume:
            raise ArvPutArgumentConflict('resume cannot be True when use_cache is False')

        # Check for obvious dry-run responses
        if self.dry_run and (not self.use_cache or not self.resume):
            raise ArvPutUploadIsPending()

        # Load cached data if any and if needed
        self._setup_state(update_collection)

    def start(self, save_collection):
        """
        Start supporting thread & file uploading
        """
        if not self.dry_run:
            self._checkpointer.start()
        try:
            for path in self.paths:
                # Test for stdin first, in case some file named '-' exists
                if path == '-':
                    if self.dry_run:
                        raise ArvPutUploadIsPending()
                    self._write_stdin(self.filename or 'stdin')
                elif os.path.isdir(path):
                    # Use absolute paths on cache index so CWD doesn't interfere
                    # with the caching logic.
                    prefixdir = path = os.path.abspath(path)
                    if prefixdir != '/':
                        prefixdir += '/'
                    for root, dirs, files in os.walk(path):
                        # Make os.walk()'s dir traversing order deterministic
                        dirs.sort()
                        files.sort()
                        for f in files:
                            self._check_file(os.path.join(root, f),
                                             os.path.join(root[len(prefixdir):], f))
                else:
                    self._check_file(os.path.abspath(path),
                                     self.filename or os.path.basename(path))
            # If dry-run mode is on and we got this far, notify that there
            # aren't any files to upload.
            if self.dry_run:
                raise ArvPutUploadNotPending()
            # Remove local_collection's files that don't exist locally anymore, so the
            # bytes_written count is correct.
            for f in self.collection_file_paths(self._local_collection,
                                                path_prefix=""):
                if f != 'stdin' and f != self.filename and f not in self._file_paths:
                    self._local_collection.remove(f)
            # Update bytes_written from current local collection and
            # report initial progress.
            self._update()
            # Actual file upload
            self._upload_started = True # Used by the update thread to start checkpointing
            self._upload_files()
        except KeyboardInterrupt:
            self.logger.warning("User interrupt request, cleaning up before exiting.")
            self._checkpoint_before_quit = False
            raise
        finally:
            if not self.dry_run:
                # Stop the thread before doing anything else
                self._stop_checkpointer.set()
                self._checkpointer.join()
                if self._checkpoint_before_quit:
                    # Commit all pending blocks & one last _update()
                    self._local_collection.manifest_text()
                    self._update(final=True)
                    if save_collection:
                        self.save_collection()
            if self.use_cache:
                self._cache_file.close()

    def save_collection(self):
        if self.update:
            # Check if files should be updated on the remote collection.
            for fp in self._file_paths:
                remote_file = self._remote_collection.find(fp)
                if not remote_file:
                    # File doesn't exist on the remote collection, copy it.
                    self._remote_collection.copy(fp, fp, self._local_collection)
                elif remote_file != self._local_collection.find(fp):
                    # A different file exists on the remote collection, overwrite it.
                    self._remote_collection.copy(fp, fp, self._local_collection, overwrite=True)
                else:
                    # The file already exists on the remote collection, skip it.
                    pass
            self._remote_collection.save(num_retries=self.num_retries)
        else:
            self._local_collection.save_new(
                name=self.name, owner_uuid=self.owner_uuid,
                ensure_unique_name=self.ensure_unique_name,
                num_retries=self.num_retries)

    def destroy_cache(self):
        if self.use_cache:
            try:
                os.unlink(self._cache_filename)
            except OSError as error:
                # That's what we wanted anyway.
                if error.errno != errno.ENOENT:
                    raise
            self._cache_file.close()

    def _collection_size(self, collection):
        """
        Recursively get the total size of the collection
        """
        size = 0
        for item in collection.values():
            if isinstance(item, arvados.collection.Collection) or isinstance(item, arvados.collection.Subcollection):
                size += self._collection_size(item)
            else:
                size += item.size()
        return size

    def _update_task(self):
        """
        Periodically called support task. File uploading is
        asynchronous so we poll status from the collection.
        """
        while not self._stop_checkpointer.wait(1 if not self._upload_started else self._update_task_time):
            self._update()

    def _update(self, final=False):
        """
        Update cached manifest text and report progress.
        """
        if self._upload_started:
            with self._collection_lock:
                self.bytes_written = self._collection_size(self._local_collection)
                if self.use_cache:
                    if final:
                        manifest = self._local_collection.manifest_text()
                    else:
                        # Get the manifest text without committing pending blocks
                        manifest = self._local_collection.manifest_text(strip=False,
                                                                        normalize=False,
                                                                        only_committed=True)
                    # Update cache
                    with self._state_lock:
                        self._state['manifest'] = manifest
            if self.use_cache:
                self._save_state()
        else:
            self.bytes_written = self.bytes_skipped
        # Call the reporter, if any
        self.report_progress()

    def report_progress(self):
        if self.reporter is not None:
            self.reporter(self.bytes_written, self.bytes_expected)

    def _write_stdin(self, filename):
        output = self._local_collection.open(filename, 'w')
        self._write(sys.stdin, output)
        output.close()

    def _check_file(self, source, filename):
        """Check if this file needs to be uploaded"""
        resume_offset = 0
        should_upload = False
        new_file_in_cache = False
        # Record file path for updating the remote collection before exiting
        self._file_paths.add(filename)

        with self._state_lock:
            # If no previous cached data on this file, store it for an eventual
            # repeated run.
            if source not in self._state['files']:
                self._state['files'][source] = {
                    'mtime': os.path.getmtime(source),
                    'size' : os.path.getsize(source)
                }
                new_file_in_cache = True
            cached_file_data = self._state['files'][source]

        # Check if file was already uploaded (at least partially)
        file_in_local_collection = self._local_collection.find(filename)

        # If not resuming, upload the full file.
        if not self.resume:
            should_upload = True
        # New file detected from last run, upload it.
        elif new_file_in_cache:
            should_upload = True
        # Local file didn't change from last run.
        elif cached_file_data['mtime'] == os.path.getmtime(source) and cached_file_data['size'] == os.path.getsize(source):
            if not file_in_local_collection:
                # File not uploaded yet, upload it completely
                should_upload = True
            elif file_in_local_collection.permission_expired():
                # Permission token expired, re-upload file. This will change whenever
                # we have an API for refreshing tokens.
                should_upload = True
                self._local_collection.remove(filename)
            elif cached_file_data['size'] == file_in_local_collection.size():
                # File already there, skip it.
                self.bytes_skipped += cached_file_data['size']
            elif cached_file_data['size'] > file_in_local_collection.size():
                # File partially uploaded, resume!
                resume_offset = file_in_local_collection.size()
                self.bytes_skipped += resume_offset
                should_upload = True
            else:
                # Inconsistent cache, re-upload the file
                should_upload = True
                self._local_collection.remove(filename)
                self.logger.warning("Uploaded version of file '{}' is bigger than local version, will re-upload it from scratch.".format(source))
        # Local file differs from cached data, re-upload it.
        else:
            if file_in_local_collection:
                self._local_collection.remove(filename)
            should_upload = True

        if should_upload:
            self._files_to_upload.append((source, resume_offset, filename))

    def _upload_files(self):
        for source, resume_offset, filename in self._files_to_upload:
            with open(source, 'r') as source_fd:
                with self._state_lock:
                    self._state['files'][source]['mtime'] = os.path.getmtime(source)
                    self._state['files'][source]['size'] = os.path.getsize(source)
                if resume_offset > 0:
                    # Start upload where we left off
                    output = self._local_collection.open(filename, 'a')
                    source_fd.seek(resume_offset)
                else:
                    # Start from scratch
                    output = self._local_collection.open(filename, 'w')
                self._write(source_fd, output)
                output.close(flush=False)

    def _write(self, source_fd, output):
        while True:
            data = source_fd.read(arvados.config.KEEP_BLOCK_SIZE)
            if not data:
                break
            output.write(data)

    def _my_collection(self):
        return self._remote_collection if self.update else self._local_collection

    def _setup_state(self, update_collection):
        """
        Create a new cache file or load a previously existing one.
        """
        # Load an already existing collection for update
        if update_collection and re.match(arvados.util.collection_uuid_pattern,
                                          update_collection):
            try:
                self._remote_collection = arvados.collection.Collection(update_collection)
            except arvados.errors.ApiError as error:
                raise CollectionUpdateError("Cannot read collection {} ({})".format(update_collection, error))
            else:
                self.update = True
        elif update_collection:
            # Collection locator provided, but unknown format
            raise CollectionUpdateError("Collection locator unknown: '{}'".format(update_collection))

        if self.use_cache:
            # Set up cache file name from input paths.
            md5 = hashlib.md5()
            md5.update(arvados.config.get('ARVADOS_API_HOST', '!nohost'))
            realpaths = sorted(os.path.realpath(path) for path in self.paths)
            md5.update('\0'.join(realpaths))
            if self.filename:
                md5.update(self.filename)
            cache_filename = md5.hexdigest()
            cache_filepath = os.path.join(
                arv_cmd.make_home_conf_dir(self.CACHE_DIR, 0o700, 'raise'),
                cache_filename)
            if self.resume:
                self._cache_file = open(cache_filepath, 'a+')
            else:
                # --no-resume means start with an empty cache file.
                self._cache_file = open(cache_filepath, 'w+')
            self._cache_filename = self._cache_file.name
            self._lock_file(self._cache_file)
            self._cache_file.seek(0)

        with self._state_lock:
            if self.use_cache:
                try:
                    self._state = json.load(self._cache_file)
                    if not set(['manifest', 'files']).issubset(set(self._state.keys())):
                        # Cache at least partially incomplete, set up new cache
                        self._state = copy.deepcopy(self.EMPTY_STATE)
                except ValueError:
                    # Cache file empty, set up new cache
                    self._state = copy.deepcopy(self.EMPTY_STATE)
            else:
                # No cache file, set empty state
                self._state = copy.deepcopy(self.EMPTY_STATE)
            # Load the previous manifest so we can check if files were modified remotely.
            self._local_collection = arvados.collection.Collection(self._state['manifest'], replication_desired=self.replication_desired, put_threads=self.put_threads)

    def collection_file_paths(self, col, path_prefix='.'):
        """Return a list of file paths by recursively going through the entire collection `col`"""
        file_paths = []
        for name, item in col.items():
            if isinstance(item, arvados.arvfile.ArvadosFile):
                file_paths.append(os.path.join(path_prefix, name))
            elif isinstance(item, arvados.collection.Subcollection):
                new_prefix = os.path.join(path_prefix, name)
                file_paths += self.collection_file_paths(item, path_prefix=new_prefix)
        return file_paths

    def _lock_file(self, fileobj):
        try:
            fcntl.flock(fileobj, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except IOError:
            raise ResumeCacheConflict("{} locked".format(fileobj.name))

    def _save_state(self):
        """
        Atomically save current state into cache.
        """
        try:
            with self._state_lock:
                # We're not using copy.deepcopy() here because it's a lot slower
                # than json.dumps(), and we already need the state in JSON format
                # to save it on disk.
                state = json.dumps(self._state)
            new_cache_fd, new_cache_name = tempfile.mkstemp(
                dir=os.path.dirname(self._cache_filename))
            self._lock_file(new_cache_fd)
            new_cache = os.fdopen(new_cache_fd, 'r+')
            new_cache.write(state)
            new_cache.flush()
            os.fsync(new_cache)
            os.rename(new_cache_name, self._cache_filename)
        except (IOError, OSError, ResumeCacheConflict) as error:
            self.logger.error("There was a problem while saving the cache file: {}".format(error))
            try:
                os.unlink(new_cache_name)
            except NameError:  # mkstemp failed.
                pass
        else:
            self._cache_file.close()
            self._cache_file = new_cache

    def collection_name(self):
        return self._my_collection().api_response()['name'] if self._my_collection().api_response() else None

    def manifest_locator(self):
        return self._my_collection().manifest_locator()

    def portable_data_hash(self):
        pdh = self._my_collection().portable_data_hash()
        m = self._my_collection().stripped_manifest()
        local_pdh = hashlib.md5(m).hexdigest() + '+' + str(len(m))
        if pdh != local_pdh:
            self.logger.warning("\n".join([
                "arv-put: API server provided PDH differs from local manifest.",
                "         This should not happen; showing API server version."]))
        return pdh

    def manifest_text(self, stream_name=".", strip=False, normalize=False):
        return self._my_collection().manifest_text(stream_name, strip, normalize)

    def _datablocks_on_item(self, item):
        """
        Return a list of datablock locators, recursively navigating
        through subcollections
        """
        if isinstance(item, arvados.arvfile.ArvadosFile):
            if item.size() == 0:
                # Empty file locator
                return ["d41d8cd98f00b204e9800998ecf8427e+0"]
            else:
                locators = []
                for segment in item.segments():
                    loc = segment.locator
                    locators.append(loc)
                return locators
        elif isinstance(item, arvados.collection.Collection):
            l = [self._datablocks_on_item(x) for x in item.values()]
            # Fast list flattener method taken from:
            # http://stackoverflow.com/questions/952914/making-a-flat-list-out-of-list-of-lists-in-python
            return [loc for sublist in l for loc in sublist]
        else:
            return None

    def data_locators(self):
        with self._collection_lock:
            # Make sure all datablocks are flushed before getting the locators
            self._my_collection().manifest_text()
            datablocks = self._datablocks_on_item(self._my_collection())
        return datablocks


def expected_bytes_for(pathlist):
    # Walk the given directory trees and stat files, adding up file sizes,
    # so we can display progress as percent
    bytesum = 0
    for path in pathlist:
        if os.path.isdir(path):
            for filename in arvados.util.listdir_recursive(path):
                bytesum += os.path.getsize(os.path.join(path, filename))
        elif not os.path.isfile(path):
            return None
        else:
            bytesum += os.path.getsize(path)
    return bytesum
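
# Note: expected_bytes_for() returns None when any path is neither a file nor
# a directory (e.g. '-' for stdin), in which case human_progress() below falls
# back to reporting a plain byte count without a percentage.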

_machine_format = "{} {}: {{}} written {{}} total\n".format(sys.argv[0],
                                                            os.getpid())
def machine_progress(bytes_written, bytes_expected):
    return _machine_format.format(
        bytes_written, -1 if (bytes_expected is None) else bytes_expected)

def human_progress(bytes_written, bytes_expected):
    if bytes_expected:
        return "\r{}M / {}M {:.1%} ".format(
            bytes_written >> 20, bytes_expected >> 20,
            float(bytes_written) / bytes_expected)
    else:
        return "\r{} ".format(bytes_written)

def progress_writer(progress_func, outfile=sys.stderr):
    def write_progress(bytes_written, bytes_expected):
        outfile.write(progress_func(bytes_written, bytes_expected))
    return write_progress

def exit_signal_handler(sigcode, frame):
    sys.exit(-sigcode)

def desired_project_uuid(api_client, project_uuid, num_retries):
    if not project_uuid:
        query = api_client.users().current()
    elif arvados.util.user_uuid_pattern.match(project_uuid):
        query = api_client.users().get(uuid=project_uuid)
    elif arvados.util.group_uuid_pattern.match(project_uuid):
        query = api_client.groups().get(uuid=project_uuid)
    else:
        raise ValueError("Not a valid project UUID: {}".format(project_uuid))
    return query.execute(num_retries=num_retries)['uuid']

def main(arguments=None, stdout=sys.stdout, stderr=sys.stderr):
    global api_client

    logger = logging.getLogger('arvados.arv_put')
    logger.setLevel(logging.INFO)
    args = parse_arguments(arguments)
    status = 0
    if api_client is None:
        api_client = arvados.api('v1')

    # Determine the name to use
    if args.name:
        if args.stream or args.raw:
            logger.error("Cannot use --name with --stream or --raw")
            sys.exit(1)
        elif args.update_collection:
            logger.error("Cannot use --name with --update-collection")
            sys.exit(1)
        collection_name = args.name
    else:
        collection_name = "Saved at {} by {}@{}".format(
            datetime.datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC"),
            pwd.getpwuid(os.getuid()).pw_name,
            socket.gethostname())

    if args.project_uuid and (args.stream or args.raw):
        logger.error("Cannot use --project-uuid with --stream or --raw")
        sys.exit(1)

    # Determine the parent project
    try:
        project_uuid = desired_project_uuid(api_client, args.project_uuid,
                                            args.retries)
    except (apiclient_errors.Error, ValueError) as error:
        logger.error(error)
        sys.exit(1)

    if args.progress:
        reporter = progress_writer(human_progress)
    elif args.batch_progress:
        reporter = progress_writer(machine_progress)
    else:
        reporter = None

    # If this is used by a human, and there's at least one directory to be
    # uploaded, the expected bytes calculation can take a moment.
    if args.progress and any([os.path.isdir(f) for f in args.paths]):
        logger.info("Calculating upload size, this could take some time...")
    bytes_expected = expected_bytes_for(args.paths)

    try:
        writer = ArvPutUploadJob(paths = args.paths,
                                 resume = args.resume,
                                 use_cache = args.use_cache,
                                 filename = args.filename,
                                 reporter = reporter,
                                 bytes_expected = bytes_expected,
                                 num_retries = args.retries,
                                 replication_desired = args.replication,
                                 put_threads = args.threads,
                                 name = collection_name,
                                 owner_uuid = project_uuid,
                                 ensure_unique_name = True,
                                 update_collection = args.update_collection,
                                 logger=logger,
                                 dry_run=args.dry_run)
    except ResumeCacheConflict:
        logger.error("\n".join([
            "arv-put: Another process is already uploading this data.",
            "         Use --no-cache if this is really what you want."]))
        sys.exit(1)
    except CollectionUpdateError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    # Install our signal handler for each code in CAUGHT_SIGNALS, and save
    # the originals.
    orig_signal_handlers = {sigcode: signal.signal(sigcode, exit_signal_handler)
                            for sigcode in CAUGHT_SIGNALS}

    if not args.dry_run and not args.update_collection and args.resume and writer.bytes_written > 0:
        logger.warning("\n".join([
            "arv-put: Resuming previous upload from last checkpoint.",
            "         Use the --no-resume option to start over."]))

    if not args.dry_run:
        writer.report_progress()
    output = None
    try:
        writer.start(save_collection=not(args.stream or args.raw))
    except arvados.errors.ApiError as error:
        logger.error("\n".join([
            "arv-put: %s" % str(error)]))
        sys.exit(1)
    except ArvPutUploadIsPending:
        # Dry run check successful, return proper exit code.
        sys.exit(2)
    except ArvPutUploadNotPending:
        # No files pending for upload
        sys.exit(0)

    if args.progress:  # Print newline to split stderr from stdout for humans.
        logger.info("\n")

    if args.stream:
        if args.normalize:
            output = writer.manifest_text(normalize=True)
        else:
            output = writer.manifest_text()
    elif args.raw:
        output = ','.join(writer.data_locators())
    else:
        try:
            if args.update_collection:
                logger.info("Collection updated: '{}'".format(writer.collection_name()))
            else:
                logger.info("Collection saved as '{}'".format(writer.collection_name()))
            if args.portable_data_hash:
                output = writer.portable_data_hash()
            else:
                output = writer.manifest_locator()
        except apiclient_errors.Error as error:
            logger.error(
                "arv-put: Error creating Collection on project: {}.".format(
                    error))
            status = 1

    # Print the locator (uuid) of the new collection.
    if output is None:
        status = status or 1
    else:
        stdout.write(output)
        if not output.endswith('\n'):
            stdout.write('\n')

    for sigcode, orig_handler in orig_signal_handlers.items():
        signal.signal(sigcode, orig_handler)

    if status != 0:
        sys.exit(status)

    # Success!
    return output


if __name__ == '__main__':
    main()