# arvados/sdk/python/arvados/arvfile.py @ 76d9365a
from __future__ import absolute_import
2
from __future__ import division
3
from future import standard_library
4
standard_library.install_aliases()
5
from builtins import range
6
from builtins import object
7
import functools
8
import os
9
import zlib
10
import bz2
11
from . import config
12
import hashlib
13
import threading
14
import queue
15
import copy
16
import errno
17
import re
18
import logging
19
import collections
20
import uuid
21

    
22
from .errors import KeepWriteError, AssertionError, ArgumentError
23
from .keep import KeepLocator
24
from ._normalize_stream import normalize_stream
25
from ._ranges import locators_and_ranges, LocatorAndRange, replace_range, Range
26
from .retry import retry_method
27

    
28
MOD = "mod"
29
WRITE = "write"
30

    
31
_logger = logging.getLogger('arvados.arvfile')
32

    
33
def split(path):
34
    """split(path) -> streamname, filename
35

36
    Separate the stream name and file name in a /-separated stream path and
37
    return a tuple (stream_name, file_name).  If no stream name is available,
38
    assume '.'.
39

40
    """
41
    try:
42
        stream_name, file_name = path.rsplit('/', 1)
43
    except ValueError:  # No / in string
44
        stream_name, file_name = '.', path
45
    return stream_name, file_name
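
# Illustrative usage (not part of the original module): split() separates the
# stream name from the file name in a collection path, defaulting the stream
# to '.' when the path contains no '/':
#
#   split('output/logs/stderr.txt')  # -> ('output/logs', 'stderr.txt')
#   split('stderr.txt')              # -> ('.', 'stderr.txt')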
46

    
47

    
48
class UnownedBlockError(Exception):
49
    """Raised when there's an writable block without an owner on the BlockManager."""
50
    pass
51

    
52

    
53
class _FileLikeObjectBase(object):
54
    def __init__(self, name, mode):
55
        self.name = name
56
        self.mode = mode
57
        self.closed = False
58

    
59
    @staticmethod
60
    def _before_close(orig_func):
61
        @functools.wraps(orig_func)
62
        def before_close_wrapper(self, *args, **kwargs):
63
            if self.closed:
64
                raise ValueError("I/O operation on closed stream file")
65
            return orig_func(self, *args, **kwargs)
66
        return before_close_wrapper
67

    
68
    def __enter__(self):
69
        return self
70

    
71
    def __exit__(self, exc_type, exc_value, traceback):
72
        try:
73
            self.close()
74
        except Exception:
75
            if exc_type is None:
76
                raise
77

    
78
    def close(self):
79
        self.closed = True
80

    
81

    
82
class ArvadosFileReaderBase(_FileLikeObjectBase):
83
    def __init__(self, name, mode, num_retries=None):
84
        super(ArvadosFileReaderBase, self).__init__(name, mode)
85
        self._filepos = 0
86
        self.num_retries = num_retries
87
        self._readline_cache = (None, None)
88

    
89
    def __iter__(self):
90
        while True:
91
            data = self.readline()
92
            if not data:
93
                break
94
            yield data
95

    
96
    def decompressed_name(self):
97
        return re.sub(r'\.(bz2|gz)$', '', self.name)
98

    
99
    @_FileLikeObjectBase._before_close
100
    def seek(self, pos, whence=os.SEEK_SET):
101
        if whence == os.SEEK_CUR:
102
            pos += self._filepos
103
        elif whence == os.SEEK_END:
104
            pos += self.size()
105
        self._filepos = min(max(pos, 0), self.size())
106

    
107
    def tell(self):
108
        return self._filepos
109

    
110
    @_FileLikeObjectBase._before_close
111
    @retry_method
112
    def readall(self, size=2**20, num_retries=None):
113
        while True:
114
            data = self.read(size, num_retries=num_retries)
115
            if len(data) == 0:
116
                break
117
            yield data
118

    
119
    @_FileLikeObjectBase._before_close
120
    @retry_method
121
    def readline(self, size=float('inf'), num_retries=None):
122
        cache_pos, cache_data = self._readline_cache
123
        if self.tell() == cache_pos:
124
            data = [cache_data]
125
            self._filepos += len(cache_data)
126
        else:
127
            data = [b'']
128
        data_size = len(data[-1])
129
        while (data_size < size) and (b'\n' not in data[-1]):
130
            next_read = self.read(2 ** 20, num_retries=num_retries)
131
            if not next_read:
132
                break
133
            data.append(next_read)
134
            data_size += len(next_read)
135
        data = b''.join(data)
136
        try:
137
            nextline_index = data.index(b'\n') + 1
138
        except ValueError:
139
            nextline_index = len(data)
140
        nextline_index = min(nextline_index, size)
141
        self._filepos -= len(data) - nextline_index
142
        self._readline_cache = (self.tell(), data[nextline_index:])
143
        return data[:nextline_index].decode()
144

    
145
    @_FileLikeObjectBase._before_close
146
    @retry_method
147
    def decompress(self, decompress, size, num_retries=None):
148
        for segment in self.readall(size, num_retries=num_retries):
149
            data = decompress(segment)
150
            if data:
151
                yield data
152

    
153
    @_FileLikeObjectBase._before_close
154
    @retry_method
155
    def readall_decompressed(self, size=2**20, num_retries=None):
156
        self.seek(0)
157
        if self.name.endswith('.bz2'):
158
            dc = bz2.BZ2Decompressor()
159
            return self.decompress(dc.decompress, size,
160
                                   num_retries=num_retries)
161
        elif self.name.endswith('.gz'):
162
            dc = zlib.decompressobj(16+zlib.MAX_WBITS)
163
            return self.decompress(lambda segment: dc.decompress(dc.unconsumed_tail + segment),
164
                                   size, num_retries=num_retries)
165
        else:
166
            return self.readall(size, num_retries=num_retries)
167

    
168
    @_FileLikeObjectBase._before_close
169
    @retry_method
170
    def readlines(self, sizehint=float('inf'), num_retries=None):
171
        data = []
172
        data_size = 0
173
        for s in self.readall(num_retries=num_retries):
174
            data.append(s)
175
            data_size += len(s)
176
            if data_size >= sizehint:
177
                break
178
        return b''.join(data).decode().splitlines(True)
179

    
180
    def size(self):
181
        raise NotImplementedError()
182

    
183
    def read(self, size, num_retries=None):
184
        raise NotImplementedError()
185

    
186
    def readfrom(self, start, size, num_retries=None):
187
        raise NotImplementedError()
188

    
189

    
190
class StreamFileReader(ArvadosFileReaderBase):
191
    class _NameAttribute(str):
192
        # The Python file API provides a plain .name attribute.
193
        # Older SDK provided a name() method.
194
        # This class provides both, for maximum compatibility.
195
        def __call__(self):
196
            return self
197

    
198
    def __init__(self, stream, segments, name):
199
        super(StreamFileReader, self).__init__(self._NameAttribute(name), 'rb', num_retries=stream.num_retries)
200
        self._stream = stream
201
        self.segments = segments
202

    
203
    def stream_name(self):
204
        return self._stream.name()
205

    
206
    def size(self):
207
        n = self.segments[-1]
208
        return n.range_start + n.range_size
209

    
210
    @_FileLikeObjectBase._before_close
211
    @retry_method
212
    def read(self, size, num_retries=None):
213
        """Read up to 'size' bytes from the stream, starting at the current file position"""
214
        if size == 0:
215
            return b''
216

    
217
        data = b''
218
        available_chunks = locators_and_ranges(self.segments, self._filepos, size)
219
        if available_chunks:
220
            lr = available_chunks[0]
221
            data = self._stream.readfrom(lr.locator+lr.segment_offset,
222
                                         lr.segment_size,
223
                                         num_retries=num_retries)
224

    
225
        self._filepos += len(data)
226
        return data
227

    
228
    @_FileLikeObjectBase._before_close
229
    @retry_method
230
    def readfrom(self, start, size, num_retries=None):
231
        """Read up to 'size' bytes from the stream, starting at 'start'"""
232
        if size == 0:
233
            return b''
234

    
235
        data = []
236
        for lr in locators_and_ranges(self.segments, start, size):
237
            data.append(self._stream.readfrom(lr.locator+lr.segment_offset, lr.segment_size,
238
                                              num_retries=num_retries))
239
        return b''.join(data)
240

    
241
    def as_manifest(self):
242
        segs = []
243
        for r in self.segments:
244
            segs.extend(self._stream.locators_and_ranges(r.locator, r.range_size))
245
        return " ".join(normalize_stream(".", {self.name: segs})) + "\n"
246

    
247

    
248
def synchronized(orig_func):
249
    @functools.wraps(orig_func)
250
    def synchronized_wrapper(self, *args, **kwargs):
251
        with self.lock:
252
            return orig_func(self, *args, **kwargs)
253
    return synchronized_wrapper
254

    
255

    
256
class StateChangeError(Exception):
257
    def __init__(self, message, state, nextstate):
258
        super(StateChangeError, self).__init__(message)
259
        self.state = state
260
        self.nextstate = nextstate
261

    
262
class _BufferBlock(object):
263
    """A stand-in for a Keep block that is in the process of being written.
264

265
    Writers can append to it, get the size, and compute the Keep locator.
266
    There are three valid states:
267

268
    WRITABLE
269
      Can append to block.
270

271
    PENDING
272
      Block is in the process of being uploaded to Keep, append is an error.
273

274
    COMMITTED
275
      The block has been written to Keep, its internal buffer has been
276
      released, fetching the block will fetch it via keep client (since we
277
      discarded the internal copy), and identifiers referring to the BufferBlock
278
      can be replaced with the block locator.
279

280
    """
281

    
282
    WRITABLE = 0
283
    PENDING = 1
284
    COMMITTED = 2
285
    ERROR = 3
286

    
287
    def __init__(self, blockid, starting_capacity, owner):
288
        """
289
        :blockid:
290
          the identifier for this block
291

292
        :starting_capacity:
293
          the initial buffer capacity
294

295
        :owner:
296
          ArvadosFile that owns this block
297

298
        """
299
        self.blockid = blockid
300
        self.buffer_block = bytearray(starting_capacity)
301
        self.buffer_view = memoryview(self.buffer_block)
302
        self.write_pointer = 0
303
        self._state = _BufferBlock.WRITABLE
304
        self._locator = None
305
        self.owner = owner
306
        self.lock = threading.Lock()
307
        self.wait_for_commit = threading.Event()
308
        self.error = None
309

    
310
    @synchronized
311
    def append(self, data):
312
        """Append some data to the buffer.
313

314
        Only valid if the block is in WRITABLE state.  Implements an expanding
315
        buffer, doubling capacity as needed to accommodate all the data.
316

317
        """
318
        if self._state == _BufferBlock.WRITABLE:
319
            if not isinstance(data, bytes):
320
                data = data.encode()
321
            while (self.write_pointer+len(data)) > len(self.buffer_block):
322
                new_buffer_block = bytearray(len(self.buffer_block) * 2)
323
                new_buffer_block[0:self.write_pointer] = self.buffer_block[0:self.write_pointer]
324
                self.buffer_block = new_buffer_block
325
                self.buffer_view = memoryview(self.buffer_block)
326
            self.buffer_view[self.write_pointer:self.write_pointer+len(data)] = data
327
            self.write_pointer += len(data)
328
            self._locator = None
329
        else:
330
            raise AssertionError("Buffer block is not writable")
331

    
332
    STATE_TRANSITIONS = frozenset([
333
            (WRITABLE, PENDING),
334
            (PENDING, COMMITTED),
335
            (PENDING, ERROR),
336
            (ERROR, PENDING)])
337

    
338
    @synchronized
339
    def set_state(self, nextstate, val=None):
340
        if (self._state, nextstate) not in self.STATE_TRANSITIONS:
341
            raise StateChangeError("Invalid state change from %s to %s" % (self._state, nextstate), self._state, nextstate)
342
        self._state = nextstate
343

    
344
        if self._state == _BufferBlock.PENDING:
345
            self.wait_for_commit.clear()
346

    
347
        if self._state == _BufferBlock.COMMITTED:
348
            self._locator = val
349
            self.buffer_view = None
350
            self.buffer_block = None
351
            self.wait_for_commit.set()
352

    
353
        if self._state == _BufferBlock.ERROR:
354
            self.error = val
355
            self.wait_for_commit.set()
356

    
357
    @synchronized
358
    def state(self):
359
        return self._state
360

    
361
    def size(self):
362
        """The amount of data written to the buffer."""
363
        return self.write_pointer
364

    
365
    @synchronized
366
    def locator(self):
367
        """The Keep locator for this buffer's contents."""
368
        if self._locator is None:
369
            self._locator = "%s+%i" % (hashlib.md5(self.buffer_view[0:self.write_pointer]).hexdigest(), self.size())
370
        return self._locator
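    # For example, a committed empty block would hash to
    # "d41d8cd98f00b204e9800998ecf8427e+0" (md5 of b"" plus the byte count).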
371

    
372
    @synchronized
373
    def clone(self, new_blockid, owner):
374
        if self._state == _BufferBlock.COMMITTED:
375
            raise AssertionError("Cannot duplicate committed buffer block")
376
        bufferblock = _BufferBlock(new_blockid, self.size(), owner)
377
        bufferblock.append(self.buffer_view[0:self.size()])
378
        return bufferblock
379

    
380
    @synchronized
381
    def clear(self):
382
        self.owner = None
383
        self.buffer_block = None
384
        self.buffer_view = None
385

    
386

    
387
class NoopLock(object):
388
    def __enter__(self):
389
        return self
390

    
391
    def __exit__(self, exc_type, exc_value, traceback):
392
        pass
393

    
394
    def acquire(self, blocking=False):
395
        pass
396

    
397
    def release(self):
398
        pass
399

    
400

    
401
def must_be_writable(orig_func):
402
    @functools.wraps(orig_func)
403
    def must_be_writable_wrapper(self, *args, **kwargs):
404
        if not self.writable():
405
            raise IOError(errno.EROFS, "Collection is read-only.")
406
        return orig_func(self, *args, **kwargs)
407
    return must_be_writable_wrapper
408

    
409

    
410
class _BlockManager(object):
411
    """BlockManager handles buffer blocks.
412

413
    Also handles background block uploads, and background block prefetch for a
414
    Collection of ArvadosFiles.
415

416
    """
417

    
418
    DEFAULT_PUT_THREADS = 2
419
    DEFAULT_GET_THREADS = 2
420

    
421
    def __init__(self, keep, copies=None, put_threads=None):
422
        """keep: KeepClient object to use"""
423
        self._keep = keep
424
        self._bufferblocks = collections.OrderedDict()
425
        self._put_queue = None
426
        self._put_threads = None
427
        self._prefetch_queue = None
428
        self._prefetch_threads = None
429
        self.lock = threading.Lock()
430
        self.prefetch_enabled = True
431
        if put_threads:
432
            self.num_put_threads = put_threads
433
        else:
434
            self.num_put_threads = _BlockManager.DEFAULT_PUT_THREADS
435
        self.num_get_threads = _BlockManager.DEFAULT_GET_THREADS
436
        self.copies = copies
437
        self._pending_write_size = 0
438
        self.threads_lock = threading.Lock()
439

    
440
    @synchronized
441
    def alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
442
        """Allocate a new, empty bufferblock in WRITABLE state and return it.
443

444
        :blockid:
445
          optional block identifier, otherwise one will be automatically assigned
446

447
        :starting_capacity:
448
          optional capacity, otherwise will use default capacity
449

450
        :owner:
451
          ArvadosFile that owns this block
452

453
        """
454
        return self._alloc_bufferblock(blockid, starting_capacity, owner)
455

    
456
    def _alloc_bufferblock(self, blockid=None, starting_capacity=2**14, owner=None):
457
        if blockid is None:
458
            blockid = "%s" % uuid.uuid4()
459
        bufferblock = _BufferBlock(blockid, starting_capacity=starting_capacity, owner=owner)
460
        self._bufferblocks[bufferblock.blockid] = bufferblock
461
        return bufferblock
462

    
463
    @synchronized
464
    def dup_block(self, block, owner):
465
        """Create a new bufferblock initialized with the content of an existing bufferblock.
466

467
        :block:
468
          the buffer block to copy.
469

470
        :owner:
471
          ArvadosFile that owns the new block
472

473
        """
474
        new_blockid = "bufferblock%i" % len(self._bufferblocks)
475
        bufferblock = block.clone(new_blockid, owner)
476
        self._bufferblocks[bufferblock.blockid] = bufferblock
477
        return bufferblock
478

    
479
    @synchronized
480
    def is_bufferblock(self, locator):
481
        return locator in self._bufferblocks
482

    
483
    def _commit_bufferblock_worker(self):
484
        """Background uploader thread."""
485

    
486
        while True:
487
            try:
488
                bufferblock = self._put_queue.get()
489
                if bufferblock is None:
490
                    return
491

    
492
                if self.copies is None:
493
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes())
494
                else:
495
                    loc = self._keep.put(bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes(), copies=self.copies)
496
                bufferblock.set_state(_BufferBlock.COMMITTED, loc)
497

    
498
            except Exception as e:
499
                bufferblock.set_state(_BufferBlock.ERROR, e)
500
            finally:
501
                if self._put_queue is not None:
502
                    self._put_queue.task_done()
503

    
504
    def start_put_threads(self):
505
        with self.threads_lock:
506
            if self._put_threads is None:
507
                # Start uploader threads.
508

    
509
                # If we don't limit the Queue size, the upload queue can quickly
510
                # grow to take up gigabytes of RAM if the writing process is
511
                # generating data more quickly than it can be sent to the Keep
512
                # servers.
513
                #
514
                # With two upload threads and a queue size of 2, this means up to 4
515
                # blocks pending.  If they are full 64 MiB blocks, that means up to
516
                # 256 MiB of internal buffering, which is the same size as the
517
                # default download block cache in KeepClient.
518
                self._put_queue = queue.Queue(maxsize=2)
519

    
520
                self._put_threads = []
521
                for i in range(0, self.num_put_threads):
522
                    thread = threading.Thread(target=self._commit_bufferblock_worker)
523
                    self._put_threads.append(thread)
524
                    thread.daemon = True
525
                    thread.start()
526

    
527
    def _block_prefetch_worker(self):
528
        """The background downloader thread."""
529
        while True:
530
            try:
531
                b = self._prefetch_queue.get()
532
                if b is None:
533
                    return
534
                self._keep.get(b)
535
            except Exception:
536
                _logger.exception("Exception doing block prefetch")
537

    
538
    @synchronized
539
    def start_get_threads(self):
540
        if self._prefetch_threads is None:
541
            self._prefetch_queue = queue.Queue()
542
            self._prefetch_threads = []
543
            for i in range(0, self.num_get_threads):
544
                thread = threading.Thread(target=self._block_prefetch_worker)
545
                self._prefetch_threads.append(thread)
546
                thread.daemon = True
547
                thread.start()
548

    
549

    
550
    @synchronized
551
    def stop_threads(self):
552
        """Shut down and wait for background upload and download threads to finish."""
553

    
554
        if self._put_threads is not None:
555
            for t in self._put_threads:
556
                self._put_queue.put(None)
557
            for t in self._put_threads:
558
                t.join()
559
        self._put_threads = None
560
        self._put_queue = None
561

    
562
        if self._prefetch_threads is not None:
563
            for t in self._prefetch_threads:
564
                self._prefetch_queue.put(None)
565
            for t in self._prefetch_threads:
566
                t.join()
567
        self._prefetch_threads = None
568
        self._prefetch_queue = None
569

    
570
    def __enter__(self):
571
        return self
572

    
573
    def __exit__(self, exc_type, exc_value, traceback):
574
        self.stop_threads()
575

    
576
    @synchronized
577
    def repack_small_blocks(self, force=False, sync=False, closed_file_size=0):
578
        """Packs small blocks together before uploading"""
579
        self._pending_write_size += closed_file_size
580

    
581
        # Check if there are enough small blocks for filling up one in full
582
        if force or (self._pending_write_size >= config.KEEP_BLOCK_SIZE):
583

    
584
            # Search for blocks that are ready to be packed together before being committed to Keep.
585
            # A WRITABLE block always has an owner.
586
            # A WRITABLE block with its owner.closed() implies that its
            # size is <= KEEP_BLOCK_SIZE/2.
588
            try:
589
                small_blocks = [b for b in list(self._bufferblocks.values()) if b.state() == _BufferBlock.WRITABLE and b.owner.closed()]
590
            except AttributeError:
591
                # Writable blocks without owner shouldn't exist.
592
                raise UnownedBlockError()
593

    
594
            if len(small_blocks) <= 1:
595
                # Not enough small blocks for repacking
596
                return
597

    
598
            # Update the pending write size count with its true value, just in case
599
            # some small file was opened, written and closed several times.
600
            self._pending_write_size = sum([b.size() for b in small_blocks])
601
            if self._pending_write_size < config.KEEP_BLOCK_SIZE and not force:
602
                return
603

    
604
            new_bb = self._alloc_bufferblock()
605
            while len(small_blocks) > 0 and (new_bb.write_pointer + small_blocks[0].size()) <= config.KEEP_BLOCK_SIZE:
606
                bb = small_blocks.pop(0)
607
                arvfile = bb.owner
608
                self._pending_write_size -= bb.size()
609
                new_bb.append(bb.buffer_view[0:bb.write_pointer].tobytes())
610
                arvfile.set_segments([Range(new_bb.blockid,
611
                                            0,
612
                                            bb.size(),
613
                                            new_bb.write_pointer - bb.size())])
614
                self._delete_bufferblock(bb.blockid)
615
            self.commit_bufferblock(new_bb, sync=sync)
616

    
617
    def commit_bufferblock(self, block, sync):
618
        """Initiate a background upload of a bufferblock.
619

620
        :block:
621
          The block object to upload
622

623
        :sync:
624
          If `sync` is True, upload the block synchronously.
625
          If `sync` is False, upload the block asynchronously.  This will
626
          return immediately unless the upload queue is at capacity, in
627
          which case it will wait on an upload queue slot.
628

629
        """
630
        try:
631
            # Mark the block as PENDING to disallow any more appends.
632
            block.set_state(_BufferBlock.PENDING)
633
        except StateChangeError as e:
634
            if e.state == _BufferBlock.PENDING:
635
                if sync:
636
                    block.wait_for_commit.wait()
637
                else:
638
                    return
639
            if block.state() == _BufferBlock.COMMITTED:
640
                return
641
            elif block.state() == _BufferBlock.ERROR:
642
                raise block.error
643
            else:
644
                raise
645

    
646
        if sync:
647
            try:
648
                if self.copies is None:
649
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes())
650
                else:
651
                    loc = self._keep.put(block.buffer_view[0:block.write_pointer].tobytes(), copies=self.copies)
652
                block.set_state(_BufferBlock.COMMITTED, loc)
653
            except Exception as e:
654
                block.set_state(_BufferBlock.ERROR, e)
655
                raise
656
        else:
657
            self.start_put_threads()
658
            self._put_queue.put(block)
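
    # Example (illustrative): sync=True waits for the upload (and re-raises any
    # upload error); sync=False only enqueues the block for the uploader threads
    # started by start_put_threads().
    #
    #   bm.commit_bufferblock(bb, sync=True)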
659

    
660
    @synchronized
661
    def get_bufferblock(self, locator):
662
        return self._bufferblocks.get(locator)
663

    
664
    @synchronized
665
    def delete_bufferblock(self, locator):
666
        self._delete_bufferblock(locator)
667

    
668
    def _delete_bufferblock(self, locator):
669
        bb = self._bufferblocks[locator]
670
        bb.clear()
671
        del self._bufferblocks[locator]
672

    
673
    def get_block_contents(self, locator, num_retries, cache_only=False):
674
        """Fetch a block.
675

676
        First checks to see if the locator is a BufferBlock and returns its
        contents; if not, passes the request through to KeepClient.get().
678

679
        """
680
        with self.lock:
681
            if locator in self._bufferblocks:
682
                bufferblock = self._bufferblocks[locator]
683
                if bufferblock.state() != _BufferBlock.COMMITTED:
684
                    return bufferblock.buffer_view[0:bufferblock.write_pointer].tobytes()
685
                else:
686
                    locator = bufferblock._locator
687
        if cache_only:
688
            return self._keep.get_from_cache(locator)
689
        else:
690
            return self._keep.get(locator, num_retries=num_retries)
691

    
692
    def commit_all(self):
693
        """Commit all outstanding buffer blocks.
694

695
        This is a synchronous call, and will not return until all buffer blocks
696
        are uploaded.  Raises KeepWriteError() if any blocks failed to upload.
697

698
        """
699
        self.repack_small_blocks(force=True, sync=True)
700

    
701
        with self.lock:
702
            items = list(self._bufferblocks.items())
703

    
704
        for k,v in items:
705
            if v.state() != _BufferBlock.COMMITTED and v.owner:
706
                v.owner.flush(sync=False)
707

    
708
        with self.lock:
709
            if self._put_queue is not None:
710
                self._put_queue.join()
711

    
712
                err = []
713
                for k,v in items:
714
                    if v.state() == _BufferBlock.ERROR:
715
                        err.append((v.locator(), v.error))
716
                if err:
717
                    raise KeepWriteError("Error writing some blocks", err, label="block")
718

    
719
        for k,v in items:
720
            # flush again with sync=True to remove committed bufferblocks from
721
            # the segments.
722
            if v.owner:
723
                v.owner.flush(sync=True)
724

    
725
    def block_prefetch(self, locator):
726
        """Initiate a background download of a block.
727

728
        This assumes that the underlying KeepClient implements a block cache,
729
        so repeated requests for the same block will not result in repeated
730
        downloads (unless the block is evicted from the cache).  This method
731
        does not block.
732

733
        """
734

    
735
        if not self.prefetch_enabled:
736
            return
737

    
738
        if self._keep.get_from_cache(locator) is not None:
739
            return
740

    
741
        with self.lock:
742
            if locator in self._bufferblocks:
743
                return
744

    
745
        self.start_get_threads()
746
        self._prefetch_queue.put(locator)
747

    
748

    
749
class ArvadosFile(object):
750
    """Represent a file in a Collection.
751

752
    ArvadosFile manages the underlying representation of a file in Keep as a
753
    sequence of segments spanning a set of blocks, and implements random
754
    read/write access.
755

756
    This object may be accessed from multiple threads.
757

758
    """
759

    
760
    def __init__(self, parent, name, stream=[], segments=[]):
761
        """
762
        ArvadosFile constructor.
763

764
        :stream:
765
          a list of Range objects representing a block stream
766

767
        :segments:
768
          a list of Range objects representing segments
769
        """
770
        self.parent = parent
771
        self.name = name
772
        self._writers = set()
773
        self._committed = False
774
        self._segments = []
775
        self.lock = parent.root_collection().lock
776
        for s in segments:
777
            self._add_segment(stream, s.locator, s.range_size)
778
        self._current_bblock = None
779

    
780
    def writable(self):
781
        return self.parent.writable()
782

    
783
    @synchronized
784
    def permission_expired(self, as_of_dt=None):
785
        """Returns True if any of the segment's locators is expired"""
786
        for r in self._segments:
787
            if KeepLocator(r.locator).permission_expired(as_of_dt):
788
                return True
789
        return False
790

    
791
    @synchronized
792
    def segments(self):
793
        return copy.copy(self._segments)
794

    
795
    @synchronized
796
    def clone(self, new_parent, new_name):
797
        """Make a copy of this file."""
798
        cp = ArvadosFile(new_parent, new_name)
799
        cp.replace_contents(self)
800
        return cp
801

    
802
    @must_be_writable
803
    @synchronized
804
    def replace_contents(self, other):
805
        """Replace segments of this file with segments from another `ArvadosFile` object."""
806

    
807
        map_loc = {}
808
        self._segments = []
809
        for other_segment in other.segments():
810
            new_loc = other_segment.locator
811
            if other.parent._my_block_manager().is_bufferblock(other_segment.locator):
812
                if other_segment.locator not in map_loc:
813
                    bufferblock = other.parent._my_block_manager().get_bufferblock(other_segment.locator)
814
                    if bufferblock.state() != _BufferBlock.WRITABLE:
815
                        map_loc[other_segment.locator] = bufferblock.locator()
816
                    else:
817
                        map_loc[other_segment.locator] = self.parent._my_block_manager().dup_block(bufferblock, self).blockid
818
                new_loc = map_loc[other_segment.locator]
819

    
820
            self._segments.append(Range(new_loc, other_segment.range_start, other_segment.range_size, other_segment.segment_offset))
821

    
822
        self.set_committed(False)
823

    
824
    def __eq__(self, other):
825
        if other is self:
826
            return True
827
        if not isinstance(other, ArvadosFile):
828
            return False
829

    
830
        othersegs = other.segments()
831
        with self.lock:
832
            if len(self._segments) != len(othersegs):
833
                return False
834
            for i in range(0, len(othersegs)):
835
                seg1 = self._segments[i]
836
                seg2 = othersegs[i]
837
                loc1 = seg1.locator
838
                loc2 = seg2.locator
839

    
840
                if self.parent._my_block_manager().is_bufferblock(loc1):
841
                    loc1 = self.parent._my_block_manager().get_bufferblock(loc1).locator()
842

    
843
                if other.parent._my_block_manager().is_bufferblock(loc2):
844
                    loc2 = other.parent._my_block_manager().get_bufferblock(loc2).locator()
845

    
846
                if (KeepLocator(loc1).stripped() != KeepLocator(loc2).stripped() or
847
                    seg1.range_start != seg2.range_start or
848
                    seg1.range_size != seg2.range_size or
849
                    seg1.segment_offset != seg2.segment_offset):
850
                    return False
851

    
852
        return True
853

    
854
    def __ne__(self, other):
855
        return not self.__eq__(other)
856

    
857
    @synchronized
858
    def set_segments(self, segs):
859
        self._segments = segs
860

    
861
    @synchronized
862
    def set_committed(self, value=True):
863
        """Set committed flag.
864

865
        If value is True, set committed to be True.
866

867
        If value is False, set committed to be False for this and all parents.
868
        """
869
        if value == self._committed:
870
            return
871
        self._committed = value
872
        if self._committed is False and self.parent is not None:
873
            self.parent.set_committed(False)
874

    
875
    @synchronized
876
    def committed(self):
877
        """Get whether this is committed or not."""
878
        return self._committed
879

    
880
    @synchronized
881
    def add_writer(self, writer):
882
        """Add an ArvadosFileWriter reference to the list of writers"""
883
        if isinstance(writer, ArvadosFileWriter):
884
            self._writers.add(writer)
885

    
886
    @synchronized
887
    def remove_writer(self, writer, flush):
888
        """
889
        Called from ArvadosFileWriter.close(). Remove a writer reference from the list
890
        and do some block maintenance tasks.
891
        """
892
        self._writers.remove(writer)
893

    
894
        if flush or self.size() > config.KEEP_BLOCK_SIZE // 2:
895
            # File writer closed, not small enough for repacking
896
            self.flush()
897
        elif self.closed():
898
            # All writers closed and size is adequate for repacking
899
            self.parent._my_block_manager().repack_small_blocks(closed_file_size=self.size())
900

    
901
    def closed(self):
902
        """
903
        Get whether this is closed or not.  When the writers list is empty, the
        file is considered closed.
905
        """
906
        return len(self._writers) == 0
907

    
908
    @must_be_writable
909
    @synchronized
910
    def truncate(self, size):
911
        """Shrink the size of the file.
912

913
        If `size` is less than the size of the file, the file contents after
914
        `size` will be discarded.  If `size` is greater than the current size
915
        of the file, an IOError will be raised.
916

917
        """
918
        if size < self.size():
919
            new_segs = []
920
            for r in self._segments:
921
                range_end = r.range_start+r.range_size
922
                if r.range_start >= size:
923
                    # segment is past the truncate size, all done
924
                    break
925
                elif size < range_end:
926
                    nr = Range(r.locator, r.range_start, size - r.range_start, 0)
927
                    nr.segment_offset = r.segment_offset
928
                    new_segs.append(nr)
929
                    break
930
                else:
931
                    new_segs.append(r)
932

    
933
            self._segments = new_segs
934
            self.set_committed(False)
935
        elif size > self.size():
936
            raise IOError(errno.EINVAL, "truncate() does not support extending the file size")
937

    
938
    def readfrom(self, offset, size, num_retries, exact=False):
939
        """Read up to `size` bytes from the file starting at `offset`.
940

941
        :exact:
942
         If False (default), return less data than requested if the read
943
         crosses a block boundary and the next block isn't cached.  If True,
944
         only return less data than requested when hitting EOF.
945
        """
946

    
947
        with self.lock:
948
            if size == 0 or offset >= self.size():
949
                return b''
950
            readsegs = locators_and_ranges(self._segments, offset, size)
951
            prefetch = locators_and_ranges(self._segments, offset + size, config.KEEP_BLOCK_SIZE, limit=32)
952

    
953
        locs = set()
954
        data = []
955
        for lr in readsegs:
956
            block = self.parent._my_block_manager().get_block_contents(lr.locator, num_retries=num_retries, cache_only=(bool(data) and not exact))
957
            if block:
958
                blockview = memoryview(block)
959
                data.append(blockview[lr.segment_offset:lr.segment_offset+lr.segment_size].tobytes())
960
                locs.add(lr.locator)
961
            else:
962
                break
963

    
964
        for lr in prefetch:
965
            if lr.locator not in locs:
966
                self.parent._my_block_manager().block_prefetch(lr.locator)
967
                locs.add(lr.locator)
968

    
969
        return b''.join(data)
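
    # Example (illustrative; "arvfile" is an ArvadosFile): read 1 MiB starting at
    # byte 512.  With the default exact=False the call may return less than
    # requested if it crosses into an uncached block; exact=True only returns
    # short at end-of-file.
    #
    #   chunk = arvfile.readfrom(512, 2**20, num_retries=3)
    #   chunk = arvfile.readfrom(512, 2**20, num_retries=3, exact=True)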
970

    
971
    def _repack_writes(self, num_retries):
972
        """Test if the buffer block has more data than actual segments.
973

974
        This happens when a buffered write overwrites a file range written in
975
        a previous buffered write.  Re-pack the buffer block for efficiency
976
        and to avoid leaking information.
977

978
        """
979
        segs = self._segments
980

    
981
        # Sum up the segments to get the total number of bytes in the file that
        # reference this buffer block.
983
        bufferblock_segs = [s for s in segs if s.locator == self._current_bblock.blockid]
984
        write_total = sum([s.range_size for s in bufferblock_segs])
985

    
986
        if write_total < self._current_bblock.size():
987
            # There is more data in the buffer block than is actually accounted
            # for by segments, so re-pack the referenced data into a new buffer block.
989
            contents = self.parent._my_block_manager().get_block_contents(self._current_bblock.blockid, num_retries)
990
            new_bb = self.parent._my_block_manager().alloc_bufferblock(self._current_bblock.blockid, starting_capacity=write_total, owner=self)
991
            for t in bufferblock_segs:
992
                new_bb.append(contents[t.segment_offset:t.segment_offset+t.range_size])
993
                t.segment_offset = new_bb.size() - t.range_size
994

    
995
            self._current_bblock = new_bb
996

    
997
    @must_be_writable
998
    @synchronized
999
    def writeto(self, offset, data, num_retries):
1000
        """Write `data` to the file starting at `offset`.
1001

1002
        This will update existing bytes and/or extend the size of the file as
1003
        necessary.
1004

1005
        """
1006
        if not isinstance(data, bytes):
1007
            data = data.encode()
1008
        if len(data) == 0:
1009
            return
1010

    
1011
        if offset > self.size():
1012
            raise ArgumentError("Offset is past the end of the file")
1013

    
1014
        if len(data) > config.KEEP_BLOCK_SIZE:
1015
            # Chunk it up into smaller writes
1016
            n = 0
1017
            dataview = memoryview(data)
1018
            while n < len(data):
1019
                self.writeto(offset+n, dataview[n:n + config.KEEP_BLOCK_SIZE].tobytes(), num_retries)
1020
                n += config.KEEP_BLOCK_SIZE
1021
            return
1022

    
1023
        self.set_committed(False)
1024

    
1025
        if self._current_bblock is None or self._current_bblock.state() != _BufferBlock.WRITABLE:
1026
            self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1027

    
1028
        if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1029
            self._repack_writes(num_retries)
1030
            if (self._current_bblock.size() + len(data)) > config.KEEP_BLOCK_SIZE:
1031
                self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=False)
1032
                self._current_bblock = self.parent._my_block_manager().alloc_bufferblock(owner=self)
1033

    
1034
        self._current_bblock.append(data)
1035

    
1036
        replace_range(self._segments, offset, len(data), self._current_bblock.blockid, self._current_bblock.write_pointer - len(data))
1037

    
1038
        self.parent.notify(WRITE, self.parent, self.name, (self, self))
1039

    
1040
        return len(data)
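
    # Example (illustrative; "arvfile" is a writable ArvadosFile): appending is
    # writeto() at the current size; writes larger than config.KEEP_BLOCK_SIZE
    # are split into block-sized chunks by the recursive call above.
    #
    #   arvfile.writeto(arvfile.size(), b"x" * (70 * 1024 * 1024), num_retries=3)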
1041

    
1042
    @synchronized
1043
    def flush(self, sync=True, num_retries=0):
1044
        """Flush the current bufferblock to Keep.
1045

1046
        :sync:
1047
          If True, commit block synchronously, wait until buffer block has been written.
1048
          If False, commit block asynchronously, return immediately after putting block into
1049
          the keep put queue.
1050
        """
1051
        if self.committed():
1052
            return
1053

    
1054
        if self._current_bblock and self._current_bblock.state() != _BufferBlock.COMMITTED:
1055
            if self._current_bblock.state() == _BufferBlock.WRITABLE:
1056
                self._repack_writes(num_retries)
1057
            self.parent._my_block_manager().commit_bufferblock(self._current_bblock, sync=sync)
1058

    
1059
        if sync:
1060
            to_delete = set()
1061
            for s in self._segments:
1062
                bb = self.parent._my_block_manager().get_bufferblock(s.locator)
1063
                if bb:
1064
                    if bb.state() != _BufferBlock.COMMITTED:
1065
                        self.parent._my_block_manager().commit_bufferblock(bb, sync=True)
1066
                    to_delete.add(s.locator)
1067
                    s.locator = bb.locator()
1068
            for s in to_delete:
1069
                self.parent._my_block_manager().delete_bufferblock(s)
1070

    
1071
        self.parent.notify(MOD, self.parent, self.name, (self, self))
1072

    
1073
    @must_be_writable
1074
    @synchronized
1075
    def add_segment(self, blocks, pos, size):
1076
        """Add a segment to the end of the file.
1077

1078
        `pos` and `size` reference a section of the stream described by
1079
        `blocks` (a list of Range objects)
1080

1081
        """
1082
        self._add_segment(blocks, pos, size)
1083

    
1084
    def _add_segment(self, blocks, pos, size):
1085
        """Internal implementation of add_segment."""
1086
        self.set_committed(False)
1087
        for lr in locators_and_ranges(blocks, pos, size):
1088
            last = self._segments[-1] if self._segments else Range(0, 0, 0, 0)
1089
            r = Range(lr.locator, last.range_start+last.range_size, lr.segment_size, lr.segment_offset)
1090
            self._segments.append(r)
1091

    
1092
    @synchronized
1093
    def size(self):
1094
        """Get the file size."""
1095
        if self._segments:
1096
            n = self._segments[-1]
1097
            return n.range_start + n.range_size
1098
        else:
1099
            return 0
1100

    
1101
    @synchronized
1102
    def manifest_text(self, stream_name=".", portable_locators=False,
1103
                      normalize=False, only_committed=False):
1104
        buf = ""
1105
        filestream = []
1106
        for segment in self._segments:
            loc = segment.locator
            if self.parent._my_block_manager().is_bufferblock(loc):
                if only_committed:
                    continue
                # Use the bufferblock's current Keep locator from the block manager.
                loc = self.parent._my_block_manager().get_bufferblock(loc).locator()
            if portable_locators:
                loc = KeepLocator(loc).stripped()
            filestream.append(LocatorAndRange(loc, KeepLocator(loc).size,
                                 segment.segment_offset, segment.range_size))
1116
        buf += ' '.join(normalize_stream(stream_name, {stream_name: filestream}))
1117
        buf += "\n"
1118
        return buf
1119

    
1120
    @must_be_writable
1121
    @synchronized
1122
    def _reparent(self, newparent, newname):
1123
        self.set_committed(False)
1124
        self.flush(sync=True)
1125
        self.parent.remove(self.name)
1126
        self.parent = newparent
1127
        self.name = newname
1128
        self.lock = self.parent.root_collection().lock
1129

    
1130

    
1131
class ArvadosFileReader(ArvadosFileReaderBase):
1132
    """Wraps ArvadosFile in a file-like object supporting reading only.
1133

1134
    Be aware that this class is NOT thread safe as there is no locking around
    updating the file pointer.
1136

1137
    """
1138

    
1139
    def __init__(self, arvadosfile, num_retries=None):
1140
        super(ArvadosFileReader, self).__init__(arvadosfile.name, "r", num_retries=num_retries)
1141
        self.arvadosfile = arvadosfile
1142

    
1143
    def size(self):
1144
        return self.arvadosfile.size()
1145

    
1146
    def stream_name(self):
1147
        return self.arvadosfile.parent.stream_name()
1148

    
1149
    @_FileLikeObjectBase._before_close
1150
    @retry_method
1151
    def read(self, size=None, num_retries=None):
1152
        """Read up to `size` bytes from the file and return the result.
1153

1154
        Starts at the current file position.  If `size` is None, read the
1155
        entire remainder of the file.
1156
        """
1157
        if size is None:
1158
            data = []
1159
            rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1160
            while rd:
1161
                data.append(rd)
1162
                self._filepos += len(rd)
1163
                rd = self.arvadosfile.readfrom(self._filepos, config.KEEP_BLOCK_SIZE, num_retries)
1164
            return b''.join(data)
1165
        else:
1166
            data = self.arvadosfile.readfrom(self._filepos, size, num_retries, exact=True)
1167
            self._filepos += len(data)
1168
            return data
1169

    
1170
    @_FileLikeObjectBase._before_close
1171
    @retry_method
1172
    def readfrom(self, offset, size, num_retries=None):
1173
        """Read up to `size` bytes from the stream, starting at the specified file offset.
1174

1175
        This method does not change the file position.
1176
        """
1177
        return self.arvadosfile.readfrom(offset, size, num_retries)
1178

    
1179
    def flush(self):
1180
        pass
1181

    
1182

    
1183
class ArvadosFileWriter(ArvadosFileReader):
1184
    """Wraps ArvadosFile in a file-like object supporting both reading and writing.
1185

1186
    Be aware that this class is NOT thread safe as there is no locking around
    updating the file pointer.
1188

1189
    """
1190

    
1191
    def __init__(self, arvadosfile, mode, num_retries=None):
1192
        super(ArvadosFileWriter, self).__init__(arvadosfile, num_retries=num_retries)
1193
        self.mode = mode
1194
        self.arvadosfile.add_writer(self)
1195

    
1196
    @_FileLikeObjectBase._before_close
1197
    @retry_method
1198
    def write(self, data, num_retries=None):
1199
        if self.mode[0] == "a":
1200
            self.arvadosfile.writeto(self.size(), data, num_retries)
1201
        else:
1202
            self.arvadosfile.writeto(self._filepos, data, num_retries)
1203
            self._filepos += len(data)
1204
        return len(data)
1205

    
1206
    @_FileLikeObjectBase._before_close
1207
    @retry_method
1208
    def writelines(self, seq, num_retries=None):
1209
        for s in seq:
1210
            self.write(s, num_retries=num_retries)
1211

    
1212
    @_FileLikeObjectBase._before_close
1213
    def truncate(self, size=None):
1214
        if size is None:
1215
            size = self._filepos
1216
        self.arvadosfile.truncate(size)
1217
        if self._filepos > self.size():
1218
            self._filepos = self.size()
1219

    
1220
    @_FileLikeObjectBase._before_close
1221
    def flush(self):
1222
        self.arvadosfile.flush()
1223

    
1224
    def close(self, flush=True):
1225
        if not self.closed:
1226
            self.arvadosfile.remove_writer(self, flush)
1227
            super(ArvadosFileWriter, self).close()
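
# Usage sketch (illustrative; "arvfile" is a writable ArvadosFile): the writer
# buffers data in _BufferBlocks and commits them to Keep on flush()/close().
#
#   writer = ArvadosFileWriter(arvfile, mode="wb", num_retries=3)
#   writer.write(b"hello world\n")
#   writer.flush()           # commit the current bufferblock to Keep
#   writer.close()           # drops the writer; may trigger small-block repacking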