# -*- test-case-name: mv3d.test.util.test_debugvis -*-
# Copyright (C) 2010-2012 Mortal Coil Games
# See LICENSE for details.
"""
Debug visualization for MV3D Apps

@author: mike
"""
import os
from time import time

try:
    import matplotlib
    # Select the non-interactive Agg backend before pyplot is imported.
    matplotlib.use("Agg")
    from matplotlib import pyplot
    from PIL import Image
except ImportError:
    matplotlib = None
except RuntimeError:
    matplotlib = None

from twisted.internet.task import LoopingCall
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.spread import pb

import mv3d
from mv3d.util.guide import ChangeNotifier, NotifierProperty, IntConverter
from mv3d.util.profiler import theProfiler
from StringIO import StringIO


def _renderGraph(seriesList):
    """
    Plot every series on one 5x3 inch figure and return it as a PIL image.

    The figure is always closed again: pyplot keeps every figure it creates
    registered until it is explicitly closed, so a periodically refreshed
    graph would otherwise accumulate figures forever.

    Must only be called when the module level C{matplotlib} is not C{None},
    since C{pyplot} and C{Image} are undefined otherwise.

    @param seriesList: iterable of sequences, each passed to C{ax.plot}.
    @return: the image opened by C{Image.open} on the rendered PNG data.
    """
    fig = pyplot.figure()
    try:
        ax = fig.add_subplot(111)
        fig.set_size_inches((5, 3))
        for series in seriesList:
            ax.plot(series)
        fig.canvas.draw()
        imgdata = StringIO()
        fig.savefig(imgdata, format="png")
    finally:
        pyplot.close(fig)
    imgdata.seek(0)
    return Image.open(imgdata)


def _syncAutoRefresh(owner):
    """
    Start or stop C{owner.autoRefreshCall} so it matches C{owner.autoRefresh}.

    When auto refresh is on and no call exists, a L{LoopingCall} invoking
    C{owner.onRefresh(None, None)} every C{owner.autoRefreshInterval} seconds
    is created and started.  When auto refresh is off and a call exists, it
    is stopped (if still running) and forgotten.

    @param owner: object exposing C{autoRefresh}, C{autoRefreshInterval},
        C{autoRefreshCall} and C{onRefresh}.
    """
    call = owner.autoRefreshCall
    if owner.autoRefresh and call is None:
        # Store the call before starting it: start() fires onRefresh right
        # away, which re-enters here and must see the call already set.
        owner.autoRefreshCall = LoopingCall(owner.onRefresh, None, None)
        owner.autoRefreshCall.start(float(owner.autoRefreshInterval))
    elif not owner.autoRefresh and call is not None:
        # LoopingCall.stop() asserts when the loop is not running.
        if call.running:
            call.stop()
        owner.autoRefreshCall = None


