Periodic Python Scheduler

Since the days were getting shorter, I thought it was about time to set up the wakeup light in my new apartment. This got me looking at the code and wondering about improvements. One of the first things I wanted to fix was the ugly way it handled fading the light: the main loop polled the clock and set a new intensity once per second:

while(1):
    client.loop()

    # Do next fade step if needed
    if (rgb.fadeState != 0):
        if (time() - lastTime >= rgb.fadeTickSeconds):
            lastTime = time()
            rgb.fade()

At work, I have spent some time programming with TI-RTOS lately. Like any RTOS, it has the ability to set up periodic tasks. TI-RTOS calls them clock objects and the idea is pretty simple: you assign the clock object a period and a callback function and just start it. Clearly there should be a way to do this with a single line of Python? Not that I could find! There is one way to do it, apparently, in something called Celery. Some comments on Stack Overflow convinced me that it is way overkill for what I am trying to achieve though, since it requires setting up queues and handlers and stuff.

Python does provide the event scheduler sched. It can schedule a single event to happen some time in the future. Another Stack Overflow comment pointed out you can use sched to recursively schedule new events and thus get a periodic scheduler:

def periodic(scheduler, interval, action, actionargs=()):
  scheduler.enter(interval, 1, periodic,
                  (scheduler, interval, action, actionargs))
  action(*actionargs)
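
To see the recipe run end to end, here is a minimal sketch. It assumes Python 3 (the rest of the code in this post targets 2.7), and the names tick, ticks and limit are made up for the demo. Since the bare recipe never stops by itself, the callback cancels whatever is still queued once it has run often enough:

```python
import sched
import time


def periodic(scheduler, interval, action, actionargs=()):
    # Queue the next run first, then do the work for this run
    scheduler.enter(interval, 1, periodic,
                    (scheduler, interval, action, actionargs))
    action(*actionargs)


ticks = []  # illustrative: records when each run happened


def tick(scheduler, limit):
    ticks.append(time.monotonic())
    if len(ticks) >= limit:
        # Cancel the already queued follow-up so scheduler.run() can return
        for event in scheduler.queue:
            scheduler.cancel(event)


s = sched.scheduler(time.monotonic, time.sleep)
periodic(s, 0.05, tick, (s, 3))  # first run happens immediately
s.run()                          # blocks until the queue is empty
print(len(ticks))                # 3
```

Note that s.run() blocks the calling thread until the queue is empty, which is what the non-blocking requirement below is about.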

I went with this idea and added a few features to it. First, I wanted the task to be able to stop itself if it wants to. Second, it should also be possible to stop the task from the caller's context. Third, I wanted to be able to schedule only a specific number of periodic events. Fourth, I wanted it to be non-blocking. sched can run in a non-blocking mode since Python 3.3, but I was using 2.7 so I resorted to wrapping the scheduler in its own thread instead. Here it is (only 44 lines!):

import sched
import time
import thread
import threading

# Periodic task scheduler
class periodicTask:

	def __init__(self, periodSec, task, taskArguments = (), repetitions = 0):
		self.thread = threading.Thread(target = self.threadEntryPoint, 
			args = (periodSec, task, taskArguments, repetitions))
		self.thread.start()

	def threadEntryPoint(self, periodSec, task, taskArguments, repetitions):
		self.repetitions = repetitions
		self.scheduler = sched.scheduler(time.time, time.sleep)
		self.periodic(self.scheduler, periodSec, task, taskArguments)
		self.scheduler.run()

	def periodic(self, scheduler, delaySec, task, taskArguments):
		# Schedule another recursive event
		self.nextEvent = self.scheduler.enter(delaySec,
			1,
			self.periodic,
			(self.scheduler, delaySec, task, taskArguments))

		# Do task and get return status
		stopTask = task(*taskArguments)

		# Stop task if it returned true
		if (stopTask == True):
			self.stop()

		self.repetitions = self.repetitions - 1

		# Stop after the last repetition; starting at 0 never reaches 0 again, so it runs forever
		if (self.repetitions == 0):
			self.stop()

	def stop(self):
		self.scheduler.cancel(self.nextEvent)
		# Only exit when called from the scheduler thread, never the caller's thread
		if threading.current_thread() is self.thread:
			thread.exit()

Now I can define a task and then run it three times with a 1 second interval in a single line of code(!):

def testTask():
	print "Test"
	return False

myTask = periodicTask(periodSec = 1, task = testTask, repetitions = 3)
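
For comparison, on Python 3.3 or later the non-blocking mode of sched could probably replace the extra thread. The following is only a sketch under that assumption: fade_step and fade_steps are hypothetical stand-ins for the light's fade logic, and the short sleep stands in for whatever else the main loop does (like client.loop() in the wakeup light):

```python
import sched
import time

scheduler = sched.scheduler(time.monotonic, time.sleep)
fade_steps = []  # hypothetical: records each fade step instead of driving a light


def fade_step():
    fade_steps.append(time.monotonic())
    # Re-arm until three steps have run, then let the queue drain
    if len(fade_steps) < 3:
        scheduler.enter(0.05, 1, fade_step)


scheduler.enter(0.05, 1, fade_step)

while not scheduler.empty():
    # Runs only the events that are already due, then returns immediately
    scheduler.run(blocking=False)
    # Stand-in for the rest of the main loop
    time.sleep(0.01)

print(len(fade_steps))  # 3
```

The trade-off is that the timing is only as precise as the main loop is fast, since events run the next time the loop gets around to calling run().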

I thought this might come in handy for other projects, so I put it in a separate GitHub project. The next step will be to include the scheduler in the wakeup light code. It will probably require adding some more functionality, like separating creating the task from starting it so that it can be reused. We’ll see.

