Project

General

Profile

Bug #21541 » arv-mount-stress-test.py

Version 3 - Brett Smith, 03/22/2024 03:13 PM

 
1
#!/usr/bin/env python3
2

    
3
import argparse
4
import dataclasses
5
import io
6
import itertools
7
import json
8
import logging
9
import logging.handlers
10
# Using multiprocessing not for the usual GIL avoidance reasons,
11
# but because of historical risks of mixing subprocess+threading.
12
import multiprocessing
13
import os
14
import subprocess
15
import sys
16
import tempfile
17
import time
18

    
19
from pathlib import Path
20
from typing import Optional
21

    
22
logger = logging.getLogger('arv-mount-stress')
logger.setLevel(logging.DEBUG)
# Every record also goes to the local syslog socket, whether or not the
# script is run directly.
logger.addHandler(logging.handlers.SysLogHandler('/dev/log'))
25

    
26
@dataclasses.dataclass
class MountState:
    """State shared between the stress-test driver and its helper processes.

    Creating an instance makes a new, uniquely named directory under
    ``mount_parent`` to serve as the mount point. It also derives from that
    directory's name the systemd unit name used to run the mount.
    """
    # Newly created mount-point directory (from tempfile.mkdtemp).
    mount_path: Path
    # systemd unit name: mount-point directory stem + '.service'.
    unit_name: str
    # NOTE(review): never set or waited on in the visible code.
    ready_flag: multiprocessing.Event
    # Set by follow_journal when the crash message appears in the journal.
    crash_flag: multiprocessing.Event
    # Always None as constructed; not updated in the visible code.
    returncode: Optional[int]

    def __init__(self, mount_parent):
        new_dir = tempfile.mkdtemp(prefix='arv-mount-stress-', dir=mount_parent)
        self.mount_path = Path(new_dir)
        self.unit_name = '{}.service'.format(self.mount_path.stem)
        self.ready_flag = multiprocessing.Event()
        self.crash_flag = multiprocessing.Event()
        self.returncode = None
43

    
44

    
45
def follow_journal(mount_state, journal_fd):
    """Watch arv-mount's log stream and flag unhandled FUSE exceptions.

    Reads text lines from the already-open file descriptor ``journal_fd``
    until end of file. Whenever a line ends with
    " ERROR: Unhandled exception during FUSE operation", it sets
    ``mount_state.crash_flag``. The descriptor is closed on return.

    Reading continues after a match, so the whole stream is always drained.
    """
    crash_marker = ' ERROR: Unhandled exception during FUSE operation'
    # errors='replace': one undecodable byte in an unrelated log line must
    # not raise UnicodeDecodeError here, since that would kill the follower
    # and silently disable crash detection for the rest of the run.
    with open(journal_fd, errors='replace') as journal_out:
        for line in journal_out:
            # Strip the newline instead of requiring it, so a final line
            # without a trailing newline is still recognized.
            if line.rstrip('\n').endswith(crash_marker):
                mount_state.crash_flag.set()
50
    
51
def schedule_tries(mount_state, start_sleep=90, sleep_mult=2, stop_sleep=900):
    """Yield attempt numbers 1, 2, 3, ... with exponential backoff.

    The delay ``sleep_time`` starts at ``start_sleep``. An attempt is yielded
    only while ``sleep_time < stop_sleep``. After each attempt the generator
    pauses until ``sleep_time`` seconds have passed since that attempt
    started; time the caller spends on the attempt counts toward the pause.
    ``sleep_time`` is then multiplied by ``sleep_mult``.

    The schedule stops as soon as ``mount_state.crash_flag`` is set. This
    includes a crash flagged during the pause, which ends the schedule
    immediately instead of allowing one more attempt.

    If ``sleep_time`` never reaches ``stop_sleep`` (e.g. ``sleep_mult == 1``),
    the schedule ends only via the crash flag.
    """
    tries = itertools.count(1)
    sleep_time = start_sleep
    while sleep_time < stop_sleep:
        # Monotonic clock: interval measurement must not jump with wall time.
        started = time.monotonic()
        yield next(tries)
        remaining = max(0, sleep_time - (time.monotonic() - started))
        # Event.wait returns True as soon as the flag is set (immediately if
        # it already is), and False once the timeout elapses.
        if mount_state.crash_flag.wait(remaining):
            break
        sleep_time *= sleep_mult