class StatOverTime(object):
    """
    Tracks the value of a stat over time.

    Besides the instantaneous value, a smoothed value is kept for every
    duration requested through indexing (C{stat[duration]}), and a fixed
    length list of samples of that smoothed value is kept for every interval
    registered through L{addHistory}.

    @ivar values: maps a duration to its smoothed value.
    @ivar history: maps an interval to its list of recorded samples.
    @ivar historyCounters: maps an interval to the time accumulated since
        the last recorded sample.
    """
    # Clock used to measure elapsed time between ticks.
    timer = time
    # Clock reading at the previous tick, None until the first tick.
    lastTick = None
    currentValue = None

    def __init__(self, initalValue=0.0, averages=None, histories=None):
        """
        @param initalValue: starting value of the stat.
        @param averages: optional iterable of durations to track.
        @param histories: optional iterable of C{(interval, historyLength)}
            pairs, each forwarded to L{addHistory}.
        """
        self.values = {}
        self.history = {}
        self.historyCounters = {}
        self.currentValue = initalValue
        if averages is not None:
            for duration in averages:
                # Indexing registers the duration as a side effect.
                self[duration]
        if histories is not None:
            for interval, historyLength in histories:
                self.addHistory(interval, historyLength)
        self._tick()

    def __getitem__(self, timeValue):
        """
        Return the smoothed value for C{timeValue}, starting to track that
        duration (seeded with the current value) if it is not tracked yet.

        @param timeValue: duration to look up, must be greater than zero.
        """
        assert timeValue > 0
        self._tick()
        try:
            return self.values[timeValue]
        except KeyError:
            self.values[timeValue] = self.currentValue
            return self.currentValue

    def _getValue(self):
        """
        Returns the current instantaneous value
        """
        self._tick()
        return self.currentValue

    def _setValue(self, value):
        """
        Sets the current value
        """
        # Tick first so the elapsed time is accounted to the old value.
        self._tick()
        self.currentValue = value

    value = property(_getValue, _setValue)

    def _tick(self):
        """
        Update the value of each duration that we are tracking based on the
        delta time since the last tick, then record history samples for
        every interval that has elapsed.

        @return: the time elapsed since the previous tick, or C{None} on the
            very first tick.
        """
        previous = self.lastTick
        self.lastTick = self.timer()
        if previous is None:
            return
        delta = self.lastTick - previous

        if delta > 0:
            current = self.currentValue
            smoothed = self.values
            for duration in list(smoothed):
                if delta >= duration:
                    # The whole window has elapsed.  Scaling by
                    # delta / duration here would apply a weight above one,
                    # overshooting the current value and diverging on
                    # repeated late ticks.
                    smoothed[duration] = current
                else:
                    smoothed[duration] += ((current - smoothed[duration])
                                           * delta / duration)

        for interval in list(self.historyCounters):
            elapsed = self.historyCounters[interval] + delta
            samples = self.history[interval]
            sample = self.values[interval]
            while elapsed >= interval:
                elapsed -= interval
                # A zero length history has nothing to rotate; popping from
                # it would raise IndexError on every subsequent tick.
                if samples:
                    samples.pop(0)
                    samples.append(sample)
            self.historyCounters[interval] = elapsed
        return delta

    def addHistory(self, interval, historyLength):
        """
        Add a new history recorder which takes readings every interval
        seconds and records data for historyLength seconds.

        The history starts out filled with the current value.
        """
        sampleCount = int(historyLength / interval)
        self.history[interval] = [self.currentValue] * sampleCount
        self[interval]  # make sure we have one for this interval
        self.historyCounters[interval] = 0


class ProfileResults(ChangeNotifier):
    """
    Window for displaying profile results.

    NOTE(review): L{onClose} and L{show} use C{self.window}, which is not
    assigned anywhere in this class - presumably it is attached from
    outside; confirm against the caller.
    """
    resultCount = NotifierProperty("resultCount", 20, converter=IntConverter())
    sortField = NotifierProperty("sortField", "totalTime")
    reverse = NotifierProperty("reverse", True)
    results = NotifierProperty("results", None)
    autoRefresh = NotifierProperty("autoRefresh")
    autoRefreshInterval = NotifierProperty("autoRefreshInterval")
    # LoopingCall driving periodic refreshes, None while auto refresh is off.
    autoRefreshCall = None

    def __init__(self, parser, profiler):
        """
        @param parser: stored as C{self.parser}; not used by this class.
        @param profiler: object providing C{top} and C{resetProfile}.
        """
        self.parser = parser
        self.profiler = profiler
        self.autoRefreshInterval = 0.5
        self.onRefresh(None, None)

    def onClose(self, _item, _prop):
        """
        Hide the window
        """
        self.window.hide()

    @inlineCallbacks
    def onRefresh(self, _item, _prop):
        """
        The refresh button was hit.

        Fetches the top profiler entries into C{self.results}, then starts
        or stops the auto refresh loop as requested by C{self.autoRefresh}.
        """
        self.results = yield self.profiler.top(count=self.resultCount,
                                               key=self.sortField,
                                               reverse=self.reverse)
        _syncAutoRefresh(self)

    def onReset(self, _item, _prop):
        """
        The reset button was hit.
        """
        return self.profiler.resetProfile()

    def show(self):
        """
        Show the window
        """
        self.window.show()


