1# Copyright 2018 The Distro Tracker Developers
2# See the COPYRIGHT file at the top-level directory of this distribution and
3# at https://deb.li/DTAuthors
4#
5# This file is part of Distro Tracker. It is subject to the license terms
6# in the LICENSE file found in the top-level directory of this
7# distribution and at https://deb.li/DTLicense. No part of Distro Tracker,
8# including this file, may be copied, modified, propagated, or distributed
9# except according to the terms contained in the LICENSE file.
10"""
11Base class to implement Tasks.
13Tasks are used to execute (possibly long-running) operations that need
14to happen regularly to update distro-tracker's data.
15"""
16import importlib
17import logging
18from datetime import timedelta
20from django.conf import settings
22from distro_tracker.core.models import TaskData
23from distro_tracker.core.tasks.schedulers import Scheduler
24from distro_tracker.core.utils import now
25from distro_tracker.core.utils.misc import (
26 call_methods_with_prefix,
27 get_data_checksum,
28)
29from distro_tracker.core.utils.plugins import PluginRegistry
31logger = logging.getLogger('distro_tracker.tasks')
class BaseTask(metaclass=PluginRegistry):
    """
    A class representing the base class for all data processing tasks of
    Distro Tracker.

    Sub-classes should provide 'execute_*' methods that do the real work
    of the task. They should also override the 'Scheduler' class to have
    a more useful scheduling policy than the default (which will always
    decide to run the task).

    .. note::
       Subclasses of this class are automatically registered when created
       which allows the :class:`BaseTask` to have the full picture of all
       tasks and their mutual dependencies. However, to make sure the
       subclass is always loaded, make sure to place it in a
       ``tracker_tasks`` module at the top level of a Django app.
    """

    class ConcurrentDataUpdate(RuntimeError):
        """
        Raised by :meth:`BaseTask.save_data` when the versioned update of
        the task data is refused.
        """

    class LockError(RuntimeError):
        """
        Raised by :meth:`BaseTask.execute` when the run lock of the task
        cannot be obtained.
        """

    class Scheduler(Scheduler):
        """
        Each task has an associated
        :class:`~distro_tracker.core.tasks.schedulers.Scheduler` class that
        will be used to decide when to run the task. This class
        is meant to be overridden by a custom class deriving from a more
        useful Scheduler class provided in
        :mod:`distro_tracker.core.tasks.schedulers`.
        """
        pass

    @classmethod
    def task_name(cls):
        """
        Returns the name of the task. By default, it uses the class name
        but this can be overridden by setting a :py:attr:`.NAME` attribute
        on the class.

        This name is used to uniquely identify the task, notably to store
        its internal data into :class:`~distro_tracker.core.models.TaskData`.

        :return: the name of the task
        :rtype: str
        """
        return getattr(cls, 'NAME', cls.__name__)

    def __init__(self, *args, **kwargs):
        """
        Create the scheduler instance, reset the modification tracking and
        the event handlers, then forward all arguments to
        :meth:`initialize`.
        """
        self.scheduler = self.Scheduler(self)
        # Tri-state flag, see the ``data`` property and ``execute()``:
        # False = nothing to save, None = data was accessed and might have
        # been changed, True = data was explicitly marked as modified.
        self.data_is_modified = False
        self.event_handlers = {}
        self.initialize(*args, **kwargs)
        super().__init__()

    def initialize(self, *args, **kwargs):
        """
        Process arguments passed to :meth:`__init__()`. Can be overridden
        to do other runtime preparation.

        For proper cooperation, you should usually call the method on the
        object returned by ``super()`` (if it exists).

        The ``force_update`` and ``fake_update`` keyword arguments are
        stored as attributes (both default to False) and the whole set of
        keyword arguments is kept in ``self.parameters``.
        """
        self.force_update = kwargs.get('force_update', False)
        self.fake_update = kwargs.get('fake_update', False)
        self.parameters = kwargs

        # Call other implementations of the initialize method
        super_object = super()
        if super_object and hasattr(super_object, 'initialize'):
            super_object.initialize(*args, **kwargs)

    @property
    def data(self):
        """
        A data dictionary that matches the corresponding
        :class:`~distro_tracker.core.models.TaskData`. It is loaded from the
        database on first access, and it's saved when you call
        the :meth:`.save_data` method.
        """
        if not hasattr(self, '_data'):
            self.refresh_data()
        if self.data_is_modified is False:
            # The caller got a reference to the dictionary and may change
            # it without telling us: remember that so that execute() can
            # compare checksums to detect a modification.
            self.data_is_modified = None
        return self._data

    def data_mark_modified(self):
        """
        Record the fact that the data dictionary has been modified and will
        have to be saved.
        """
        self.data_is_modified = True

    @property
    def task_data(self):
        """
        Returns the corresponding :class:`~distro_tracker.core.models.TaskData`.
        It is loaded from the database on first access.
        """
        if not hasattr(self, '_task_data'):
            self.refresh_data()
        return self._task_data

    def save_data(self, **kwargs):
        """
        Save the :attr:`.data` attribute in the corresponding
        :class:`~distro_tracker.core.models.TaskData` model in a way
        that ensures that we don't overwrite any concurrent update.

        Any keyword argument is forwarded to the ``versioned_update()``
        method of the task data, together with the ``data`` to store.

        :raises BaseTask.ConcurrentDataUpdate: when the update is not
            possible without risking to lose another update that happened
            in parallel.
        """
        # Going through the property loads both the TaskData object and
        # the data dictionary when nothing has been loaded yet, instead of
        # failing with an AttributeError on the private attributes.
        task_data = self.task_data
        kwargs['data'] = self._data
        if not task_data.versioned_update(**kwargs):
            raise self.ConcurrentDataUpdate(
                'Data from task {} have been updated in parallel'.format(
                    self.task_name()))
        self.data_is_modified = False

    def refresh_data(self):
        """
        Load (or reload) task data from the database. The
        :class:`~distro_tracker.core.models.TaskData` entry is created when
        it does not exist yet.
        """
        task_data, _ = TaskData.objects.get_or_create(
            task_name=self.task_name())
        self._data = task_data.data
        self._task_data = task_data

    def update_field(self, field, value):
        """
        Update a field of the associated TaskData with the given value
        and save it to the database. None of the other fields are saved.
        This update does not increase the version in the TaskData.

        :param str field: The name of the field to update.
        :param str value: The value to store in the field.
        """
        setattr(self.task_data, field, value)
        self.task_data.save(update_fields=[field])

    def update_last_attempted_run(self, value):
        """Store ``value`` in the ``last_attempted_run`` field."""
        self.update_field('last_attempted_run', value)

    def update_last_completed_run(self, value):
        """Store ``value`` in the ``last_completed_run`` field."""
        self.update_field('last_completed_run', value)

    def update_task_is_pending(self, value):
        """Store ``value`` in the ``task_is_pending`` field."""
        self.update_field('task_is_pending', value)

    @property
    def task_is_pending(self):
        """The ``task_is_pending`` field of the associated TaskData."""
        return self.task_data.task_is_pending

    @property
    def last_attempted_run(self):
        """The ``last_attempted_run`` field of the associated TaskData."""
        return self.task_data.last_attempted_run

    @property
    def last_completed_run(self):
        """The ``last_completed_run`` field of the associated TaskData."""
        return self.task_data.last_completed_run

    def log(self, message, *args, **kwargs):
        """
        Log a message about the progress of the task.

        The message is prefixed with the task name. The optional ``level``
        keyword argument selects the logging level (``logging.INFO`` by
        default); all the other arguments are passed to the logger as is.
        """
        level = kwargs.pop('level', logging.INFO)
        message = "{} {}".format(self.task_name(), message)
        logger.log(level, message, *args, **kwargs)

    @classmethod
    def get_task_class_by_name(cls, task_name):
        """
        Returns a :class:`BaseTask` subclass which has the given name, i.e. its
        :meth:`.task_name` method returns the ``task_name`` given in the
        parameters.

        :param str task_name: The name of the task which should be returned.
        :return: the first registered class with that name, or None when
            there is no such class.
        """
        for task_class in cls.plugins:
            if task_class.task_name() == task_name:
                return task_class
        return None

    def schedule(self):
        """
        Asks the scheduler if the task needs to be executed. If yes, then
        records this information in the ``task_is_pending`` field. If the task
        is already marked as pending, then returns True immediately.

        :return: True if the task needs to be executed, False otherwise.
        :rtype: bool
        """
        if self.task_is_pending:
            return True
        if self.scheduler.needs_to_run():
            self.update_task_is_pending(True)
        return self.task_is_pending

    def execute(self):
        """
        Performs the actual processing of the task.

        First records the timestamp of the run, stores it in the
        'last_attempted_run' field, then executes all the methods whose names
        are starting with ``execute_``, then updates the 'last_completed_run'
        field with the same timestamp (thus documenting the success of the last
        run) and clears the 'task_is_pending' flag.

        The ``execute-started``, ``execute-finished`` and ``execute-failed``
        events are dispatched to the registered handlers along the way.

        :raises BaseTask.LockError: when the run lock cannot be obtained.
        """
        if not self.task_data.get_run_lock():
            raise self.LockError('Could not get lock for task {}'.format(
                self.task_name()))

        try:
            timestamp = now()
            self.update_last_attempted_run(timestamp)
            self.handle_event('execute-started')
            call_methods_with_prefix(self, 'execute_')
            self.handle_event('execute-finished')
        except Exception:
            self.handle_event('execute-failed')
            raise
        finally:
            # The run lock is dropped whether the run succeeded or not
            self.update_field('run_lock', None)

        if self.data_is_modified is True:
            self.save_data()
        elif self.data_is_modified is None:
            # The data has been accessed without being explicitly marked
            # as modified: save it only if its checksum has changed.
            checksum = get_data_checksum(self._data)
            if checksum != self.task_data.data_checksum:
                self.save_data(data_checksum=checksum)

        self.update_last_completed_run(timestamp)
        self.update_task_is_pending(False)

    def register_event_handler(self, event, function):
        """
        Register a function to execute in response to a specific event.

        There's no validation done on the event name. The following events are
        known to be in use:

        * execute-started (at the start of the execute method)
        * execute-finished (at the end of the execute method, in case of
          success)
        * execute-failed (at the end of the execute method, in case of failure)

        Registering the same function twice for an event has no effect.

        :param str event: the name of the event to handle
        :param function: a function or any callable object
        """
        handlers = self.event_handlers.setdefault(event, [])
        if function not in handlers:
            handlers.append(function)

    def handle_event(self, event, *args, **kwargs):
        """
        This method is called at various places (with different values passed
        to the event parameter) and is a way to let sub-classes, mixins, and
        users add their own behaviour.

        The handlers registered for the event are called in registration
        order with the remaining positional and keyword arguments.

        :param str event: a string describing the event that happened
        """
        for function in self.event_handlers.get(event, []):
            function(*args, **kwargs)

    def lock_expires_soon(self, delay=600):
        """
        :param int delay: The number of seconds allowed before the lock is
            considered to expire soon.
        :return: True if the lock is about to expire in the given delay. Returns
            False otherwise (including when there is no lock at all).
        :rtype: bool
        """
        if self.task_data.run_lock is None:
            return False
        return self.task_data.run_lock <= now() + timedelta(seconds=delay)

    def extend_lock(self, delay=1800, expire_delay=600):
        """
        Extends the duration of the lock with the given `delay` if it's
        about to expire soon (as defined by the `expire_delay` parameter).

        :param int expire_delay: The number of seconds allowed before the lock
            is considered to expire soon.
        :param int delay: The number of seconds to add the expiration time of
            the lock.
        :return: True if the lock has been extended, False otherwise.
        :rtype: bool
        """
        if self.lock_expires_soon(delay=expire_delay):
            self.task_data.extend_run_lock(delay=delay)
            return True
        return False
