Coverage for distro_tracker/core/tasks/schedulers.py: 100%

19 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2025-01-12 09:15 +0000

1# Copyright 2018 The Distro Tracker Developers 

2# See the COPYRIGHT file at the top-level directory of this distribution and 

3# at https://deb.li/DTAuthors 

4# 

5# This file is part of Distro Tracker. It is subject to the license terms 

6# in the LICENSE file found in the top-level directory of this 

7# distribution and at https://deb.li/DTLicense. No part of Distro Tracker, 

8# including this file, may be copied, modified, propagated, or distributed 

9# except according to the terms contained in the LICENSE file. 

10""" 

11Task schedulers. 

12 

13A task scheduler is tied to a task and is able to answer the question 

14whether the task needs to run or not. 

15""" 

16import logging 

17from datetime import timedelta 

18 

19from distro_tracker.core.utils import now 

20 

21logger = logging.getLogger('distro_tracker.tasks') 

22 

23 

24class Scheduler(object): 

25 """ 

26 Base class of all schedulers. 

27 

28 It doesn't implement any logic, it just always responds True to 

29 the :meth:`.needs_to_run` query. 

30 """ 

31 

32 def __init__(self, task): 

33 self.task = task 

34 

35 def needs_to_run(self): 

36 """ 

37 Checks whether the associated task needs to run. 

38 

39 :return: True if the task needs to run, False otherwise 

40 :rtype: bool 

41 """ 

42 return True 

43 

44 

45class IntervalScheduler(Scheduler): 

46 """ 

47 An IntervalScheduler runs the task at a regular interval. The interval 

48 must be specified in the :attr:`interval` class attribute of any sub-class. 

49 """ 

50 

51 def get_interval(self): 

52 """ 

53 Returns the interval between two runs in seconds. 

54 

55 :raises ValueError: when the :attr:`.interval` attribute is not parsable 

56 :return: the interval in seconds 

57 :rtype: int 

58 """ 

59 return int(self.interval) 

60 

61 def needs_to_run(self): 

62 last_try = self.task.last_attempted_run 

63 if last_try is None: 

64 return True 

65 next_try = last_try + timedelta(seconds=self.get_interval()) 

66 return now() >= next_try