Source code for GLXBob.MainLoop
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
from GLXBob import Timer
from GLXBob import EventBus
from GLXBob import EventKey
from random import randint
from time import sleep
import sys
import os
# This script is published under the GNU GENERAL PUBLIC LICENSE
# http://www.gnu.org/licenses/gpl-3.0.en.html
# Author: Tuuux <tuxa at rtnp dot org> all rights reserved
class Signal(Exception):
    """
    Generic Signal exception for Galaxie-BoB.

    The trick consists of using :py:class:`Exception` as the parent class.

    Building a :class:`Signal` has side effects: when ``msg`` is one of ``'QUIT'``, ``'KEY'``,
    ``'EVENT'`` or ``'TIMER'`` the original exception is reported on ``sys.stdout``; when ``msg``
    is ``'QUIT'`` the optional ``callback`` is called afterwards. Any other ``msg`` only builds
    the exception object.

    :param msg: name of the signal, it prefixes the exception message
    :type msg: str
    :param original_exception: the exception (instance or class) at the origin of the signal
    :param callback: callable taking no argument, only used when ``msg`` is ``'QUIT'``
    """

    # Signal names for which the original exception is reported on stdout
    _REPORTED_SIGNALS = ('QUIT', 'KEY', 'EVENT', 'TIMER')

    def __init__(self, msg, original_exception, callback=None):
        super(Signal, self).__init__(msg + (": %s" % original_exception))
        self.original_exception = original_exception

        if msg in self._REPORTED_SIGNALS:
            self._report()

        # Only the quit signal triggers the callback, and only after the report
        if msg == 'QUIT' and callback is not None:
            callback()

    def _report(self):
        """
        Write the original exception on ``sys.stdout``.

        The report is best effort for every signal: an :py:exc:`IOError` raised by the stream
        (closed pipe for example) is ignored, so that building a :class:`Signal` never fails
        because of the output stream and never hides the original exception.
        """
        try:
            sys.stdout.write('\rException: ' + str(self.original_exception) + '\n')
            sys.stdout.flush()
        except IOError:
            pass
class Singleton(type):
    """
    Metaclass which turns the classes using it into singletons.

    Each class created by this metaclass receives an ``instance`` class attribute, set to
    :py:obj:`None` at class creation. The first call of the class builds the instance and stores
    it there; every later call returns that stored instance and ignores its arguments.
    """

    def __init__(cls, name, bases, dictionary):
        super(Singleton, cls).__init__(name, bases, dictionary)
        # No instance exists until the class is called for the first time
        cls.instance = None

    def __call__(cls, *args, **kw):
        """
        Return the unique instance of ``cls``, creating it on the first call.

        :param args: positional arguments forwarded to the constructor on the first call only
        :param kw: keyword arguments forwarded to the constructor on the first call only
        :return: the unique instance of ``cls``
        """
        if cls.instance is None:
            # Forward the keyword arguments too, they must not be silently dropped
            cls.instance = super(Singleton, cls).__call__(*args, **kw)
        return cls.instance
def print_something(msg):
    """
    Display ``msg`` on ``sys.stdout``.

    A carriage return is written first, then ``msg``, then a new line; the stream is flushed
    once everything has been written.

    :param msg: the text to display
    :type msg: str
    """
    stream = sys.stdout
    for chunk in ('\r', msg, '\n'):
        stream.write(chunk)
    stream.flush()