class StatCall(object):
    """
    A stat getter

    Couples a callable with the L{StatOverTime} that stores its results.
    """
    # Time accumulated towards the next update, see
    # StatVisualizer.updateStats.
    timeAccumulator = 0

    def __init__(self, name, frequency, function, args, kwargs):
        """
        @param name: name of the stat.
        @param frequency: time between two updates of this stat.
        @param function: callable returning the current value of the stat.
        @param args: positional arguments for C{function}.
        @param kwargs: keyword arguments for C{function}.
        """
        self.name = name
        self.frequency = frequency
        self.function = function
        self.args = args
        self.kwargs = kwargs
        # (interval, historyLength) pairs handed to StatOverTime.addHistory.
        self.stat = StatOverTime(histories=[(1, 60), (10, 600), (60, 3600),
                                            (3600, 3600 * 24)])

    def update(self):
        """
        Make the call
        """
        self.stat.value = self.function(*self.args, **self.kwargs)


class StatVisualizer(ChangeNotifier, pb.Referenceable):
    """
    Provides the data context for the stat grapher. Also holds info on all
    the stats to graph and schedules updates of them.
    """
    graph = NotifierProperty("graph")
    autoRefresh = NotifierProperty("autoRefresh")
    autoRefreshInterval = NotifierProperty("autoRefreshInterval")
    statsToGraph = NotifierProperty("statsToGraph")
    timeScale = NotifierProperty("timeScale")
    # LoopingCall polling the registered stats, None until the first addStat.
    statCall = None
    # LoopingCall driving periodic redraws, None while auto refresh is off.
    autoRefreshCall = None

    def __init__(self):
        self.stats = {}
        self.statsToGraph = []
        self.timeScale = 1
        self.autoRefreshInterval = 0.5
        self.autoRefresh = False
        self.onRefresh(None, None)

    def _getAvailableStats(self):
        """
        Returns all available stats.
        """
        return self.stats.keys()

    availableStats = property(_getAvailableStats)

    def stop(self):
        """
        Stops updating.

        Safe to call repeatedly: a loop is only stopped while it is running,
        because LoopingCall.stop() asserts otherwise.
        """
        if self.statCall is not None and self.statCall.running:
            self.statCall.stop()
        if self.autoRefreshCall is not None and self.autoRefreshCall.running:
            self.autoRefreshCall.stop()

    def addStat(self, name, frequency, function, *args, **kwargs):
        """
        Add a stat via a callback function that will be called on frequency.

        The shared polling loop always runs at the smallest frequency of all
        registered stats.

        @param name: key under which the stat is registered.
        @param frequency: time between two calls of C{function}; used as the
            interval of the polling L{LoopingCall}.
        @param function: callable returning the current value of the stat.
        @raise ValueError: if C{frequency} is not greater than zero.
        """
        if frequency <= 0:
            # A zero frequency would spin forever in updateStats.
            raise ValueError("frequency must be > 0")
        self.stats[name] = StatCall(name, frequency, function, args, kwargs)
        if self.statCall is None:
            self.statCall = LoopingCall(self.updateStats)
            self.statCall.start(frequency)
            return
        fastest = min(stat.frequency for stat in self.stats.values())
        if self.statCall.interval != fastest:
            if self.statCall.running:
                self.statCall.stop()
            self.statCall.start(fastest)

    def updateStats(self):
        """
        Update all the stats

        Every stat accumulates the polling interval and is updated once for
        each full C{frequency} it has accumulated.
        """
        step = self.statCall.interval
        for stat in self.stats.values():
            stat.timeAccumulator += step
            while stat.timeAccumulator >= stat.frequency:
                stat.timeAccumulator -= stat.frequency
                stat.update()

    def onRefresh(self, _item, _prop):
        """
        The refresh button was hit.

        Redraws C{self.graph} from the history of every stat named in
        C{self.statsToGraph}, then starts or stops the auto refresh loop.
        Does nothing when matplotlib could not be imported.
        """
        if matplotlib is None:
            return
        series = [self.stats[name].stat.history[int(self.timeScale)]
                  for name in self.statsToGraph]
        self.graph = _renderGraph(series)
        _syncAutoRefresh(self)

    def remote_getStatList(self):
        """
        Return the list of stat names
        """
        return self.availableStats

    def remote_getStatData(self, statsToGraph, timeScale):
        """
        Return data needed to graph the stats.

        @param statsToGraph: names of the stats to look up.
        @param timeScale: history interval to read for each stat.
        @return: dict mapping each stat name to its recorded history.
        """
        return dict((name, self.stats[name].stat.history[timeScale])
                    for name in statsToGraph)


