# -*- test-case-name: mv3d.test.util.test_statemachine -*- # Copyright (C) 2010-2012 Mortal Coil Games # See LICENSE for details. """ A fairly basic state machine class. @author: mike """ from time import time from twisted.internet.defer import inlineCallbacks, returnValue from twisted.internet import reactor from twisted.python.log import deferr from random import random class State(object): """ A state in a state machine """ def __init__(self, stateMachine, previousState): """ Called when this state is entered """ self.startTime = stateMachine._timer() self._timer = stateMachine._timer def __str__(self): """ Convert to a string """ return self.__class__.__name__ def _getTimeInState(self): """ Return the time since startTime """ return self._timer() - self.startTime timeInState = property(_getTimeInState) @classmethod def getWeight(cls, stateMachine): """ If there's a tie between entering two or more states, a weighted random choice will be made between them. This function returns the weight value. A value of 0 will cause this state to never be picked if there are multiple choices. """ return 0 @classmethod def getDesire(cls, stateMachine): """ States can be picked based on which one has the highest desire value. This function returns the desire higher numbers being more desirable. """ return 0 class StateMachine(object): """ A fairly customizable light-weight state machine object """ _callLater = reactor.callLater #@UndefinedVariable _timer = time startTime = 0 currentState = None debug = False def __init__(self, callLater=None, timer=None): if callLater is not None: self._callLater = callLater if timer is not None: self._timer = timer self._updateCalls = [] def _getRunTime(self): """ Return the time since startTime """ return self._timer() - self.startTime runTime = property(_getRunTime) def scheduleEvent(self, inSeconds, name, *args, **kwargs): """ Schedule an event to happen later """ self._updateCalls.append(self._callLater(inSeconds, self.handleEvent, name, *args, **kwargs)) def handleEvent(self, name, *args, **kwargs): """ Call this to send an event to the current state. """ if self.debug: print self.name, "Message", name, self.currentState if self.currentState is None: return try: handler = getattr(self.currentState, "event_%s" % name) except AttributeError: return return handler(self, *args, **kwargs) @inlineCallbacks def changeState(self, newStateOrStates, *args, **kwargs): """ Change the current state """ if not isinstance(newStateOrStates, (list, tuple)): newStateOrStates = [newStateOrStates] if len(newStateOrStates) > 1: newState = self.pickBestState(newStateOrStates) else: newState = newStateOrStates[0] if self.debug: print self.name, "Change state", self.currentState, "new", newState if newState is not None and isinstance(self.currentState, newState): returnValue(None) if self.currentState is None: self.startTime = self._timer() try: if self.currentState is not None: yield self.handleEvent("exit", newState) while self._updateCalls: call = self._updateCalls.pop() if not call.called: call.cancel() oldState = self.currentState if newState is not None: self.currentState = newState(self, oldState, *args, **kwargs) else: self.currentState = None except: deferr() raise def pickBestState(self, states): """ Given a list of states, pick which one is most desirable. """ try: states = sorted(states, key=lambda state: state.getDesire(self)) for state in states[:]: if state.getDesire(self) < states[-1].getDesire(self): states.remove(state) if len(states) == 1: return states[0] totalDesire = sum([state.getWeight(self) for state in states]) roll = random() * totalDesire curDesire = 0 for state in states: weight = state.getWeight(self) if roll >= curDesire and roll < curDesire + weight: return state curDesire += weight raise ValueError("Something strange happened.") except: deferr() raise