class MainLoop(object):
    """
    :Description:

    The :class:`MainLoop <GLXBob.MainLoop.MainLoop>` object is an Alderson loop, something close to an
    infinite loop but with :func:`MainLoop.run() <GLXBob.MainLoop.MainLoop.run()>` and
    :func:`MainLoop.quit() <GLXBob.MainLoop.MainLoop.quit()>` methods.

    The :class:`MainLoop <GLXBob.MainLoop.MainLoop>` makes it work and takes an adaptive sleep to impose a
    global Frame Rate. Default: 25

    That loop should have a low power consumption, that was our target from the beginning.

    Feature:
       * Alderson loop with **run** and **quit** methods
       * Doesn't use 100% of CPU Time
       * Frame Per Second with adaptive limitation
       * Limitation will be applied with a knee (percentage), it depends on the **Event list** size

    To Do:
       * The **Event list** size should interact with the Frame Rate Limitation
    """
    # http://code.activestate.com/recipes/579053-high-precision-fps/

    # NOTE(review): ``__metaclass__`` is only honoured by Python 2. Under Python 3 it is an ordinary
    # class attribute and the class is not a singleton -- confirm the targeted interpreter.
    __metaclass__ = Singleton

    def __init__(self):
        """
        :Property's Details:

        .. py:data:: is_running

            It is running or not

              +---------------+-------------------------------+
              | Type          | :py:data:`bool`               |
              +---------------+-------------------------------+
              | Flags         | Read / Write                  |
              +---------------+-------------------------------+
              | Default value | False                         |
              +---------------+-------------------------------+

        .. py:data:: timer

            The GLXBob.Timer() object is stored on that property

              +---------------+-------------------------------+
              | Type          | :py:data:`GLXBob.Timer()`     |
              +---------------+-------------------------------+
              | Flags         | Read / Write                  |
              +---------------+-------------------------------+
              | Default value | :py:data:`GLXBob.Timer()`     |
              +---------------+-------------------------------+

        """
        self.__is_running = False
        self.__timer = Timer()
        self.__event_bus = EventBus()
        self.__event_key = EventKey()
        # Stays None until _run() stores the value returned by os.fork()
        self.pid = None

    def is_running(self):
        """
        Checks if the :class:`MainLoop <GLXBob.MainLoop.MainLoop>` is currently being run via
        :func:`MainLoop.run() <GLXBob.MainLoop.MainLoop.run()>` method.

        :return: :py:obj:`True` if the :class:`MainLoop <GLXBob.MainLoop.MainLoop>` is currently being run.
        :rtype: bool
        """
        return self.__is_running

    def run(self):
        """
        Run the :class:`MainLoop <GLXBob.MainLoop.MainLoop>` until
        :func:`MainLoop.quit() <GLXBob.MainLoop.MainLoop.quit()>` is called on the loop.

        The running flag is set, the start is logged, then the internal loop is entered; the call
        returns only when that internal loop ends.
        """
        self._set_is_running(True)
        logging.info(self.__class__.__name__ + ': Starting ...')
        self._run()

    def quit(self):
        """
        Stops the :class:`MainLoop <GLXBob.MainLoop.MainLoop>` from running.

        The key event source is destroyed and the running flag is cleared. Then, depending on
        :py:obj:`pid`:

        * when it is not ``0`` the process is terminated immediately with ``os._exit(0)``, so the
          call never returns;
        * when it is ``0`` the call returns and the loop ends on its next iteration.

        A :func:`MainLoop.quit() <GLXBob.MainLoop.MainLoop.quit()>` call will certainly cause the end
        of your program.
        """
        self.__event_key.destroy()
        self._set_is_running(False)

        # os.fork() returns 0 in the child, so ``!= 0`` matches the forking process, and also matches
        # the initial None when run() has never been called.
        # NOTE(review): _run() labels the non-zero pid as the parent process although the message
        # below talks about children -- confirm which wording is intended.
        if self.pid != 0:
            msg = self.__class__.__name__ + ': Children stopping ...'
            print_something(msg)
            logging.info(msg)
            # Immediate exit of the process, nothing after this line is executed
            os._exit(0)
        else:
            msg = self.__class__.__name__ + ': Stopping ...'
            print_something(msg)
            logging.info(msg)

    def set_timer(self, timer=None):
        """
        Set the :py:obj:`timer` property.

        By default the :class:`MainLoop <GLXBob.MainLoop.MainLoop>` class initialization will create
        automatically a :class:`Timer <GLXBob.Timer.Timer>` object then store it in the :py:obj:`timer` property.

        You can set your own :class:`Timer <GLXBob.Timer.Timer>` object with this method.

        :param timer: an object initialized by yourself or :py:obj:`None` for a self created
                      :class:`Timer <GLXBob.Timer.Timer>`
        :type timer: GLXBob.Timer
        """
        if timer is None:
            timer = Timer()
        self.__timer = timer

    def get_timer(self):
        """
        Return the timer property value

        :return: the stored :class:`Timer <GLXBob.Timer.Timer>` object
        :rtype: GLXBob.Timer
        """
        return self.__timer

    def set_event_bus(self, event_bus=None):
        """
        Set the :py:obj:`event_bus` property.

        By default the :class:`MainLoop <GLXBob.MainLoop.MainLoop>` class initialization will create
        automatically a :class:`EventBus <GLXBob.EventBus.EventBus>` object then store it in the
        :py:obj:`event_bus` property.

        You can set your own :class:`EventBus <GLXBob.EventBus.EventBus>` object with this method.

        That :py:obj:`event_bus` property object is available via
        :func:`MainLoop.get_event_bus() <GLXBob.MainLoop.MainLoop.get_event_bus()>`

        :param event_bus: an object initialized by yourself or :py:obj:`None` for a self created
                          :class:`EventBus <GLXBob.EventBus.EventBus>`
        :type event_bus: GLXBob.EventBus
        """
        if event_bus is None:
            event_bus = EventBus()
        self.__event_bus = event_bus

    def get_event_bus(self):
        """
        Return the event_bus property value

        :return: the stored :class:`EventBus <GLXBob.EventBus.EventBus>` object
        :rtype: GLXBob.EventBus
        """
        return self.__event_bus

    # Internal methods
    def _set_is_running(self, boolean):
        """
        Set the __is_running attribute

        :param boolean: True or False
        :type boolean: bool
        :raise TypeError: if ``boolean`` parameter is not a :py:data:`bool` type
        """
        if not isinstance(boolean, bool):
            raise TypeError(u'>boolean< parameter must be a bool type')
        self.__is_running = boolean

    def _get_is_running(self):
        """
        Get the __is_running attribute value

        :return: :py:obj:`True` if the :class:`MainLoop <GLXBob.MainLoop.MainLoop>` is running
                 or :py:obj:`False` if not.
        :rtype: bool
        """
        return self.__is_running

    def _handle_curses_input(self, event):
        """
        Display a key event then emit ``'QUIT'`` on the event bus when the key is ``q``.

        :param event: the value returned by the ``get_ch()`` method of the key event source
        """
        # Resolve the key name once, it is used for both the display and the quit test
        key_name = self.__event_key.get_keyname(event)
        print_something('[KEY ]-> {0}'.format(key_name))
        print_something('[CHAR]-> {0}'.format(event))
        if key_name == 'q':
            self.get_event_bus().emit('QUIT')

    def _handle_event(self):
        """
        Dispatch the pending events, one by one, until none is left.

        Each event is expected to be indexable, its two first items are given to the dispatcher.
        A failure is logged with its traceback instead of being silently dropped;
        :py:exc:`KeyboardInterrupt` and :py:exc:`SystemExit` are not intercepted.

        NOTE(review): ``_pop_last_event`` and ``Application`` are not defined in this module, so as
        written every call ends up in the error branch -- confirm where they should come from.
        """
        try:
            event = self._pop_last_event()
            while event:
                # If there is an event, dispatch it
                Application().dispatch(event[0], event[1])
                # Fetch the next event from the event list
                event = self._pop_last_event()
        except Exception:
            logging.exception(self.__class__.__name__ + ': Event dispatching failed')

    def _run(self):
        """
        Fork the process then loop while :func:`MainLoop.is_running()` is :py:obj:`True`.

        The value returned by ``os.fork()`` is stored in :py:obj:`pid`; both processes enter the
        loop:

        * the parent (non-zero ``pid``) polls the key event source and handles every value
          different from ``-1``;
        * the child (``pid`` equal to ``0``) ticks the timer and displays the frame rate and the
          duration of the iteration.

        A :py:exc:`KeyboardInterrupt` builds a ``QUIT`` :class:`Signal` with
        :func:`MainLoop.quit()` as callback; a :py:exc:`MemoryError` is logged and ends the loop.
        """
        self.pid = os.fork()

        while self.is_running():
            try:
                if self.pid:
                    # That is the Parent Process
                    input_event = self.__event_key.get_ch()
                    if input_event != -1:
                        self._handle_curses_input(input_event)
                else:
                    # That is the Children Process
                    # Must be the first
                    starting_time = self.get_timer().get_time()

                    # Do stuff that might take significant time here

                    # Finally
                    # Timer control, the prefix tells what tick() returned
                    if self.get_timer().tick():
                        template = '[ OK ]-> {1} fps, iteration take {0} sec'
                    else:
                        template = '[ ]-> {1} fps, iteration take {0} sec'
                    print_something(template.format(
                        self.get_timer().get_time() - starting_time,
                        self.get_timer().get_fps()
                    ))

            except KeyboardInterrupt:
                # Building the Signal reports the interruption and calls quit()
                Signal("QUIT", KeyboardInterrupt, self.quit)

            except MemoryError:
                logging.info(self.__class__.__name__ + ': MemoryError Stopping ...')
                self._set_is_running(False)
                break