def import_all_tasks():
    """
    Imports tasks found in each installed app's ``tracker_tasks`` module.

    Apps that do not provide a ``tracker_tasks`` module are silently
    skipped. When the import fails for any other reason (for instance
    because the module itself imports something that is missing), the app
    is skipped as well but the failure is logged so that a broken task
    module does not go unnoticed.
    """
    for app in settings.INSTALLED_APPS:
        module_name = app + '.' + 'tracker_tasks'
        try:
            importlib.import_module(module_name)
        except ImportError as exc:
            missing = getattr(exc, 'name', None)
            if missing and (module_name == missing or
                            module_name.startswith(missing + '.')):
                # The app does not implement Distro Tracker tasks (the
                # module that could not be found is the tasks module
                # itself or one of its parents).
                continue
            logger.warning("Could not import %s", module_name,
                           exc_info=True)
    # This one is an exception, many core tasks are there
    import distro_tracker.core.retrieve_data  # noqa
def run_task(task, *args, **kwargs):
    """
    Executes the requested task.

    When ``task`` is a name or a class, the task object is built by calling
    the class with the remaining positional and keyword arguments. When it
    is already a :class:`BaseTask` instance, it is used as is.

    :param task: The task which should be run. Either the class object
        of the task, or a string giving the task's name, or the task object
        itself.
    :type task: :class:`BaseTask` subclass or :class:`str`

    :returns: True if the task executed without errors, False if it raised
        an exception during its execution (this includes the case where the
        run lock could not be obtained).
    :raises ValueError: when the name matches no known task or when ``task``
        is of an unsupported kind.
    """
    # All the task classes must be registered before any lookup by name
    import_all_tasks()

    if isinstance(task, str):
        requested_name = task
        factory = BaseTask.get_task_class_by_name(requested_name)
        if not factory:
            raise ValueError("Task '%s' doesn't exist." % requested_name)
        task = factory(*args, **kwargs)
    elif not isinstance(task, BaseTask):
        # Not an instance: the only remaining supported input is something
        # callable that exposes an execute attribute (i.e. a task class).
        if not (callable(task) and hasattr(task, 'execute')):
            raise ValueError(
                "Can't run a task with a '{}'.".format(repr(task)))
        task = task(*args, **kwargs)

    logger.info("Starting task %s", task.task_name())
    try:
        task.execute()
    except task.LockError:
        logger.info("Task %s has been skipped due to its run lock",
                    task.task_name())
        return False
    except Exception:
        logger.exception("Task %s failed with the following traceback.",
                         task.task_name())
        return False
    else:
        logger.info("Completed task %s", task.task_name())
        return True
