Coverage for distro_tracker/core/tasks/base.py: 99%

187 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-01-12 09:15 +0000

1# Copyright 2018 The Distro Tracker Developers 

2# See the COPYRIGHT file at the top-level directory of this distribution and 

3# at https://deb.li/DTAuthors 

4# 

5# This file is part of Distro Tracker. It is subject to the license terms 

6# in the LICENSE file found in the top-level directory of this 

7# distribution and at https://deb.li/DTLicense. No part of Distro Tracker, 

8# including this file, may be copied, modified, propagated, or distributed 

9# except according to the terms contained in the LICENSE file. 

10""" 

11Base class to implement Tasks. 

12 

13Tasks are used to execute (possibly long-running) operations that need 

14to happen regularly to update distro-tracker's data. 

15""" 

16import importlib 

17import logging 

18from datetime import timedelta 

19 

20from django.conf import settings 

21 

22from distro_tracker.core.models import TaskData 

23from distro_tracker.core.tasks.schedulers import Scheduler 

24from distro_tracker.core.utils import now 

25from distro_tracker.core.utils.misc import ( 

26 call_methods_with_prefix, 

27 get_data_checksum, 

28) 

29from distro_tracker.core.utils.plugins import PluginRegistry 

30 

31logger = logging.getLogger('distro_tracker.tasks') 

32 

33 

