Source code for astroquery.utils.timer

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""General purpose timer related functions."""

# STDLIB
import time
import warnings
from collections import OrderedDict
from collections.abc import Iterable
from functools import partial, wraps

# THIRD-PARTY
import numpy as np

# LOCAL
from astropy import units as u
from astroquery import log
from astropy import modeling
from astropy.utils.exceptions import AstropyUserWarning

__all__ = ['timefunc', 'RunTimePredictor']
__doctest_skip__ = ['timefunc']


def timefunc(num_tries=1, verbose=True):
    """Decorator to time a function or method.

    The decorated callable is invoked ``num_tries`` times with the same
    arguments and the elapsed time is averaged over those calls.
    Timing uses `time.perf_counter`, which is monotonic and therefore
    not disturbed by system clock adjustments.

    Parameters
    ----------
    num_tries : int, optional
        Number of calls to make. Timer will take the
        average run time. Must be at least 1.

    verbose : bool, optional
        Extra log information.

    Returns
    -------
    tt : float
        Average run time in seconds.

    result
        Output(s) from the function (from the last call made).

    Raises
    ------
    ValueError
        ``num_tries`` is less than 1, in which case the function would
        never be called and no result or average could be produced.

    Examples
    --------
    To add timer to time `numpy.log` for 100 times with
    verbose output::

        import numpy as np
        from astroquery.utils.timer import timefunc

        @timefunc(100)
        def timed_log(x):
            return np.log(x)

    To run the decorated function above:

    >>> t, y = timed_log(100)
    INFO: timed_log took 9.29832458496e-06 s on AVERAGE for 100 call(s). [...]
    >>> t
    9.298324584960938e-06
    >>> y
    4.6051701859880918

    """
    # Fail early with a clear message instead of a ZeroDivisionError or
    # an unbound ``result`` once the decorated function is called.
    if num_tries < 1:
        raise ValueError(
            'num_tries must be at least 1, got {}'.format(num_tries))

    def real_decorator(function):
        @wraps(function)
        def wrapper(*args, **kwargs):
            ts = time.perf_counter()
            for _ in range(num_tries):
                result = function(*args, **kwargs)
            te = time.perf_counter()
            tt = (te - ts) / num_tries
            if verbose:  # pragma: no cover
                log.info('{} took {} s on AVERAGE for {} call(s).'.format(
                    function.__name__, tt, num_tries))
            return tt, result
        return wrapper
    return real_decorator
class RunTimePredictor:
    """Class to predict run time.

    .. note:: Only predict for single varying numeric input parameter.

    Parameters
    ----------
    func : function
        Function to time.

    args : tuple
        Fixed positional argument(s) for the function.

    kwargs : dict
        Fixed keyword argument(s) for the function.

    Examples
    --------
    >>> from astroquery.utils.timer import RunTimePredictor

    Set up a predictor for :math:`10^{x}`:

    >>> p = RunTimePredictor(pow, 10)

    Give it baseline data to use for prediction and
    get the function output values:

    >>> p.time_func(range(10, 1000, 200))
    >>> for input, result in sorted(p.results.items()):
    ...     print("pow(10, {0})\\n{1}".format(input, result))
    pow(10, 10)
    10000000000
    pow(10, 210)
    10000000000...
    pow(10, 410)
    10000000000...
    pow(10, 610)
    10000000000...
    pow(10, 810)
    10000000000...

    Fit a straight line assuming :math:`\\text{arg}^{1}` relationship
    (coefficients are returned):

    >>> p.do_fit()  # doctest: +SKIP
    array([1.16777420e-05, 1.00135803e-08])

    Predict run time for :math:`10^{5000}`:

    >>> p.predict_time(5000)  # doctest: +SKIP
    6.174564361572262e-05

    Plot the prediction:

    >>> p.plot(xlabeltext='Power of 10')  # doctest: +SKIP

    .. image:: /_static/timer_prediction_pow10.png
        :width: 450px
        :alt: Example plot from `astroquery.utils.timer.RunTimePredictor`

    When the changing argument is not the last, e.g.,
    :math:`x^{2}`, something like this might work:

    >>> p = RunTimePredictor(lambda x: pow(x, 2))
    >>> p.time_func([2, 3, 5])
    >>> sorted(p.results.items())
    [(2, 4), (3, 9), (5, 25)]

    """

    def __init__(self, func, *args, **kwargs):
        self._funcname = func.__name__
        # Fixed arguments are bound once; only the varying argument is
        # supplied on each timed call.
        self._pfunc = partial(func, *args, **kwargs)
        self._cache_good = OrderedDict()  # arg -> measured run time
        self._cache_bad = []              # args whose call raised
        self._cache_est = OrderedDict()   # arg -> predicted run time
        self._cache_out = OrderedDict()   # arg -> function output
        self._fit_func = None             # fitted model, set by do_fit
        self._power = None                # power used in the last fit

    @property
    def results(self):
        """Function outputs from `time_func`.

        A dictionary mapping input arguments (fixed arguments
        are not included) to their respective output values.

        """
        return self._cache_out

    @timefunc(num_tries=1, verbose=False)
    def _timed_pfunc(self, arg):
        """Run partial func once for single arg and time it."""
        return self._pfunc(arg)

    def _cache_time(self, arg):
        """Cache timing results without repetition.

        An argument that was already timed, or that already failed, is
        skipped. A failing call is reported as an
        `~astropy.utils.exceptions.AstropyUserWarning` and remembered so
        that it is not retried.
        """
        if arg in self._cache_good or arg in self._cache_bad:
            return

        try:
            result = self._timed_pfunc(arg)
        except Exception as e:
            # Best effort: one bad argument must not abort the whole
            # baseline run, so warn and move on.
            warnings.warn(str(e), AstropyUserWarning)
            self._cache_bad.append(arg)
        else:
            self._cache_good[arg] = result[0]  # Run time
            self._cache_out[arg] = result[1]  # Function output

    def time_func(self, arglist):
        """Time the partial function for a list of single args
        and store run time in a cache. This forms a baseline for
        the prediction.

        This also stores function outputs in `results`.

        Parameters
        ----------
        arglist : list of numbers
            List of input arguments to time. A single non-iterable
            value is treated as a one-element list.

        """
        if not isinstance(arglist, Iterable):
            arglist = [arglist]

        # Preserve arglist order
        for arg in arglist:
            self._cache_time(arg)

    # FUTURE: Implement N^x * O(log(N)) fancy fitting.
    def do_fit(self, model=None, fitter=None, power=1, min_datapoints=3):
        """Fit a function to the lists of arguments and
        their respective run time in the cache.

        By default, this does a linear least-square fitting
        to a straight line on run time w.r.t. argument values
        raised to the given power, and returns the optimal
        intercept and slope.

        On success, any previously cached predictions are discarded
        because they belong to the previous fit. If validation or the
        fit itself fails, the previous fit, power and cached predictions
        are left untouched.

        Parameters
        ----------
        model : `astropy.modeling.Model`
            Model for the expected trend of run time (Y-axis)
            w.r.t. :math:`\\text{arg}^{\\text{power}}` (X-axis).
            If `None`, will use `~astropy.modeling.polynomial.Polynomial1D`
            with ``degree=1``.

        fitter : `astropy.modeling.fitting.Fitter`
            Fitter for the given model to extract optimal coefficient values.
            If `None`, will use `~astropy.modeling.fitting.LinearLSQFitter`.

        power : int, optional
            Power of values to fit.

        min_datapoints : int, optional
            Minimum number of data points required for fitting.
            They can be built up with `time_func`.

        Returns
        -------
        a : array-like
            Fitted `~astropy.modeling.FittableModel` parameters.

        Raises
        ------
        ValueError
            Insufficient data points for fitting.

        ModelsError
            Invalid model or fitter.

        """
        x_arr = np.array(list(self._cache_good.keys()))
        if x_arr.size < min_datapoints:
            raise ValueError('requires {} points but has {}'.format(
                min_datapoints, x_arr.size))

        if model is None:
            model = modeling.models.Polynomial1D(1)
        elif not isinstance(model, modeling.core.Model):
            raise modeling.fitting.ModelsError(
                '{} is not a model.'.format(model))

        if fitter is None:
            fitter = modeling.fitting.LinearLSQFitter()
        elif not isinstance(fitter, modeling.fitting.Fitter):
            raise modeling.fitting.ModelsError(
                '{} is not a fitter.'.format(fitter))

        fit_func = fitter(
            model, x_arr**power, list(self._cache_good.values()))

        # Commit the new state only after the fit succeeded, so that a
        # failure cannot leave an old fit paired with a new power.
        self._power = power
        self._cache_est = OrderedDict()
        self._fit_func = fit_func

        return self._fit_func.parameters

    def predict_time(self, arg):
        """Predict run time for given argument.
        If prediction is already cached, cached value is returned.

        Parameters
        ----------
        arg : number
            Input argument to predict run time for.

        Returns
        -------
        t_est : float
            Estimated run time for given argument.

        Raises
        ------
        RuntimeError
            No fitted data for prediction.

        """
        if arg in self._cache_est:
            return self._cache_est[arg]

        if self._fit_func is None:
            raise RuntimeError('no fitted data for prediction')

        # The model was fitted against arg**power, so evaluate it there.
        t_est = self._fit_func(arg**self._power)
        self._cache_est[arg] = t_est
        return t_est

    def plot(self, xscale='linear', yscale='linear', xlabeltext='args',
             save_as=''):  # pragma: no cover
        """Plot prediction.

        .. note:: Uses `matplotlib <http://matplotlib.org/>`_.

        Parameters
        ----------
        xscale, yscale : {'linear', 'log', 'symlog'}
            Scaling for `matplotlib.axes.Axes`.

        xlabeltext : str, optional
            Text for X-label.

        save_as : str, optional
            Save plot as given filename.

        Raises
        ------
        RuntimeError
            Insufficient data for plotting.

        """
        import matplotlib.pyplot as plt

        # Actual data
        x_arr = sorted(self._cache_good)
        if len(x_arr) <= 1:
            raise RuntimeError('insufficient data for plotting')
        y_arr = np.array([self._cache_good[x] for x in x_arr])

        # Auto-ranging: pick the first unit in which the mean run time
        # falls in [1, 1000); fall back to the last one tried otherwise.
        qmean = y_arr.mean() * u.second
        for cur_u in (u.minute, u.second, u.millisecond, u.microsecond,
                      u.nanosecond):
            val = qmean.to_value(cur_u)
            if 1000 > val >= 1:
                break
        y_arr = (y_arr * u.second).to_value(cur_u)

        fig, ax = plt.subplots()
        ax.plot(x_arr, y_arr, 'kx-', label='Actual')

        # Fitted data
        if self._fit_func is not None:
            x_est = list(self._cache_est.keys())
            y_est = (np.array(list(self._cache_est.values()))
                     * u.second).to_value(cur_u)
            ax.scatter(x_est, y_est, marker='o', c='r', label='Predicted')

            # Draw the fit through both measured and predicted arguments.
            x_fit = np.array(sorted(x_arr + x_est))
            y_fit = (self._fit_func(x_fit**self._power)
                     * u.second).to_value(cur_u)
            ax.plot(x_fit, y_fit, 'b--', label='Fit')

        ax.set_xscale(xscale)
        ax.set_yscale(yscale)
        ax.set_xlabel(xlabeltext)
        ax.set_ylabel('Run time ({})'.format(cur_u.to_string()))
        ax.set_title(self._funcname)
        ax.legend(loc='best', numpoints=1)

        plt.draw()

        if save_as:
            plt.savefig(save_as)