Project

General

Profile

Bug #21568 » arv-mount-stress-test.py

Version 1 - Brett Smith, 03/01/2024 09:35 PM

 
1
#!/usr/bin/env python3
2

    
3
import argparse
4
import dataclasses
5
import itertools
6
# Using multiprocessing not for the usual GIL avoidance reasons,
7
# but because of historical risks of mixing subprocess+threading.
8
import multiprocessing
9
import os
10
import subprocess
11
import sys
12
import tempfile
13
import time
14

    
15
from pathlib import Path
16
from typing import Optional
17

    
18
@dataclasses.dataclass
class MountState:
    """Bookkeeping for the single arv-mount instance this script launches.

    Creating an instance makes a new, uniquely named, empty directory under
    ``mount_parent`` to serve as the mount point, and derives the systemd
    user unit name from that directory's name.

    The flags are ``multiprocessing`` events so they can be shared with the
    journal-follower child process.
    """
    mount_path: Path  # freshly created mount-point directory
    unit_name: str  # '<mount dir name>.service'
    # NOTE(review): ready_flag is not referenced by the rest of the visible script.
    ready_flag: multiprocessing.Event
    crash_flag: multiprocessing.Event  # set when the journal shows a FUSE exception
    # NOTE(review): returncode stays None; nothing in the visible script assigns it.
    returncode: Optional[int]

    def __init__(self, mount_parent):
        new_dir = tempfile.mkdtemp(dir=mount_parent, prefix='arv-mount-stress-')
        self.mount_path = Path(new_dir)
        # Naming the unit after the directory keeps each run's unit unique.
        self.unit_name = '{}.service'.format(self.mount_path.stem)
        self.ready_flag, self.crash_flag = multiprocessing.Event(), multiprocessing.Event()
        self.returncode = None
35

    
36

    
37
def follow_journal(mount_state, journal_fd):
    """Watch arv-mount's journal output and flag unhandled FUSE exceptions.

    Reads text lines from ``journal_fd`` until EOF. Whenever a line ends
    with `` ERROR: Unhandled exception during FUSE operation``, it sets
    ``mount_state.crash_flag``. The descriptor is closed when reading ends.

    Args:
        mount_state: object with a ``crash_flag`` event (a ``MountState``).
        journal_fd: readable file descriptor; in ``main`` it is the stdout
            pipe of ``journalctl --follow --output=cat``.
    """
    crash_suffix = ' ERROR: Unhandled exception during FUSE operation'
    # errors='replace': one undecodable log byte must not kill the watcher
    # and silently stop crash detection.
    with open(journal_fd, encoding='utf-8', errors='replace') as journal_out:
        for line in journal_out:
            # Strip the terminator so a final line without '\n' still matches.
            if line.rstrip('\n').endswith(crash_suffix):
                mount_state.crash_flag.set()
42
    
43
def schedule_tries(mount_state, start_sleep=90, sleep_mult=2, stop_sleep=900):
    """Yield try numbers 1, 2, 3, ... spaced out with exponential backoff.

    Each yield starts a try, and the caller does its work before resuming
    the generator. The spacing between the start of try N and the start of
    try N+1 is at least ``start_sleep * sleep_mult**(N-1)`` seconds; the
    caller's work time counts toward it. Tries continue while that spacing
    is below ``stop_sleep``. The generator stops right after any try that
    leaves ``mount_state.crash_flag`` set.

    Once no further try will follow, it returns immediately instead of
    sleeping out a pointless final interval.

    Note: with ``sleep_mult <= 1`` or ``start_sleep <= 0`` (and
    ``start_sleep < stop_sleep``) the spacing never reaches ``stop_sleep``,
    so the generator only ends when a crash is flagged.
    """
    tries = itertools.count(1)
    sleep_time = start_sleep
    while sleep_time < stop_sleep:
        # monotonic() so wall-clock adjustments can't stretch or skip a wait.
        start_time = time.monotonic()
        yield next(tries)
        if mount_state.crash_flag.is_set():
            return
        next_sleep = sleep_time * sleep_mult
        if next_sleep >= stop_sleep:
            # That was the last try; don't make the caller wait for nothing.
            return
        elapsed_time = time.monotonic() - start_time
        time.sleep(max(0, sleep_time - elapsed_time))
        sleep_time = next_sleep
55

    
56
def stress_mount(mount_state):
    """Load the mount with one concurrent ``ls -lR`` per top-level entry.

    All listings are started at once (stdout discarded, stderr inherited)
    and then waited for.

    Returns the "worst" exit status: the one with the largest magnitude.
    A listing killed by a signal has a negative returncode, and plain
    ``max()`` would hide it behind a 0 from any successful sibling. The
    result is 0 when every listing succeeds.

    Raises:
        ValueError: if the mount point has no entries to stress.
    """
    procs = [
        subprocess.Popen(
            ['ls', '-lR', str(path)],
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
        ) for path in mount_state.mount_path.iterdir()
    ]
    print("Running", len(procs), "ls processes")
    if not procs:
        raise ValueError(f"{mount_state.mount_path} has no entries to stress")
    returncodes = [proc.wait() for proc in procs]
    # key=abs: signal deaths (negative) count as failures, not as "less than 0".
    result = max(returncodes, key=abs)
    print("Stress returncode =", result)
    return result