63

    
64
def stress_mount_unlimited_subprocesses(mount_state):
    """Run one ``ls -lR`` per top-level mount entry, all concurrently.

    Every process is started before any is waited on. Up to one reader per
    entry directly under ``mount_state.mount_path`` therefore runs at once.

    Returns ``os.EX_OK`` if every process exited 0, or if the mount was
    empty. Otherwise returns the failing exit status with the largest
    magnitude. This way, a process killed by a signal (negative
    returncode) is reported instead of being hidden behind a 0 from a
    sibling.
    """
    procs = [
        subprocess.Popen(
            ['ls', '-lR', str(path)],
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
        ) for path in mount_state.mount_path.iterdir()
    ]
    logger.debug("Running %s ls processes", len(procs))
    # Wait for every process (not just until the first failure).
    returncodes = [proc.wait() for proc in procs]
    failures = [rc for rc in returncodes if rc != os.EX_OK]
    # A plain max() over raw codes lets 0 beat a negative signal status and
    # raises ValueError when there are no processes at all.
    result = max(failures, key=abs) if failures else os.EX_OK
    logger.debug("Stress returncode = %d", result)
    return result
76

    
77
def walk_dir(start_path):
    """Depth-first walk of a mounted Arvados tree starting at ``start_path``.

    Every directory reached is listed, including ones not descended into.
    Presumably that listing is the load being generated.

    Descent rules:
    - A directory with a ``.arvados#project`` file is a project. Its JSON
      ``uuid`` is recorded, and the walk descends only the first time that
      uuid is seen, so a project reachable by several paths is expanded once.
    - Otherwise, a directory with ``.arvados#collection`` is listed but not
      descended into.
    - Any other directory is descended into.

    Returns None. Malformed metadata and I/O errors propagate to the caller.
    """
    stack = [start_path]
    visited_projects = set()
    while stack:
        current = stack.pop()
        try:
            marker_fh = (current / '.arvados#project').open('rb')
        except FileNotFoundError:
            should_descend = not (current / '.arvados#collection').exists()
        else:
            with marker_fh:
                project_uuid = json.load(marker_fh)['uuid']
            should_descend = project_uuid not in visited_projects
            visited_projects.add(project_uuid)
        # Listed unconditionally; only pushed if we descend.
        subdirs = [entry for entry in current.iterdir() if entry.is_dir()]
        if should_descend:
            stack.extend(subdirs)
94

    
95
def stress_mount_process_pool(mount_state, pool_size):
    """Walk every top-level entry of the mount in parallel.

    Each entry directly under ``mount_state.mount_path`` is passed to
    ``walk_dir`` using a pool of ``pool_size`` worker processes. Returns
    ``os.EX_OK`` after every walk has finished. An exception raised by a
    walk is re-raised here instead.

    NOTE(review): non-directory entries are passed to walk_dir too, which
    would raise for them; presumably the mount root holds only directories.
    """
    with multiprocessing.Pool(pool_size) as workers:
        walks = workers.imap_unordered(walk_dir, mount_state.mount_path.iterdir())
        # walk_dir returns nothing useful; iterating just waits for each walk.
        for _ in walks:
            pass
    return os.EX_OK
100

    
101
def clean_mount(mount_state):
    """Unmount ``mount_state.mount_path`` and return fusermount's exit status.

    Escalation sequence:
    1. Run ``fusermount -qu`` on the mount point and allow 10 seconds.
    2. If it is still running, send the default signal (SIGTERM) to the
       mount's systemd unit with ``systemctl --user kill`` and allow 20 more
       seconds.
    3. If it is still running after that, send SIGKILL to the unit and wait
       for fusermount with no further time limit.

    Never raises ``subprocess.TimeoutExpired``, so a caller cleaning up in a
    ``finally`` block still reaches the rest of its cleanup.
    """
    def kill_unit(*signal_opts):
        # Exit status deliberately ignored: the unit may already be gone.
        subprocess.run([
            'systemctl', '--user',
            'kill', *signal_opts, mount_state.unit_name,
        ])

    with subprocess.Popen(
        ['fusermount', '-qu', str(mount_state.mount_path)],
        stdin=subprocess.DEVNULL,
    ) as umount_proc:
        try:
            umount_proc.wait(10)
        except subprocess.TimeoutExpired:
            # Presumably arv-mount is stuck; stopping it should let the
            # unmount finish.
            kill_unit()
            try:
                umount_proc.wait(20)
            except subprocess.TimeoutExpired:
                # Letting TimeoutExpired escape would not bound anything:
                # Popen.__exit__ waits for the process without a timeout.
                # Escalate instead so the wait below can actually finish.
                kill_unit('--signal=SIGKILL')
                umount_proc.wait()
    return umount_proc.returncode
115

    
116
def parse_arguments(arglist=None):
    """Parse this script's own options and leave the rest for arv-mount.

    Returns ``(args, extra)`` as from ``ArgumentParser.parse_known_args``.
    ``args.jobs`` is the parallelism for the stress test. ``extra`` is the
    list of unrecognized arguments, which main() forwards to arv-mount.
    ``arglist`` defaults to ``sys.argv[1:]``.
    """
    def positive_int(text):
        # multiprocessing.Pool rejects sizes below 1. Report that as a usage
        # error up front instead of a traceback after the mount is started.
        value = int(text)
        if value < 1:
            raise argparse.ArgumentTypeError(f"must be at least 1, not {value}")
        return value

    parser = argparse.ArgumentParser(
        description="Mount Arvados with arv-mount and repeatedly walk the mount to stress it.",
        epilog="Unrecognized arguments are passed through to arv-mount.",
    )
    parser.add_argument(
        '--jobs', '-j',
        type=positive_int,
        default=8,
        help="Number of parallel accesses during test (default: %(default)s)",
    )
    return parser.parse_known_args(arglist)
