Since the days were getting shorter, I thought it was about time to set up the wakeup light in my new apartment. This got me looking at the code and wondering about improvements. One of the first things I wanted to fix was the ugly way it handled fading the light by setting a new intensity once per second:
while(1): client.loop() # Do next fade step if needed if (rgb.fadeState != 0): if (time() - lastTime >= rgb.fadeTickSeconds): lastTime = time() rgb.fade()
At work, I have spent some time programming with TI-RTOS lately. Like any RTOS, it has the ability to set up periodic tasks. TI-RTOS calls them clock objects and the idea is pretty simple; you assign the clock object a period and a callback function and just start it. Clearly there should be a way to do this with a single line of Python? Not that I could find! There is one way to do it, apparently, in something called Celery. Some comments on Stack Overflow convinced me that it is way overkill for what I am trying to achieve though, requiring setting up ques and handlers and stuff.
Python does provide the event scheduler sched. It can schedule a single event to happen some time in the future. Another Stack Overflow comment pointed out you can use sched to recursively schedule new events and thus get a periodic scheduler:
def periodic(scheduler, interval, action, actionargs=()): scheduler.enter(interval, 1, periodic, (scheduler, interval, action, actionargs)) action(*actionargs)
I went with this idea and added a few features to it. First, I wanted the task to be able to stop itself if it wants to. Second, it should also be possible to stop the task from the callers context. Third, I wanted to be able to schedule only a specific number of periodic events. Fourth, I wanted it to be non-blocking. Sched can run in a non-blocking mode since Python 3.3, but I was using 2.7 so I resorted to wrapping the scheduler in its own thread instead. Here it is (only 44 lines!):
import sched import time import thread import threading # Periodic task scheduler class periodicTask: def __init__(self, periodSec, task, taskArguments = (), repetitions = 0): self.thread = threading.Thread(target = self.threadEntryPoint, args = (periodSec, task, taskArguments, repetitions)) self.thread.start() def threadEntryPoint(self, periodSec, task, taskArguments, repetitions): self.repetitions = repetitions self.scheduler = sched.scheduler(time.time, time.sleep) self.periodic(self.scheduler, periodSec, task, taskArguments) self.scheduler.run() def periodic(self, scheduler, delaySec, task, taskArguments): # Schedule another recursive event self.nextEvent = self.scheduler.enter(delaySec, 1, self.periodic, (self.scheduler, delaySec, task, taskArguments)) # Do task and get return status stopTask = task(*taskArguments) # Stop task if it returned true if (stopTask == True): self.stop self.repetitions = self.repetitions - 1 # Stop if we ran through all repetitions # If repetitions was initialized to 0, run forever # (or at least until integer underflow?) if (self.repetitions == 0): self.stop() def stop(self): self.scheduler.cancel(self.nextEvent) thread.exit()
Now I can define a task and then run it three times with a 1 second interval in a single line of code(!):
def testTask(): print "Test" return False myTask = periodicTask(periodSec = 1, task = testTask, repetitions = 3)
I thought this might come in handy for other projects, so I put it in a separate GitHub project. The next step will be to include the scheduler in the wakeup light code. It will probably require adding some more functionality, like separating creating the task and starting it so that it can be re-used, we’ll see.
Leave a Reply