1
|
#!/usr/bin/env python3
|
2
|
|
3
|
import argparse
|
4
|
import dataclasses
|
5
|
import io
|
6
|
import itertools
|
7
|
import json
|
8
|
import logging
|
9
|
import logging.handlers
|
10
|
# Using multiprocessing not for the usual GIL avoidance reasons,
|
11
|
# but because of historical risks of mixing subprocess+threading.
|
12
|
import multiprocessing
|
13
|
import os
|
14
|
import subprocess
|
15
|
import sys
|
16
|
import tempfile
|
17
|
import time
|
18
|
|
19
|
from pathlib import Path
|
20
|
from typing import Optional
|
21
|
|
22
|
# Module-level logger for the whole tool. Records go to the local syslog
# socket at /dev/log (NOTE(review): a Linux convention — confirm if this
# script must run elsewhere). DEBUG level so every stress step is traceable.
logger = logging.getLogger('arv-mount-stress')
logger.addHandler(logging.handlers.SysLogHandler('/dev/log'))
logger.setLevel(logging.DEBUG)
|
25
|
|
26
|
@dataclasses.dataclass
class MountState:
    """State shared between the test driver and the journal follower for
    one arv-mount instance.

    NOTE: the hand-written ``__init__`` below takes precedence over the
    dataclass-generated one (dataclasses do not overwrite a method defined
    in the class body); the decorator still derives ``__repr__``/``__eq__``
    from these field annotations.
    """
    # Temporary directory used as the FUSE mount point.
    mount_path: Path
    # Name of the transient systemd user unit running arv-mount.
    unit_name: str
    # presumably meant to signal "mount is up" — never set anywhere in this
    # file; verify against other users before removing.
    ready_flag: multiprocessing.Event
    # Set by follow_journal when a FUSE crash message appears in the journal.
    crash_flag: multiprocessing.Event
    returncode: Optional[int]

    def __init__(self, mount_parent):
        # Create a fresh, uniquely-named mount point under mount_parent;
        # the caller is responsible for removing it (see main's finally).
        self.mount_path = Path(tempfile.mkdtemp(
            prefix='arv-mount-stress-',
            dir=mount_parent,
        ))
        # Derive the unit name from the unique tmpdir basename so parallel
        # runs get distinct systemd units.
        self.unit_name = f'{self.mount_path.stem}.service'
        self.ready_flag = multiprocessing.Event()
        self.crash_flag = multiprocessing.Event()
        self.returncode = None
|
43
|
|
44
|
|
45
|
def follow_journal(mount_state, journal_fd):
    """Scan journal output line by line and flag FUSE crashes.

    Runs in a child process; takes ownership of ``journal_fd`` (the file
    object wrapper closes it on exit) and sets ``mount_state.crash_flag``
    whenever the known unhandled-exception message appears.
    """
    crash_suffix = ' ERROR: Unhandled exception during FUSE operation\n'
    journal_out = open(journal_fd)
    with journal_out:
        for journal_line in journal_out:
            if journal_line.endswith(crash_suffix):
                mount_state.crash_flag.set()
|
50
|
|
51
|
def schedule_tries(mount_state, start_sleep=90, sleep_mult=2, stop_sleep=900):
    """Yield attempt numbers 1, 2, 3, ... with exponential backoff.

    After each yield, stops immediately if ``mount_state.crash_flag`` is
    set; otherwise sleeps for the remainder of the current delay (the time
    the consumer spent is subtracted) and multiplies the delay by
    ``sleep_mult``. Iteration ends once the delay would reach
    ``stop_sleep``.
    """
    attempt = 0
    delay = start_sleep
    while delay < stop_sleep:
        began = time.time()
        attempt += 1
        yield attempt
        if mount_state.crash_flag.is_set():
            break
        # Charge the consumer's own run time against this round's delay.
        remaining = delay - (time.time() - began)
        time.sleep(max(0, remaining))
        delay *= sleep_mult
|
63
|
|
64
|
def stress_mount_unlimited_subprocesses(mount_state):
    """Stress the mount with one recursive ``ls`` per top-level entry.

    Spawns all processes at once (no concurrency limit), waits for every
    one, and returns the worst (maximum) returncode observed, or
    ``os.EX_OK`` when the mount has no entries.
    """
    procs = [
        subprocess.Popen(
            ['ls', '-lR', str(path)],
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
        ) for path in mount_state.mount_path.iterdir()
    ]
    logger.debug("Running %s ls processes", len(procs))
    # default= prevents max() from raising ValueError when the mount is
    # empty (no top-level entries -> no processes spawned).
    result = max((proc.wait() for proc in procs), default=os.EX_OK)
    logger.debug("Stress returncode = %d", result)
    return result
|
76
|
|
77
|
def walk_dir(start_path):
    """Traverse a mounted Arvados tree, visiting each project at most once.

    Descends into a directory unless it is a collection (marked by
    ``.arvados#collection``) or a project whose UUID was already seen.
    Note the subdirectory listing is done for every directory, even ones
    we will not descend into — the FUSE traffic is the point of the test.
    """
    pending = [start_path]
    visited_uuids = set()
    while pending:
        current = pending.pop()
        try:
            meta_stream = (current / '.arvados#project').open('rb')
        except FileNotFoundError:
            # Not a project: descend unless this is a collection root.
            should_descend = not (current / '.arvados#collection').exists()
        else:
            with meta_stream:
                metadata = json.load(meta_stream)
            project_uuid = metadata['uuid']
            should_descend = project_uuid not in visited_uuids
            visited_uuids.add(project_uuid)
        subdirs = [entry for entry in current.iterdir() if entry.is_dir()]
        if should_descend:
            pending.extend(subdirs)