def build_all_tasks(*args, **kwargs):
    """
    Builds all the task objects out of the BaseTask sub-classes registered.

    Every task class is instantiated with the positional and keyword
    arguments given to this function.

    :returns: a dict mapping the task name to the corresponding Task instance.
    :rtype: dict
    :raises ValueError: if multiple tasks have the same name.
    """
    import_all_tasks()
    tasks = {}
    for task_class in BaseTask.plugins:
        task_name = task_class.task_name()
        if task_name in tasks:
            raise ValueError("Multiple tasks with the same name: {}".format(
                task_name))
        # Reuse the name computed above instead of asking the class again
        tasks[task_name] = task_class(*args, **kwargs)
    return tasks
def run_all_tasks(*args, **kwargs):
    """
    Builds all tasks, asks each of them whether it needs to be scheduled,
    and then executes the pending ones with :func:`run_task`.

    The special task "UpdateRepositoriesTask" is always handled first.
    The execution order of the other tasks is undetermined.
    """
    registry = build_all_tasks(*args, **kwargs)

    for candidate in registry.values():
        candidate.schedule()

    # Take the repository update out of the mapping so that it comes first
    # and is not considered a second time among the remaining tasks.
    run_queue = []
    if 'UpdateRepositoriesTask' in registry:
        run_queue.append(registry.pop('UpdateRepositoriesTask'))
    run_queue.extend(registry.values())

    for candidate in run_queue:
        if candidate.task_is_pending:
            run_task(candidate)