Source code for GLXBob.MainLoop

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import logging
from GLXBob import Timer
from GLXBob import EventBus
from GLXBob import EventKey
from random import randint
from time import sleep
import sys
import os

# This script is published under the GNU GENERAL PUBLIC LICENSE
# http://www.gnu.org/licenses/gpl-3.0.en.html
# Author: Tuuux <tuxa at rtnp dot org> all rights reserved


class Signal(Exception):
    """
    Generic Signal exception for Galaxie-BoB.

    The trick is to use :class:`Exception` as the parent class: merely
    constructing a ``Signal`` (it does not have to be raised) echoes the
    wrapped exception on ``stdout`` and, for a ``'QUIT'`` signal, invokes an
    optional callback.

    :param msg: the signal name. ``'QUIT'``, ``'KEY'``, ``'EVENT'`` and
                ``'TIMER'`` are echoed on ``stdout``; any other value is only
                stored in the exception message.
    :type msg: str
    :param original_exception: the exception (class or instance), or any other
                               value, that triggered the signal
    :param callback: callable invoked without argument once a ``'QUIT'``
                     signal has been echoed; ignored for every other signal
    :type callback: callable or None
    """

    # Signal names whose original exception is echoed on stdout.
    _REPORTED = ('QUIT', 'KEY', 'EVENT', 'TIMER')

    def __init__(self, msg, original_exception, callback=None):
        # Format through an explicit tuple: ``": %s" % value`` raises
        # TypeError as soon as ``value`` is itself a tuple.
        super(Signal, self).__init__("%s: %s" % (msg, original_exception))
        self.original_exception = original_exception

        if msg in self._REPORTED:
            self._report()

        # Only a QUIT signal triggers the callback.
        if msg == 'QUIT' and callback is not None:
            callback()

    def _report(self):
        """Write the wrapped exception on ``stdout``, on a best-effort basis."""
        try:
            sys.stdout.write('\rException: ' + str(self.original_exception) + '\n')
            sys.stdout.flush()
        except IOError:
            # stdout may already be closed while the program shuts down;
            # failing to print must never break the signal itself.
            pass
class Singleton(type):
    """
    Metaclass that gives every class built with it a single shared instance.

    The first call to the class creates the instance and caches it in the
    ``instance`` class attribute; every later call returns that cached object
    and its arguments are ignored.
    """

    def __init__(cls, name, bases, dictionary):
        super(Singleton, cls).__init__(name, bases, dictionary)
        # Set on each class as it is created, so a subclass does not reuse
        # the instance cached on its parent.
        cls.instance = None

    def __call__(cls, *args, **kw):
        """
        Return the unique instance of ``cls``, creating it on first use.

        :param args: positional arguments forwarded to the constructor on the
                     first call only
        :param kw: keyword arguments forwarded to the constructor on the first
                   call only
        :return: the shared instance of ``cls``
        """
        if cls.instance is None:
            # Forward the keyword arguments too; they used to be dropped.
            cls.instance = super(Singleton, cls).__call__(*args, **kw)
        return cls.instance