|
94
|
|
95
|
def stress_mount_process_pool(mount_state, pool_size):
    """Walk every top-level mount entry across ``pool_size`` worker processes.

    Returns ``os.EX_OK`` on completion; any exception raised by a worker's
    walk_dir propagates out of the iteration.
    """
    with multiprocessing.Pool(pool_size) as pool:
        # Drain the iterator: walk_dir is run purely for its FUSE traffic,
        # so the (None) results are discarded.
        for _ in pool.imap_unordered(walk_dir, mount_state.mount_path.iterdir()):
            pass
    return os.EX_OK
|
100
|
|
101
|
def clean_mount(mount_state):
    """Unmount the FUSE filesystem, escalating if fusermount stalls.

    Gives ``fusermount -u`` ten seconds; if it hangs, asks systemd to kill
    the mount unit and waits up to twenty more. Returns the fusermount
    process's returncode (a second timeout propagates TimeoutExpired).
    """
    umount_cmd = ['fusermount', '-qu', str(mount_state.mount_path)]
    with subprocess.Popen(umount_cmd, stdin=subprocess.DEVNULL) as umount_proc:
        try:
            umount_proc.wait(10)
        except subprocess.TimeoutExpired:
            # Unmount is stuck — have systemd kill the arv-mount service,
            # which should let fusermount finish.
            subprocess.run(['systemctl', '--user', 'kill', mount_state.unit_name])
            umount_proc.wait(20)
    return umount_proc.returncode
|
115
|
|
116
|
def parse_arguments(arglist=None):
    """Parse the options this script owns; pass everything else through.

    Returns an (args, remainder) pair from ``parse_known_args``; the
    remainder is forwarded verbatim to arv-mount by the caller.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        '--jobs', '-j',
        type=int,
        default=8,
        help="Number of parallel accesses during test",
    )
    return arg_parser.parse_known_args(arglist)
|
125
|
|
126
|
def main(arglist=None):
    """Mount an Arvados FUSE filesystem via systemd and stress it.

    Returns an exit status: the failing stress round's returncode when a
    round fails, otherwise the unmount returncode.
    """
    args, arv_mount_opts = parse_arguments(arglist)
    # Prefer the per-user runtime dir for the mount point; fall back to TMPDIR
    # and finally /tmp.
    mount_parent = Path(
        os.environ.get('XDG_RUNTIME_DIR')
        or os.environ.get('TMPDIR')
        or '/tmp'
    )
    mount_state = MountState(mount_parent)
    logger.debug("starting mount service %s", mount_state.unit_name)
    unit_arg = f'--unit={mount_state.unit_name}'
    # Run arv-mount as a transient systemd user unit so it can be killed via
    # systemctl (see clean_mount) and its output read back via journalctl.
    subprocess.run([
        'systemd-run', '--user', unit_arg, '--quiet',
        'arv-mount', '--foreground', '--read-only', '--shared',
        f'--directory-cache={2 << 20}',
        *arv_mount_opts,
        '--', str(mount_state.mount_path),
    ], stdin=subprocess.DEVNULL, check=True)
    # Follow the unit's journal; a separate process scans the stream and sets
    # mount_state.crash_flag when the FUSE crash message appears.
    journal_proc = subprocess.Popen(
        ['journalctl', '--user', unit_arg, '--follow', '--output=cat'],
        stdin=subprocess.DEVNULL,
        stdout=subprocess.PIPE,
    )
    follow_proc = multiprocessing.Process(
        target=follow_journal,
        args=(mount_state, journal_proc.stdout.fileno()),
    )
    follow_proc.start()

    try:
        logger.debug("waiting for mount")
        # Poll with short backoff (1s doubling, stopping before a 60s delay)
        # until the mount directory shows any contents.
        for _ in schedule_tries(mount_state, 1, 2, 60):
            if have_contents := any(mount_state.mount_path.iterdir()):
                break
        assert have_contents, "mount never had contents"
        # Repeated stress rounds with the default (90s doubling) backoff;
        # stop early when a round fails or the crash flag ends the schedule.
        for count in schedule_tries(mount_state):
            logger.debug("starting stress test #%d", count)
            stress_returncode = stress_mount_process_pool(mount_state, args.jobs)
            if stress_returncode != os.EX_OK:
                break
    finally:
        # Stop the journal follower's input, unmount, then reap the follower
        # (it exits on EOF) and remove the temporary mount point.
        journal_proc.terminate()
        umount_returncode = clean_mount(mount_state)
        follow_proc.join()
        mount_state.mount_path.rmdir()

    if stress_returncode != os.EX_OK:
        # On failure, dump the unit's journal for interactive runs.
        if sys.stdout.isatty():
            subprocess.run(['journalctl', '--user', '--no-pager', unit_arg])
        return stress_returncode
    else:
        return umount_returncode
|
177
|
|
178
|
if __name__ == '__main__':
    # When run as a script, also log to stderr (syslog-style format) in
    # addition to the module-level syslog handler.
    stderr_handler = logging.StreamHandler()
    stderr_handler.setFormatter(logging.Formatter(
        '%(asctime)s %(name)s[%(process)d]: %(levelname)s: %(message)s'))
    logger.addHandler(stderr_handler)
    sys.exit(main())
|