Source code for stopping_rules

# -*- coding: utf-8 -*-
"""
Created on Fri Nov 29 12:22:12 2013

Collection of criteria for automatic stopping of RichLucy iterations.

The RichLucy.RichLucy function requires to call such a function as

Crit1,Crit2,indopt = calcCrit(SH,M,bdtilde,display,autostop)

with inputs

    SH ....... a (maxiter,len(M)) dimensional matrix of RichLucy intermediate results
    M  ....... the measured spectrum
    bdtilde .. the (discrete) line-spread function
    display .. boolean, whether to print information to screen
    autostop.. boolean, whether to actually calculate the automatic stopping
    
and outputs

    Crit1 .... a len(maxiter) vector of criterion values (e.g. rms of progress)
    Crit2 .... a len(maxiter) vector of other criterion values (e.g. curvature)
    indopt ... the determined optimal stopping
    
If anything went wrong, indopt should be equal to -1.
The stopping rules can be tested using the function Test_CalcOptFun in the 
module test_modules.py.

    
implemented stopping rules

* MaxCurv
* EME
* RMSthreshold

@author: Sascha Eichstaedt
"""

import sys
import numpy as np

min_auto_iter = 3 # smallest number of iterations for automatic stopping
min_curv_iter = 3 # smallest number of iterations for using MaxCurv criterion