68

    
69
def clean_mount(mount_state, umount_timeout=10, kill_timeout=20):
    """Unmount ``mount_state.mount_path`` and return fusermount's exit status.

    The escalation steps are:

    1. Run ``fusermount -qu`` on the mount point.
    2. If it has not finished within ``umount_timeout`` seconds, run
       ``systemctl --user kill`` on the mount's unit.
    3. Wait up to ``kill_timeout`` more seconds.
    4. If fusermount is still running after that, kill it rather than
       waiting on it indefinitely. The returned status is then negative
       (killed by a signal).

    The defaults match the original fixed 10 s / 20 s timeouts.
    """
    with subprocess.Popen(
        ['fusermount', '-qu', str(mount_state.mount_path)],
        stdin=subprocess.DEVNULL,
    ) as umount_proc:
        try:
            umount_proc.wait(umount_timeout)
        except subprocess.TimeoutExpired:
            # Presumably arv-mount is wedged; signal its unit so the unmount
            # can complete.
            subprocess.run(
                ['systemctl', '--user', 'kill', mount_state.unit_name],
                stdin=subprocess.DEVNULL,
            )
            try:
                umount_proc.wait(kill_timeout)
            except subprocess.TimeoutExpired:
                # Popen's context-manager exit waits for the child with no
                # timeout, so letting this exception escape would hang there.
                umount_proc.kill()
                umount_proc.wait()
    return umount_proc.returncode
83

    
84
def main(arglist=None):
    """Stress a freshly started arv-mount and report whether it failed.

    Steps:

    1. Start ``arv-mount --foreground --shared`` as a transient systemd user
       unit on a new mount point.
    2. Follow the unit's journal in a child process, which flags
       "Unhandled exception during FUSE operation" errors.
    3. Wait for the mount to show contents.
    4. Repeatedly run ``stress_mount`` on an exponential-backoff schedule.

    The mount is always unmounted and its directory removed afterwards.

    ``arglist`` is accepted for the ``main(sys.argv[1:])`` convention but is
    currently ignored.

    Returns:
        The failing stress exit status if a stress run failed, or
        ``os.EX_SOFTWARE`` if arv-mount logged an unhandled FUSE exception.
        In either case the unit's journal is printed first when stdout is a
        terminal. Otherwise, the exit status of the unmount.

    Raises:
        RuntimeError: if the mount never showed any contents.
    """
    mount_parent = Path(
        os.environ.get('XDG_RUNTIME_DIR')
        or os.environ.get('TMPDIR')
        or '/tmp'
    )
    mount_state = MountState(mount_parent)
    unit_arg = f'--unit={mount_state.unit_name}'
    try:
        subprocess.run([
            'systemd-run', '--user', unit_arg,
            'arv-mount', '--foreground', '--shared',
            f'--directory-cache={2 << 20}',
            '--', str(mount_state.mount_path),
        ], stdin=subprocess.DEVNULL, check=True)
    except (OSError, subprocess.CalledProcessError):
        # The unit presumably never started, so don't leak the empty
        # mount-point directory.
        mount_state.mount_path.rmdir()
        raise
    journal_proc = subprocess.Popen(
        ['journalctl', '--user', unit_arg, '--follow', '--output=cat'],
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
    )
    # The follower gets the pipe as a raw fd number. That is only valid in
    # the child if it inherits the parent's descriptors, so request 'fork'
    # explicitly instead of relying on the platform/version default start
    # method.
    follow_proc = multiprocessing.get_context('fork').Process(
        target=follow_journal,
        args=(mount_state, journal_proc.stdout.fileno()),
    )
    follow_proc.start()
    have_contents = False
    stress_returncode = os.EX_OK
    try:
        print("waiting for mount")
        for _ in schedule_tries(mount_state, 1, 2, 60):
            if have_contents := any(mount_state.mount_path.iterdir()):
                break
        # Explicit check: an assert would be skipped under `python -O`.
        if not have_contents:
            raise RuntimeError("mount never had contents")
        for _ in schedule_tries(mount_state):
            stress_returncode = stress_mount(mount_state)
            if stress_returncode != os.EX_OK:
                break
    finally:
        journal_proc.terminate()
        umount_returncode = clean_mount(mount_state)
        # The follower finishes once it reads EOF from the terminated journalctl.
        follow_proc.join()
        journal_proc.wait()
        journal_proc.stdout.close()
        mount_state.mount_path.rmdir()
    if stress_returncode == os.EX_OK and mount_state.crash_flag.is_set():
        # schedule_tries stops early when the crash flag is set, even after
        # a successful stress run. That logged exception is the failure this
        # script hunts for, so it must not be reported as success.
        print("arv-mount logged an unhandled exception during a FUSE operation")
        stress_returncode = os.EX_SOFTWARE
    if stress_returncode != os.EX_OK:
        if sys.stdout.isatty():
            subprocess.run(['journalctl', '--user', '--no-pager', unit_arg])
        return stress_returncode
    else:
        return umount_returncode
129

    
130
if __name__ == '__main__':
    # main() returns a process exit status; raising SystemExit with it is
    # exactly what sys.exit() does.
    raise SystemExit(main(sys.argv[1:]))
(1-1/2)