Package flumotion :: Package component :: Package base :: Module scheduler
[hide private]

Source Code for Module flumotion.component.base.scheduler

  1  # -*- test-case-name: flumotion.test.test_component_base_scheduler -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2006,2007,2008 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import time 
 23  import datetime 
 24   
 25  from twisted.internet import reactor 
 26   
 27  from flumotion.common import log, eventcalendar 
 28  from flumotion.component.base import watcher 
 29   
 30  __version__ = "$Rev: 7587 $" 
 31   
 32   
33 -def _timedeltaToSeconds(td):
34 return max(td.days * 24 * 60 * 60 + td.seconds + td.microseconds / 1e6, 0)
35 36
37 -class Scheduler(log.Loggable):
38 """ 39 I provide notifications when events start and end. 40 I use a L{eventcalendar.Calendar} for scheduling. 41 42 @cvar windowSize: how much time to look ahead when scheduling 43 @type windowSize: L{datetime.timedelta} 44 """ 45 windowSize = datetime.timedelta(days=1) 46
47 - def __init__(self):
48 self._delayedCall = None # tracks next call for scheduling 49 self._subscribeId = 0 # counter fo unique sid's 50 self._subscribers = {} # sid -> tuple of callable 51 self._nextStart = 0 # only used in testsuite 52 self._calendar = None # our currently active calendar
53 54 ### public API 55
56 - def getCalendar(self):
57 """ 58 Return the calendar used for scheduling. 59 60 @rtype: L{eventcalendar.Calendar} 61 """ 62 return self._calendar
63
64 - def setCalendar(self, calendar, when=None):
65 """ 66 Set the given calendar to use for scheduling. 67 68 This function will send start notifications for all new events that 69 should currently be in progress, if they were not registered in 70 the old calendar or if there was no old calendar. 71 72 If the scheduler previously had a calendar, it will send end 73 notifications for all events currently in progress that are not in the 74 new calendar. 75 76 @param calendar: the new calendar to set 77 @type calendar: L{eventcalendar.Calendar} 78 @param when: the time at which to consider the calendar to be set; 79 defaults to now 80 @type when: L{datetime.datetime} 81 """ 82 if not self._calendar: 83 self.debug('Setting new calendar %r', calendar) 84 else: 85 self.debug('Replacing existing calendar %r with new %r', 86 self._calendar, calendar) 87 88 # we want to make sure we use the same when for getting old and new 89 # instances if it wasn't specified 90 if not when: 91 when = datetime.datetime.now(eventcalendar.UTC) 92 93 # FIXME: convert Content lists to dicts to speed things up 94 # because they are used as a lookup inside loops 95 oldInstances = [] 96 if self._calendar: 97 oldInstances = self._calendar.getActiveEventInstances(when) 98 oldInstancesContent = [i.event.content for i in oldInstances] 99 100 newInstances = calendar.getActiveEventInstances(when) 101 newInstancesContent = [i.event.content for i in newInstances] 102 103 # we do comparison of instances by content, since, while the timing 104 # information may have changed, if the content is still the same, 105 # then the event is still considered 'active' 106 for instance in oldInstances: 107 if instance.event.content not in newInstancesContent: 108 self.debug( 109 'old active %r for %r not in new calendar, ending', 110 instance, instance.event.content) 111 self._eventInstanceEnded(instance) 112 113 for instance in newInstances: 114 if instance.event.content not in oldInstancesContent: 115 self.debug( 116 'new active %r for %r not in old calendar, starting', 117 instance, instance.event.content) 118 self._eventInstanceStarted(instance) 119 120 self._calendar = calendar 121 self._reschedule()
122
123 - def getPoints(self, when=None):
124 """ 125 Get all points on this scheduler's event horizon. 126 """ 127 if not when: 128 when = datetime.datetime.now(eventcalendar.LOCAL) 129 130 self.debug('getPoints at %s', str(when)) 131 earliest = when + self.windowSize 132 133 points = self._calendar.getPoints(when, self.windowSize) 134 135 self.debug('%d points in given windowsize %s', 136 len(points), str(self.windowSize)) 137 138 return points
139
140 - def cleanup(self):
141 """ 142 Clean up all resources used by this scheduler. 143 144 This cancels all pending scheduling calls. 145 """ 146 self._cancelScheduledCalls()
147 148 ### subscription interface 149
150 - def subscribe(self, eventInstanceStarted, eventInstanceEnded):
151 """ 152 Subscribe to event happenings in the scheduler. 153 154 @param eventInstanceStarted: function that will be called when an 155 event instance starts 156 @type eventInstanceStarted: function with signature L{EventInstance} 157 @param eventInstanceEnded: function that will be called when an 158 event instance ends 159 @type eventInstanceEnded: function with signature L{EventInstance} 160 161 @rtype: int 162 @returns: A subscription ID that can later be passed to 163 unsubscribe(). 164 """ 165 sid = self._subscribeId 166 self._subscribeId += 1 167 self._subscribers[sid] = (eventInstanceStarted, eventInstanceEnded) 168 return sid
169
170 - def unsubscribe(self, id):
171 """ 172 Unsubscribe from event happenings in the scheduler. 173 174 @type id: int 175 @param id: Subscription ID received from subscribe() 176 """ 177 del self._subscribers[id]
178
179 - def _eventInstanceStarted(self, eventInstance):
180 self.debug('notifying %d subscribers of start of instance %r', 181 len(self._subscribers), eventInstance) 182 for started, _ in self._subscribers.values(): 183 started(eventInstance)
184
185 - def _eventInstanceEnded(self, eventInstance):
186 self.debug('notifying %d subscribers of end of instance %r', 187 len(self._subscribers), eventInstance) 188 for _, ended in self._subscribers.values(): 189 ended(eventInstance)
190 191 ### private API 192
193 - def _reschedule(self):
194 195 start = time.time() 196 197 self.debug("reschedule events") 198 self._cancelScheduledCalls() 199 200 now = datetime.datetime.now(eventcalendar.LOCAL) 201 202 def _getNextPoints(): 203 # get the next list of points in time that all start at the same 204 # time 205 self.debug('_getNextPoints at %s', str(now)) 206 result = [] 207 208 points = self.getPoints(now) 209 210 if not points: 211 return result 212 213 earliest = points[0].dt 214 for point in points: 215 if point.dt > earliest: 216 break 217 result.append(point) 218 219 if result: 220 self.debug('%d points at %s, first point is for %r', 221 len(result), str(result[0].dt), 222 result[0].eventInstance.event.content) 223 224 return result
225 226 def _handlePoints(points): 227 for point in points: 228 self.debug( 229 "handle %s event %r in %s at %s", 230 point.which, 231 point.eventInstance.event.content, 232 str(point.dt - now), 233 point.dt) 234 if point.which == 'start': 235 self._eventInstanceStarted(point.eventInstance) 236 elif point.which == 'end': 237 self._eventInstanceEnded(point.eventInstance) 238 239 self._reschedule()
240 241 points = _getNextPoints() 242 243 if points: 244 seconds = _timedeltaToSeconds(points[0].dt - now) 245 self.debug( 246 "schedule next point at %s in %.2f seconds", 247 str(points[0].dt), seconds) 248 dc = reactor.callLater(seconds, _handlePoints, points) 249 250 else: 251 self.debug( 252 "schedule rescheduling in %s", str(self.windowSize / 2)) 253 seconds = _timedeltaToSeconds(self.windowSize / 2) 254 dc = reactor.callLater(seconds, self._reschedule) 255 self._nextStart = seconds 256 self._delayedCall = dc 257 258 delta = time.time() - start 259 if delta < 0.5: 260 self.debug('_reschedule took %.3f seconds', delta) 261 else: 262 self.warning('Rescheduling took more than half a second') 263
264 - def _cancelScheduledCalls(self):
265 if self._delayedCall: 266 if self._delayedCall.active(): 267 self._delayedCall.cancel() 268 self._delayedCall = None
269 270
271 -class ICalScheduler(Scheduler):
272 273 watcher = None 274 275 # FIXME: having fileObj in the constructor causes events to be sent 276 # before anything can subscribe 277 # FIXME: this class should also be able to handle watching a URL 278 # and downloading it when it changes 279
280 - def __init__(self, fileObj):
281 """ 282 I am a scheduler that takes its data from an ical file and watches 283 that file every timeout. 284 285 @param fileObj: The fileObj. It must be already opened. 286 @type fileObj: file handle 287 """ 288 Scheduler.__init__(self) 289 290 self.watcher = None 291 292 if not fileObj: 293 return 294 295 self._parseFromFile(fileObj) 296 297 if hasattr(fileObj, 'name'): 298 299 def fileChanged(filename): 300 self.info("ics file %s changed", filename) 301 self._parseFromFile(open(filename, 'r'))
302 303 self.watcher = watcher.FilesWatcher([fileObj.name]) 304 fileObj.close() 305 self.watcher.subscribe(fileChanged=fileChanged) 306 self.watcher.start()
307
308 - def stopWatchingIcalFile(self):
309 """ 310 Stop watching the ical file. 311 """ 312 if self.watcher: 313 self.watcher.stop()
314
315 - def cleanup(self):
316 Scheduler.cleanup(self) 317 self.stopWatchingIcalFile()
318
319 - def _parseFromFile(self, f):
320 calendar = eventcalendar.fromFile(f) 321 self.setCalendar(calendar)
322