class MainLoop(object):
    """
    :Description:

    The :class:`MainLoop <GLXBob.MainLoop.MainLoop>` object is an Alderson
    loop: something close to an infinite loop, but driven by the
    :func:`MainLoop.run() <GLXBob.MainLoop.MainLoop.run()>` and
    :func:`MainLoop.quit() <GLXBob.MainLoop.MainLoop.quit()>` methods.

    The loop relies on its :class:`Timer <GLXBob.Timer.Timer>` to pace the
    iterations and impose a global frame rate (25 by default according to the
    original documentation). Low power consumption has been the target from
    the beginning.

    Feature:
       * Alderson loop with **run** and **quit** methods
       * Does not use 100% of the CPU time
       * Frames per second with adaptive limitation
       * The limitation is applied with a knee (percentage) that depends on
         the **Event list** size

    To Do:
       * The **Event list** size should interact with the frame rate
         limitation
    """
    # http://code.activestate.com/recipes/579053-high-precision-fps/

    # Python 2 metaclass hook: MainLoop() always returns the same instance.
    # Python 3 ignores this attribute (it expects ``metaclass=Singleton``).
    __metaclass__ = Singleton

    def __init__(self):
        """
        :Property's Details:

        .. py:data:: is_running

            It is running or not

              +---------------+-------------------------------+
              | Type          | :py:data:`bool`               |
              +---------------+-------------------------------+
              | Flags         | Read / Write                  |
              +---------------+-------------------------------+
              | Default value | False                         |
              +---------------+-------------------------------+

        .. py:data:: timer

            The GLXBob.Timer() object is stored in that property

              +---------------+-------------------------------+
              | Type          | :py:data:`GLXBob.Timer()`     |
              +---------------+-------------------------------+
              | Flags         | Read / Write                  |
              +---------------+-------------------------------+
              | Default value | :py:data:`GLXBob.Timer()`     |
              +---------------+-------------------------------+
        """
        self.__is_running = False
        self.__timer = Timer()
        self.__event_bus = EventBus()
        self.__event_key = EventKey()
        # Result of os.fork(); stays None until _run() has been entered.
        self.pid = None

    def is_running(self):
        """
        Checks if the :class:`MainLoop <GLXBob.MainLoop.MainLoop>` is currently
        being run via the
        :func:`MainLoop.run() <GLXBob.MainLoop.MainLoop.run()>` method.

        :return: :py:obj:`True` if the :class:`MainLoop <GLXBob.MainLoop.MainLoop>` is currently being run.
        :rtype: bool
        """
        return self.__is_running

    def run(self):
        """
        Run the :class:`MainLoop <GLXBob.MainLoop.MainLoop>` until
        :func:`MainLoop.quit() <GLXBob.MainLoop.MainLoop.quit()>` is called on
        the loop.

        The running flag is raised, the start is logged, then the internal
        loop is entered; that loop forks the process (see ``_run()``).
        """
        self._set_is_running(True)
        logging.info(self.__class__.__name__ + ': Starting ...')
        self._run()

    def quit(self):
        """
        Stops the :class:`MainLoop <GLXBob.MainLoop.MainLoop>` from running.

        The key handler is destroyed and the running flag is cleared. When
        ``pid`` is not ``0`` the process is then terminated immediately with
        :func:`os._exit`; otherwise the method returns and the loop ends at
        its next iteration.

        A :func:`MainLoop.quit() <GLXBob.MainLoop.MainLoop.quit()>` call will
        certainly cause the end of your program.
        """
        self.__event_key.destroy()
        self._set_is_running(False)

        # NOTE(review): os.fork() returns 0 in the child, so this branch runs
        # in the parent process (and also when pid is still None because
        # run() was never called). The 'Children' wording looks inverted --
        # confirm the intent before changing the behaviour.
        if self.pid != 0:
            msg = self.__class__.__name__ + ': Children stopping ...'
            print_something(msg)
            logging.info(msg)
            # Hard exit: no cleanup handlers run and stdio is not flushed.
            os._exit(0)
        else:
            msg = self.__class__.__name__ + ': Stopping ...'
            print_something(msg)
            logging.info(msg)

    def set_timer(self, timer=None):
        """
        Set the :py:obj:`timer` property.

        By default the :class:`MainLoop <GLXBob.MainLoop.MainLoop>`
        initialization creates a :class:`Timer <GLXBob.Timer.Timer>` object
        and stores it in the :py:obj:`timer` property. This method lets you
        set your own :class:`Timer <GLXBob.Timer.Timer>` object.

        :param timer: an object initialized by yourself, or :py:obj:`None` for a newly created :class:`Timer <GLXBob.Timer.Timer>`
        :type timer: GLXBob.Timer
        """
        if timer is None:
            timer = Timer()
        self.__timer = timer

    def get_timer(self):
        """
        Return the timer property value

        :return: a :class:`Timer <GLXBob.Timer.Timer>` object ready to be requested
        :rtype: GLXBob.Timer
        """
        return self.__timer

    def set_event_bus(self, event_bus=None):
        """
        Set the :py:obj:`event_bus` property.

        By default the :class:`MainLoop <GLXBob.MainLoop.MainLoop>`
        initialization creates a :class:`EventBus <GLXBob.EventBus.EventBus>`
        object and stores it in the :py:obj:`event_bus` property. This method
        lets you set your own :class:`EventBus <GLXBob.EventBus.EventBus>`
        object.

        That :py:obj:`event_bus` property object is available via
        :func:`MainLoop.get_event_bus() <GLXBob.MainLoop.MainLoop.get_event_bus()>`

        :param event_bus: an object initialized by yourself, or :py:obj:`None` for a newly created :class:`EventBus <GLXBob.EventBus.EventBus>`
        :type event_bus: GLXBob.EventBus
        """
        if event_bus is None:
            event_bus = EventBus()
        self.__event_bus = event_bus

    def get_event_bus(self):
        """
        Return the event_bus property value

        :return: a :class:`EventBus <GLXBob.EventBus.EventBus>` object ready to be requested
        :rtype: GLXBob.EventBus
        """
        return self.__event_bus

    # Internal methods
    def _set_is_running(self, boolean):
        """
        Set the __is_running attribute

        :param boolean: True or False
        :type boolean: bool
        :raise TypeError: if ``boolean`` parameter is not a :py:data:`bool` type
        """
        if not isinstance(boolean, bool):
            raise TypeError(u'>boolean< parameter must be a bool type')
        self.__is_running = boolean

    def _get_is_running(self):
        """
        Get the __is_running attribute value

        :return: :py:obj:`True` if the :class:`MainLoop <GLXBob.MainLoop.MainLoop>` is running or :py:obj:`False` if not.
        :rtype: bool
        """
        return self.__is_running

    def _handle_curses_input(self, event):
        """
        Print the key received from the key handler and emit ``'QUIT'`` on
        the event bus when that key is ``q``.

        :param event: the value returned by the key handler ``get_ch()``
        """
        key_name = self.__event_key.get_keyname(event)
        print_something('[KEY ]-> {0}'.format(key_name))
        print_something('[CHAR]-> {0}'.format(event))
        if key_name == 'q':
            self.get_event_bus().emit('QUIT')

    def _handle_event(self):
        """
        Dispatch every pending event, most recent first, until none is left.

        Failures are logged instead of being raised so that a faulty event
        cannot stop the loop.
        """
        # NOTE(review): neither ``_pop_last_event`` nor ``Application`` is
        # defined or imported in the visible part of this file -- confirm
        # where they come from.
        try:
            event = self._pop_last_event()
            while event:
                # An event is a pair: forward both items to the dispatcher.
                Application().dispatch(event[0], event[1])
                # Fetch the next pending event from the event list.
                event = self._pop_last_event()
        except Exception:
            # ``Exception`` rather than a bare ``except`` so that
            # KeyboardInterrupt and SystemExit still reach the main loop.
            logging.exception(self.__class__.__name__ + ': event handling failed')

    def _run(self):
        """
        Fork, then loop for as long as the running flag is set.

        The parent process polls the key handler and reacts to key presses;
        the child process ticks the timer and prints the measured frame rate.
        A ``KeyboardInterrupt`` triggers :func:`MainLoop.quit()
        <GLXBob.MainLoop.MainLoop.quit()>` through a ``QUIT``
        :class:`Signal`; a ``MemoryError`` stops the loop.
        """
        self.pid = os.fork()
        while self.is_running():
            try:
                if self.pid:
                    # Parent process: fork() returned the child's pid.
                    input_event = self.__event_key.get_ch()
                    # -1 is treated as "no key to handle".
                    if input_event != -1:
                        self._handle_curses_input(input_event)
                else:
                    # Child process: fork() returned 0.
                    timer = self.get_timer()

                    # Must be the first thing done in the iteration.
                    starting_time = timer.get_time()

                    # Work that might take significant time goes here.

                    # Finally, the timer control.
                    if timer.tick():
                        template = '[ OK ]-> {1} fps, iteration take {0} sec'
                    else:
                        template = '[ ]-> {1} fps, iteration take {0} sec'
                    print_something(template.format(
                        timer.get_time() - starting_time,
                        timer.get_fps()
                    ))

            except KeyboardInterrupt:
                # Constructing the Signal is enough: it reports the
                # exception, then calls self.quit().
                Signal("QUIT", KeyboardInterrupt, self.quit)
            except MemoryError:
                logging.info(self.__class__.__name__ + ': MemoryError Stopping ...')
                self._set_is_running(False)
                break