class RemoteStatVisualizer(pb.Referenceable, ChangeNotifier):
    """
    Takes a remote reference to a stat visualizer and provides a datacontext
    suitable for the ui
    """
    graph = NotifierProperty("graph")
    autoRefresh = NotifierProperty("autoRefresh")
    autoRefreshInterval = NotifierProperty("autoRefreshInterval")
    statsToGraph = NotifierProperty("statsToGraph")
    timeScale = NotifierProperty("timeScale")
    availableStats = NotifierProperty("availableStats")
    statCall = None
    # LoopingCall driving periodic redraws, None while auto refresh is off.
    autoRefreshCall = None

    def __init__(self, remoteVisualizer):
        """
        @param remoteVisualizer: remote reference answering C{getStatList}
            and C{getStatData}.
        """
        self.remoteVisualizer = remoteVisualizer
        self.availableStats = {}
        self.statsToGraph = []
        self.timeScale = 1
        self.autoRefreshInterval = 0.5
        self.autoRefresh = False
        self.onRefresh(None, None)
        self.getAvailableStats()

    @inlineCallbacks
    def getAvailableStats(self):
        """
        Retrieves the available stat list

        C{self.availableStats} is set to the remote answer, or to C{None}
        when the connection was lost.

        @return: Deferred firing with the new C{self.availableStats}.
        """
        try:
            self.availableStats = yield self.remoteVisualizer.callRemote(
                "getStatList")
        except pb.PBConnectionLost:
            self.availableStats = None
        returnValue(self.availableStats)

    @inlineCallbacks
    def onRefresh(self, _item, _prop):
        """
        The refresh button was hit.

        Fetches the data of C{self.statsToGraph} from the remote visualizer,
        redraws C{self.graph} and starts or stops the auto refresh loop.
        Does nothing when matplotlib could not be imported, and gives up
        quietly when the connection was lost.
        """
        if matplotlib is None:
            return
        # Fetch before any figure is created so that a lost connection
        # leaves nothing behind to clean up.
        try:
            stats = yield self.remoteVisualizer.callRemote(
                "getStatData", self.statsToGraph, self.timeScale)
        except pb.PBConnectionLost:
            returnValue(None)
        self.graph = _renderGraph(stats.values())
        _syncAutoRefresh(self)

    def remote_getStatList(self):
        """
        Return the list of stat names
        """
        return self.getAvailableStats()

    def remote_getStatData(self, statsToGraph, timeScale):
        """
        Return data needed to graph the stats.

        The request is forwarded unchanged to the remote visualizer.
        """
        return self.remoteVisualizer.callRemote("getStatData", statsToGraph,
                                                timeScale)


class DebugVisualizer(ChangeNotifier):
    """
    The main debug window.
    """
    conductor = NotifierProperty("conductor")
    profiler = NotifierProperty("profiler")
    stats = NotifierProperty("stats")

    def __init__(self, parser, conductor, profiler=theProfiler):
        """
        @param parser: object whose C{load} builds the window from the
            C{debugvis.xml} template.
        @param conductor: object providing C{statVisualizer}.
        @param profiler: profiler wrapped in a L{ProfileResults}.
        """
        self.parser = parser
        self.conductor = conductor
        self.profiler = ProfileResults(parser, profiler)
        self.stats = conductor.statVisualizer
        # The template lives next to the mv3d package directory.
        packageDir = os.path.abspath(os.path.dirname(mv3d.__file__))
        template = os.path.join(packageDir, "..", "templates", "guide",
                                "debugvis.xml")
        self.window = parser.load(template, self)

    def show(self):
        """
        Show the main debug visualization window
        """
        self.window.show()

    def onClose(self, _obj, _prop):
        """
        Hide the main debug visualization window.
        """
        self.window.hide()