125

    
126
def main(arglist=None):
    """Run the arv-mount stress test and return a process exit status.

    Steps:
    1. Start ``arv-mount`` as a transient ``systemd-run --user`` service on a
       fresh mount point. It runs read-only with ``--shared`` plus any
       arguments from ``arglist`` that this script does not recognize.
    2. Follow that unit's journal in a child process. The child sets
       ``mount_state.crash_flag`` if arv-mount logs an unhandled FUSE
       exception.
    3. Wait, with backoff, for the mount to show contents.
    4. Run the process-pool stress test repeatedly on the default
       ``schedule_tries`` schedule. Stop when the test fails, a crash is
       flagged, or the schedule ends.
    5. Stop the journal follower, unmount, and remove the mount point.

    Return value:
    - the stress test's exit status, if the stress test failed;
    - ``os.EX_SOFTWARE``, if it succeeded but arv-mount logged a crash;
    - otherwise, the unmount's exit status.

    On either failure, the unit's journal is printed when stdout is a
    terminal.

    Raises RuntimeError if the mount never shows any contents.
    """
    args, arv_mount_opts = parse_arguments(arglist)
    mount_parent = Path(
        os.environ.get('XDG_RUNTIME_DIR')
        or os.environ.get('TMPDIR')
        or '/tmp'
    )
    mount_state = MountState(mount_parent)
    logger.debug("starting mount service %s", mount_state.unit_name)
    unit_arg = f'--unit={mount_state.unit_name}'
    subprocess.run([
        'systemd-run', '--user', unit_arg, '--quiet',
        'arv-mount', '--foreground', '--read-only', '--shared',
        # 2 << 20 == 2097152; presumably bytes (2 MiB) - confirm with arv-mount.
        f'--directory-cache={2 << 20}',
        *arv_mount_opts,
        '--', str(mount_state.mount_path),
    ], stdin=subprocess.DEVNULL, check=True)
    journal_proc = subprocess.Popen(
        ['journalctl', '--user', unit_arg, '--follow', '--output=cat'],
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
    )
    # follow_journal gets a bare fd number, which is only valid in the child
    # if the child is forked from this process. Request fork explicitly,
    # because it is no longer the default start method everywhere (Python
    # 3.14 switched the POSIX default to forkserver).
    follow_proc = multiprocessing.get_context('fork').Process(
        target=follow_journal,
        args=(mount_state, journal_proc.stdout.fileno()),
    )
    follow_proc.start()

    stress_returncode = os.EX_OK
    try:
        logger.debug("waiting for mount")
        have_contents = False
        for _ in schedule_tries(mount_state, 1, 2, 60):
            if have_contents := any(mount_state.mount_path.iterdir()):
                break
        else:
            if not mount_state.crash_flag.is_set():
                # schedule_tries also pauses after its final try; take one
                # last look so that pause is not wasted.
                have_contents = any(mount_state.mount_path.iterdir())
        if not have_contents:
            # Explicit raise rather than assert: must survive `python -O`.
            raise RuntimeError("mount never had contents")
        for count in schedule_tries(mount_state):
            logger.debug("starting stress test #%d", count)
            stress_returncode = stress_mount_process_pool(mount_state, args.jobs)
            if stress_returncode != os.EX_OK:
                break
    finally:
        journal_proc.terminate()
        umount_returncode = clean_mount(mount_state)
        # Reap journalctl; the follower then sees EOF and exits.
        journal_proc.wait()
        follow_proc.join()
        journal_proc.stdout.close()
        mount_state.mount_path.rmdir()

    crashed = mount_state.crash_flag.is_set()
    if crashed:
        logger.error("arv-mount logged an unhandled exception during a FUSE operation")
    if stress_returncode != os.EX_OK or crashed:
        if sys.stdout.isatty():
            subprocess.run(['journalctl', '--user', '--no-pager', unit_arg])
        if stress_returncode != os.EX_OK:
            return stress_returncode
        return os.EX_SOFTWARE
    return umount_returncode
177

    
178
if __name__ == '__main__':
    # When run as a script, also log to stderr in a
    # "time name[pid]: LEVEL: message" format.
    console = logging.StreamHandler()
    console.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s[%(process)d]: %(levelname)s: %(message)s'
    ))
    logger.addHandler(console)
    sys.exit(main())
(2-2/2)