[docs]def MaxCurv(SH,M,btilde,display=True,autostop=True): """ Calculate the root-mean-squared progress in the RichLucy iterations. Determine the optimal number of iterations from the curvature of the rms values. :param SH: ndarray of shape (N,K) with N the number of iterations and K the number of wavelengths :param M: ndarray of shape (K,) containing the measured spectrum values (possibly after interpolation) :param btilde: ndarray of shape (L,) containg the values of the reversed bandpass function (i.e., the line spread function) :param autostop: (optional) int, if equal to 1 then the point of maximum curvature is calculated :param display: (optional) boolean, if True then comments and intermediate results are printed to the command line :param orig_mode: (optional) boolean, if True then the optimal point is the global maximum curvature, if False then the optimal point is taken to be that local maximum with smallest error in the estimated measurement :returns: RMS, ndarray of shape (N,) containing the root-mean-squared progress in the RL estimate per iteration :returns: curv ndarray of shape (N,) containg the curvature values of the RMS as a function of iteration :returns: indopt the optimal stopping point as calculated by this criterion This criterion is published in: Eichstaedt, Schmaehling, Wuebbeler, Anhalt, Buenger, Krueger & Elster *Comparison of the Richardson-Lucy method and a classical approach for spectrometer bandwidth correction*, Metrologia vol. 50, 2013 """ from scipy.interpolate import InterpolatedUnivariateSpline from scipy.signal import argrelmax def calcEME(SHcandidates): m,n = SHcandidates.shape EstMeasDiff = np.zeros((m,)) for r in range(m): Mest = np.convolve(SHcandidates[r],btilde,'same') EstMeasDiff[r] = np.sqrt(np.sum( (Mest - M)**2 ) / len(M)) return EstMeasDiff lenSH,lM = np.shape(SH) maxiter = lenSH-1 if display: sys.stdout.write('Calculating optimal stopping iteration using curvature information\n') sys.stdout.write('number of estimated spectrum values = ' + repr(lM) + '\n') sys.stdout.write('maximum number of RichLucy iterations = ' + repr(maxiter) + '\n') RMS = np.zeros((maxiter)) if display: sys.stdout.write('Calculating root-mean-squared value of change from iteration to iteration..') for r in range(1,maxiter+1): RMS[r-1] = np.sqrt( np.mean( (SH[r,:]-SH[r-1,:])**2 ) ) if display and np.mod(np.round(maxiter/r),10)==0: sys.stdout.write('.') if display: sys.stdout.write('done.\n') if maxiter>min_curv_iter: if any(np.isnan(RMS)) or any(np.isinf(RMS)) or np.min(RMS)==np.max(RMS): if display: print "Calculation of automatic stopping by max curvature method failed." return RMS,RMS,-1 if display: sys.stdout.write('Calculating curvature of rms values.\n') iters = np.arange(1,maxiter+1) q = np.linspace(np.log10(iters[0]),np.log10(iters[-1]),len(RMS)) dq = q[1]-q[0] spline_qrms = InterpolatedUnivariateSpline(np.log10(iters),np.log10(RMS)) qrms = spline_qrms(q) if np.count_nonzero(np.isnan(qrms))>0 and display: sys.stdout.write('Warning: In calcOptCurv interpolation of rms values resulted in ' + repr(np.count_nonzero(np.isnan(qrms))) + ' NaN entries!\n') sys.stdout.write('Results may be inaccurate.\n') first_derivative = np.gradient(qrms,dq) snd_derivative = np.gradient(first_derivative,dq) curv = snd_derivative/(1+first_derivative**2)**1.5 if autostop: # calculate optimal q-value qmax = argrelmax(curv)[0] relevant_qmax = [] for qm in qmax: if qm >= np.log10(min_auto_iter): relevant_qmax.append(qm) if len(relevant_qmax)>0: indopts = [] for k in range(len(relevant_qmax)): qopt = q[curv==curv[relevant_qmax[k]]] qopt = qopt[0] # transform q-value to iteration number iopt = np.flatnonzero(np.log10(iters)<=qopt) if iopt[-1]>min_auto_iter: indopts.append(iters[iopt[-1]]) if display: sys.stdout.write("Candidates for optimal iteration are \n" + repr(indopts) + "\n") # calculate error of estimated measurement for the candidates EME = calcEME(SH[indopts,:]) if display: print "Error in estimated measurement for solution candidates are:\n " + repr(EME) + "\n" optk = np.nonzero(EME == min(EME))[0] indopt = indopts[optk] else: indopt = -1 else: indopt = maxiter else: if display: sys.stdout.write('Warning: Maximum number of iterations is too for calculation of curvature. Automatic stopping not available.\n') if autostop: indopt = -1 else: indopt = maxiter curv = np.zeros_like(RMS) if indopt < 0 and display: sys.stdout.write('Calculation of optimal stopping using the max curvature method failed.\n') elif display: sys.stdout.write('Estimated optimal stopping point is iter:' + repr(indopt) + '\n') return RMS,curv,indopt
[docs]def EEM(SH,M,bdtilde,display=True,autostop=True): """ Determine the optimal stopping by calculating the estimated measurement for each intermediate estimate and the weighted rms difference from the actual measurement. The optimal stopping point is that with the first local minimal weighted rms difference. :param SH: ndarray of shape (N,K) with N the number of iterations and K the number of wavelengths :param M: ndarray of shape (K,) containing the measured spectrum values (possibly after interpolation) :param btilde: ndarray of shape (L,) containg the values of the reversed bandpass function (i.e., the line spread function) :param autostop: (optional) int, if equal to 1 then the point of maximum curvature is calculated :param display: (optional) boolean, if True then comments and intermediate results are printed to the command line :returns: wDiff, ndarray of shape (N,) containing the weighted rms errors of the estimated measurement for each iteration :returns: wDiff (twice, to have the same input-output structure as `calcOptCurv`) :returns: indopt, int -- the index of the optimal stopping iteration This criterion is published in: Jan Audenaert et al. *Bayesian deconvolution method applied to experimental bidirectional transmittance distribution functions*, Meas. Sci. Techol. 24, 2013 """ from scipy.signal import argrelmin lenSH,lM = np.shape(SH) maxiter = lenSH-1 if lM!=len(M): raise ValueError('Dimension of estimated spectra and measured spectrum do not fit together.\n') if display: sys.stdout.write('Calculating optimal stopping iteration by calculating the estimated measurement.\n') sys.stdout.write('number of estimated spectrum values = ' + repr(lenSH) + '\n') sys.stdout.write('maximum number of RichLucy iterations = ' + repr(maxiter) + '\n') wDiff = np.zeros((maxiter)) if display: sys.stdout.write('Convolving the estimated spectra with the bandpass to calculate estimated measurement') for r in range(maxiter): Mest = np.convolve(SH[r+1],bdtilde,'same') inds = np.nonzero(Mest>1e-12) if np.count_nonzero(Mest<0)>0: raise ValueError('Estimated measurement for estimated spectrum results in negative values (Nr. '+repr(r)+')') wDiff[r] = np.sqrt( np.sum( ( ((Mest[inds]-M[inds])/Mest[inds])**2 ) )/len(inds)) if display and np.mod(r,maxiter*0.1)==0: sys.stdout.write('.') if display: sys.stdout.write('done\n') if autostop and maxiter>min_auto_iter: if any(np.isnan(wDiff)) or any(np.isinf(wDiff)) or np.min(wDiff)==np.max(wDiff): if display: print "Calculation of automatic stopping by EEM method failed." return wDiff,wDiff,-1 indopts= argrelmin(wDiff)[0] if display: sys.stdout.write('Found ' + repr(len(indopts)) + ' local minima\n') if len(indopts)>0: if indopts[0]<min_auto_iter and len(indopts)>1: indopts = indopts[1:] indopt = np.nonzero(wDiff==np.min(wDiff[indopts]))[0] else: if display: sys.stdout.write('Taking absolute minimum instead.\n') indopt = np.argmin(wDiff) else: if autostop and display: sys.stdout.write('Warning: Maximum number of iterations is too small. Automatic stopping not available.\n') indopt = maxiter if isinstance(indopt,np.ndarray): indopt = indopt[0] if indopt>0 and display: sys.stdout.write('Estimated optimal stopping point is iter:' + repr(indopt) + '\n') return wDiff,wDiff,indopt
def RMSthreshold(SH,M,bdtilde,display=True,autostop=True,tau=1e-4): """Stop the iterations once the rms difference between subsequent iterations is relatively below tau """ lenSH, lM = np.shape(SH) maxiter = lenSH-1 if lM!=len(M): raise ValueError('Dimension of estimated spectra and measured spectrum do not fit together.\n') if display: sys.stdout.write('Calculating optimal stopping iteration by using a threshold value on the RMS of change.\n') sys.stdout.write('number of estimated spectrum values = ' + repr(lenSH) + '\n') sys.stdout.write('maximum number of RichLucy iterations = ' + repr(maxiter) + '\n') sys.stdout.write('tolerance value = ' + repr(tau) + '\n') RMSDiff = np.zeros((maxiter)) relRMSDiff = np.zeros((maxiter)) if display: sys.stdout.write('Calculating the rms difference between subsequent iteration results...') for r in range(1,maxiter+1): RMSDiff[r-1] = np.sqrt( np.mean( (SH[r,:]-SH[r-1,:])**2 ) ) relRMSDiff[r-1] = RMSDiff[r-1]/np.linalg.norm(SH[r-1,:]) if display: sys.stdout.write('done\n') if autostop and maxiter>min_auto_iter: if any(np.isnan(RMSDiff)) or any(np.isinf(RMSDiff)) or np.min(RMSDiff)==np.max(RMSDiff): if display: print "Calculation of optimal stopping using the RMS threshold method failed." return RMSDiff,RMSDiff,-1 iters = np.arange(maxiter) indopts= iters[relRMSDiff<=tau] if len(indopts)>0: if indopts[0]<min_auto_iter and len(indopts)>1: indopts = indopts[1:] indopt = indopts[0] else: indopt = maxiter if display: sys.stdout.write('Relative rms between subsequent iterations was never below chosen threshold - taking maxiter as optimal iteration.\n') else: if autostop and display: sys.stdout.write('Warning: Maximum number of iterations is too small. Automatic stopping not available.\n') indopt = maxiter if indopt>0 and display: sys.stdout.write('Estimated optimal stopping point is iter:' + repr(indopt) + '\n') return RMSDiff,relRMSDiff,indopt