class BaseTask(metaclass=PluginRegistry):
    """
    A class representing the base class for all data processing tasks of
    Distro Tracker.

    Sub-classes should provide 'execute_*' methods that do the real work
    of the task. They should also override the 'Scheduler' class to have
    a more useful scheduling policy than the default (which will always
    decide to run the task).

    .. note::
       Subclasses of this class are automatically registered when created
       which allows the :class:`BaseTask` to have the full picture of all
       tasks and their mutual dependencies. However, to make sure the
       subclass is always loaded, make sure to place it in a
       ``tracker_tasks`` module at the top level of a Django app.
    """

    class ConcurrentDataUpdate(RuntimeError):
        """
        Raised by :meth:`BaseTask.save_data` when the versioned update of
        the task data is refused.
        """

    class LockError(RuntimeError):
        """
        Raised by :meth:`BaseTask.execute` when the run lock of the task
        cannot be obtained.
        """

    class Scheduler(Scheduler):
        """
        Each task has an associated
        :class:`~distro_tracker.core.tasks.schedulers.Scheduler` class that
        will be used to decide when to run the task. This class
        is meant to be overridden by a custom class deriving from a more
        useful Scheduler class provided in
        :mod:`distro_tracker.core.tasks.schedulers`.
        """
        pass

    @classmethod
    def task_name(cls):
        """
        Returns the name of the task. By default, it uses the class name
        but this can be overridden by setting a :py:attr:`.NAME` attribute
        on the class.

        This name is used to uniquely identify the task, notably to store
        its internal data into :class:`~distro_tracker.core.models.TaskData`.

        :return: the name of the task
        :rtype: str
        """
        if hasattr(cls, 'NAME'):
            return cls.NAME
        return cls.__name__

    def __init__(self, *args, **kwargs):
        self.scheduler = self.Scheduler(self)
        # Tri-state flag:
        #   False -> the data dictionary has not been accessed
        #   None  -> the data dictionary has been accessed through the
        #            ``data`` property (it may or may not have changed)
        #   True  -> data_mark_modified() has been called explicitly
        self.data_is_modified = False
        self.event_handlers = {}
        self.initialize(*args, **kwargs)
        super().__init__()

    def initialize(self, *args, **kwargs):
        """
        Process arguments passed to :meth:`__init__()`. Can be overridden
        to do other runtime preparation.

        The ``force_update`` and ``fake_update`` keyword arguments are
        stored as attributes (defaulting to False) and the whole set of
        keyword arguments is kept in ``self.parameters``.

        For proper cooperation, you should usually call the method on the
        object returned by ``super()`` (if it exists).
        """
        self.force_update = kwargs.get('force_update', False)
        self.fake_update = kwargs.get('fake_update', False)
        self.parameters = kwargs

        # Call other implementations of the initialize method
        super_object = super()
        if super_object and hasattr(super_object, 'initialize'):
            super_object.initialize(*args, **kwargs)

    @property
    def data(self):
        """
        A data dictionary that matches the corresponding
        :class:`~distro_tracker.core.models.TaskData`. It is loaded from the
        database on first access, and it's saved when you call
        the :meth:`.save_data` method.
        """
        if not hasattr(self, '_data'):
            self.refresh_data()
        if self.data_is_modified is False:
            # Remember that the dictionary has been handed out: execute()
            # will compare checksums to find out whether it was changed.
            self.data_is_modified = None
        return self._data

    def data_mark_modified(self):
        """
        Record the fact that the data dictionary has been modified and will
        have to be saved.
        """
        self.data_is_modified = True

    @property
    def task_data(self):
        """
        Returns the corresponding :class:`~distro_tracker.core.models.TaskData`.

        It is loaded from the database on first access.
        """
        if not hasattr(self, '_task_data'):
            self.refresh_data()
        return self._task_data

    def save_data(self, **kwargs):
        """
        Save the :attr:`.data` attribute in the corresponding
        :class:`~distro_tracker.core.models.TaskData` model in a way
        that ensures that we don't overwrite any concurrent update.

        The keyword arguments are forwarded to the ``versioned_update``
        method of the task data, together with the data dictionary.

        :raises BaseTask.ConcurrentDataUpdate: when the update is not
            possible without risking to lose another update that happened
            in parallel.
        """
        kwargs['data'] = self._data
        if not self._task_data.versioned_update(**kwargs):
            raise self.ConcurrentDataUpdate(
                'Data from task {} have been updated in parallel'.format(
                    self.task_name()))
        self.data_is_modified = False

    def refresh_data(self):
        """
        Load (or reload) task data from the database.

        The :class:`~distro_tracker.core.models.TaskData` entry is created
        when it does not exist yet.
        """
        task_data, _ = TaskData.objects.get_or_create(
            task_name=self.task_name())
        self._data = task_data.data
        self._task_data = task_data

    def update_field(self, field, value):
        """
        Update a field of the associated TaskData with the given value
        and save it to the database. None of the other fields are saved.
        This update does not increase the version in the TaskData.

        :param str field: The name of the field to update.
        :param value: The value to store in the field (its type depends on
            the field).
        """
        setattr(self.task_data, field, value)
        self.task_data.save(update_fields=[field])

    def update_last_attempted_run(self, value):
        """Store ``value`` in the ``last_attempted_run`` field."""
        self.update_field('last_attempted_run', value)

    def update_last_completed_run(self, value):
        """Store ``value`` in the ``last_completed_run`` field."""
        self.update_field('last_completed_run', value)

    def update_task_is_pending(self, value):
        """Store ``value`` in the ``task_is_pending`` field."""
        self.update_field('task_is_pending', value)

    @property
    def task_is_pending(self):
        """The ``task_is_pending`` field of the associated TaskData."""
        return self.task_data.task_is_pending

    @property
    def last_attempted_run(self):
        """The ``last_attempted_run`` field of the associated TaskData."""
        return self.task_data.last_attempted_run

    @property
    def last_completed_run(self):
        """The ``last_completed_run`` field of the associated TaskData."""
        return self.task_data.last_completed_run

    def log(self, message, *args, **kwargs):
        """
        Log a message about the progress of the task.

        The message is prefixed with the name of the task. The optional
        ``level`` keyword argument selects the logging level (defaults to
        ``logging.INFO``); all the other arguments are handed over to the
        logger.
        """
        level = kwargs.pop('level', logging.INFO)
        message = "{} {}".format(self.task_name(), message)
        logger.log(level, message, *args, **kwargs)

    @classmethod
    def get_task_class_by_name(cls, task_name):
        """
        Returns a :class:`BaseTask` subclass which has the given name, i.e. its
        :meth:`.task_name` method returns the ``task_name`` given in the
        parameters.

        :param str task_name: The name of the task which should be returned.
        :return: the first matching registered class, or None when no
            registered class has that name.
        """
        for task_class in cls.plugins:
            if task_class.task_name() == task_name:
                return task_class
        return None

    def schedule(self):
        """
        Asks the scheduler if the task needs to be executed. If yes, then
        records this information in the ``task_is_pending`` field. If the task
        is already marked as pending, then returns True immediately.

        :return: True if the task needs to be executed, False otherwise.
        :rtype: bool
        """
        if self.task_is_pending:
            return True
        if self.scheduler.needs_to_run():
            self.update_task_is_pending(True)
        return self.task_is_pending

    def execute(self):
        """
        Performs the actual processing of the task.

        First records the timestamp of the run, stores it in the
        'last_attempted_run' field, then executes all the methods whose names
        are starting with ``execute_``, then updates the 'last_completed_run'
        field with the same timestamp (thus documenting the success of the last
        run) and clears the 'task_is_pending' flag.

        :raises BaseTask.LockError: when the run lock cannot be obtained.
        """
        if not self.task_data.get_run_lock():
            raise self.LockError('Could not get lock for task {}'.format(
                self.task_name()))

        try:
            timestamp = now()
            self.update_last_attempted_run(timestamp)
            self.handle_event('execute-started')
            call_methods_with_prefix(self, 'execute_')
            self.handle_event('execute-finished')
        except Exception:
            self.handle_event('execute-failed')
            raise
        finally:
            # The run lock is cleared whether the run succeeded or not.
            self.update_field('run_lock', None)

        if self.data_is_modified is True:
            self.save_data()
        elif self.data_is_modified is None:
            # The data dictionary has been accessed without being explicitly
            # marked as modified: save it only if its checksum changed.
            checksum = get_data_checksum(self._data)
            if checksum != self.task_data.data_checksum:
                self.save_data(data_checksum=checksum)

        self.update_last_completed_run(timestamp)
        self.update_task_is_pending(False)

    def register_event_handler(self, event, function):
        """
        Register a function to execute in response to a specific event.

        Registering the same function twice for the same event has no
        effect.

        There's no validation done on the event name. The following events are
        known to be in use:

        * execute-started (at the start of the execute method)
        * execute-finished (at the end of the execute method, in case of
          success)
        * execute-failed (at the end of the execute method, in case of failure)

        :param str event: the name of the event to handle
        :param function: a function or any callable object
        """
        handlers = self.event_handlers.setdefault(event, [])
        if function not in handlers:
            handlers.append(function)

    def handle_event(self, event, *args, **kwargs):
        """
        This method is called at various places (with different values passed
        to the event parameter) and is a way to let sub-classes, mixins, and
        users add their own behaviour.

        The handlers registered for the event are called in registration
        order with the extra positional and keyword arguments.

        :param str event: a string describing the event that happened
        """
        for function in self.event_handlers.get(event, []):
            function(*args, **kwargs)

    def lock_expires_soon(self, delay=600):
        """
        :param int delay: The number of seconds allowed before the lock is
            considered to expire soon.
        :return: True if the lock is about to expire in the given delay. Returns
            False otherwise (including when there is no lock).
        :rtype: bool
        """
        if self.task_data.run_lock is None:
            return False
        return self.task_data.run_lock <= now() + timedelta(seconds=delay)

    def extend_lock(self, delay=1800, expire_delay=600):
        """
        Extends the duration of the lock with the given `delay` if it's
        about to expire soon (as defined by the `expire_delay` parameter).

        :param int expire_delay: The number of seconds allowed before the lock
            is considered to expire soon.
        :param int delay: The number of seconds to add the expiration time of
            the lock.
        :return: True if the lock has been extended, False otherwise.
        :rtype: bool
        """
        if self.lock_expires_soon(delay=expire_delay):
            self.task_data.extend_run_lock(delay=delay)
            return True
        return False

330 

331 

def import_all_tasks():
    """
    Try to import the ``tracker_tasks`` module of every application listed
    in ``settings.INSTALLED_APPS``.

    Applications for which the import fails are silently skipped.
    """
    for app_name in settings.INSTALLED_APPS:
        candidate = app_name + '.' + 'tracker_tasks'
        try:
            importlib.import_module(candidate)
        except ImportError:
            # The app does not implement Distro Tracker tasks.
            continue
    # This one is an exception, many core tasks are there
    import distro_tracker.core.retrieve_data  # noqa

345 

346 

def run_task(task, *args, **kwargs):
    """
    Executes the requested task.

    When a task name or a task class is given, the task object is built
    with the supplied positional and keyword arguments. When a task object
    is given, it is used as is.

    :param task: The task which should be run. Either the class object
        of the task, or a string giving the task's name, or the task object
        itself.
    :type task: :class:`BaseTask` subclass, :class:`BaseTask` instance or
        :class:`str`

    :returns: True if the task executed without errors, False if it raised
        an exception during its execution (or was skipped because of its
        run lock).
    :raises ValueError: if no task has the given name, or if ``task`` is
        not something that can be run.
    """
    # Import tasks implemented by all installed apps
    import_all_tasks()

    if isinstance(task, str):
        found_class = BaseTask.get_task_class_by_name(task)
        if not found_class:
            raise ValueError("Task '%s' doesn't exist." % task)
        task = found_class(*args, **kwargs)
    elif not isinstance(task, BaseTask):
        # Not a task object: it has to be something we can instantiate.
        if not (callable(task) and hasattr(task, 'execute')):
            raise ValueError(
                "Can't run a task with a '{}'.".format(repr(task)))
        task = task(*args, **kwargs)

    logger.info("Starting task %s", task.task_name())
    try:
        task.execute()
    except task.LockError:
        logger.info("Task %s has been skipped due to its run lock",
                    task.task_name())
        return False
    except Exception:
        logger.exception("Task %s failed with the following traceback.",
                         task.task_name())
        return False
    else:
        logger.info("Completed task %s", task.task_name())
        return True

389 

390 

def build_all_tasks(*args, **kwargs):
    """
    Builds all the task objects out of the BaseTask sub-classes registered.

    The positional and keyword arguments are passed unchanged to the
    constructor of every task class.

    :returns: a dict mapping the task name to the corresponding Task instance.
    :rtype: dict
    :raises ValueError: if multiple tasks have the same name.
    """
    import_all_tasks()
    tasks = {}
    for task_class in BaseTask.plugins:
        task_name = task_class.task_name()
        if task_name in tasks:
            raise ValueError("Multiple tasks with the same name: {}".format(
                task_name))
        # Store under the very name that was checked for duplicates instead
        # of asking the class for its name a second time.
        tasks[task_name] = task_class(*args, **kwargs)
    return tasks

408 

409 

def run_all_tasks(*args, **kwargs):
    """
    Builds all the tasks, lets each of them decide whether it has to be
    scheduled, and then executes the pending ones with :func:`run_task`.

    The special task "UpdateRepositoriesTask" is always handled
    first. No particular execution order is guaranteed for the other
    tasks.
    """
    tasks = build_all_tasks(*args, **kwargs)

    for candidate in tasks.values():
        candidate.schedule()

    # Put the repository update (when it exists) in front of the others.
    run_order = []
    repositories_task = tasks.pop('UpdateRepositoriesTask', None)
    if repositories_task is not None:
        run_order.append(repositories_task)
    run_order.extend(tasks.values())

    for candidate in run_order:
        if candidate.task_is_pending:
            run_task(candidate)